This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0edef8c907 GH-36975: [C++][FlightRPC] Skip unknown fields, don't crash (#36979)
0edef8c907 is described below

commit 0edef8c90797ba5dcadd0daf5009dc2d91ecd099
Author: David Li <[email protected]>
AuthorDate: Tue Aug 1 19:39:39 2023 -0400

    GH-36975: [C++][FlightRPC] Skip unknown fields, don't crash (#36979)
    
    
    
    ### Rationale for this change
    
    We should skip unknown fields instead of crashing, for forward compatibility.
    
    ### What changes are included in this PR?
    
    Skip unknown fields in the FlightData deserializer.
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
    No.
    * Closes: #36975
    
    Authored-by: David Li <[email protected]>
    Signed-off-by: Sutou Kouhei <[email protected]>
---
 cpp/src/arrow/flight/flight_internals_test.cc      | 35 ++++++++++++++++++++++
 .../transport/grpc/serialization_internal.cc       | 34 +++++++++++----------
 2 files changed, 54 insertions(+), 15 deletions(-)

diff --git a/cpp/src/arrow/flight/flight_internals_test.cc b/cpp/src/arrow/flight/flight_internals_test.cc
index 27c13ff949..e56bab6db2 100644
--- a/cpp/src/arrow/flight/flight_internals_test.cc
+++ b/cpp/src/arrow/flight/flight_internals_test.cc
@@ -34,6 +34,9 @@
 #include "arrow/testing/gtest_util.h"
 #include "arrow/util/string.h"
 
+// Include after Flight headers
+#include <grpc/slice.h>
+
 namespace arrow {
 namespace flight {
 
@@ -651,6 +654,38 @@ TEST_F(TestCookieParsing, CookieCache) {
   AddCookieVerifyCache({"id0=0;", "id1=1;", "id2=2"}, "id0=0; id1=1; id2=2");
 }
 
+// ----------------------------------------------------------------------
+// Protobuf tests
+
+TEST(GrpcTransport, FlightDataDeserialize) {
+#ifndef _WIN32
+  pb::FlightData raw;
+  // Tack on known and unknown fields by hand here
+  raw.GetReflection()->MutableUnknownFields(&raw)->AddFixed32(900, 1024);
+  raw.GetReflection()->MutableUnknownFields(&raw)->AddFixed64(901, 1024);
+  raw.GetReflection()->MutableUnknownFields(&raw)->AddVarint(902, 1024);
+  raw.GetReflection()->MutableUnknownFields(&raw)->AddLengthDelimited(903, "foobar");
+  // Known field comes at end
+  raw.GetReflection()->MutableUnknownFields(&raw)->AddLengthDelimited(
+      pb::FlightData::kDataBodyFieldNumber, "data");
+
+  auto serialized = raw.SerializeAsString();
+
+  grpc_slice slice = grpc_slice_from_copied_buffer(serialized.data(), serialized.size());
+  // gRPC requires that grpc_slice and grpc::Slice have the same representation
+  grpc::ByteBuffer buffer(reinterpret_cast<const grpc::Slice*>(&slice), /*nslices=*/1);
+
+  flight::internal::FlightData out;
+  auto status = flight::transport::grpc::FlightDataDeserialize(&buffer, &out);
+  ASSERT_TRUE(status.ok());
+  ASSERT_EQ("data", out.body->ToString());
+
+  grpc_slice_unref(slice);
+#else
+  GTEST_SKIP() << "Can't use Protobuf symbols on Windows";
+#endif
+}
+
 // ----------------------------------------------------------------------
 // Transport abstraction tests
 
diff --git a/cpp/src/arrow/flight/transport/grpc/serialization_internal.cc b/cpp/src/arrow/flight/transport/grpc/serialization_internal.cc
index cff111d64d..372dca7a2c 100644
--- a/cpp/src/arrow/flight/transport/grpc/serialization_internal.cc
+++ b/cpp/src/arrow/flight/transport/grpc/serialization_internal.cc
@@ -21,6 +21,7 @@
 
 #include <cstdint>
 #include <limits>
+#include <memory>
 #include <string>
 #include <vector>
 
@@ -297,7 +298,7 @@ static const uint8_t kPaddingBytes[8] = {0, 0, 0, 0, 0, 0, 0, 0};
         const auto remainder = static_cast<int>(
             bit_util::RoundUpToMultipleOf8(buffer->size()) - buffer->size());
         if (remainder) {
-          slices.push_back(::grpc::Slice(kPaddingBytes, remainder));
+          slices.emplace_back(kPaddingBytes, remainder);
         }
       }
     }
@@ -316,7 +317,7 @@ static const uint8_t kPaddingBytes[8] = {0, 0, 0, 0, 0, 0, 0, 0};
 ::grpc::Status FlightDataDeserialize(ByteBuffer* buffer,
                                      arrow::flight::internal::FlightData* out) 
{
   if (!buffer) {
-    return ::grpc::Status(::grpc::StatusCode::INTERNAL, "No payload");
+    return {::grpc::StatusCode::INTERNAL, "No payload"};
   }
 
   // Reset fields in case the caller reuses a single allocation
@@ -342,42 +343,45 @@ static const uint8_t kPaddingBytes[8] = {0, 0, 0, 0, 0, 0, 0, 0};
         pb::FlightDescriptor pb_descriptor;
         uint32_t length;
         if (!pb_stream.ReadVarint32(&length)) {
-          return ::grpc::Status(::grpc::StatusCode::INTERNAL,
-                                "Unable to parse length of FlightDescriptor");
+          return {::grpc::StatusCode::INTERNAL,
+                  "Unable to parse length of FlightDescriptor"};
         }
         // Can't use ParseFromCodedStream as this reads the entire
         // rest of the stream into the descriptor command field.
         std::string buffer;
         pb_stream.ReadString(&buffer, length);
         if (!pb_descriptor.ParseFromString(buffer)) {
-          return ::grpc::Status(::grpc::StatusCode::INTERNAL,
-                                "Unable to parse FlightDescriptor");
+          return {::grpc::StatusCode::INTERNAL, "Unable to parse FlightDescriptor"};
         }
         arrow::flight::FlightDescriptor descriptor;
         GRPC_RETURN_NOT_OK(
             arrow::flight::internal::FromProto(pb_descriptor, &descriptor));
-        out->descriptor.reset(new arrow::flight::FlightDescriptor(descriptor));
+        out->descriptor = std::make_unique<arrow::flight::FlightDescriptor>(descriptor);
       } break;
       case pb::FlightData::kDataHeaderFieldNumber: {
         if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &out->metadata)) {
-          return ::grpc::Status(::grpc::StatusCode::INTERNAL,
-                                "Unable to read FlightData metadata");
+          return {::grpc::StatusCode::INTERNAL, "Unable to read FlightData metadata"};
         }
       } break;
       case pb::FlightData::kAppMetadataFieldNumber: {
         if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, 
&out->app_metadata)) {
-          return ::grpc::Status(::grpc::StatusCode::INTERNAL,
-                                "Unable to read FlightData application metadata");
+          return {::grpc::StatusCode::INTERNAL,
+                  "Unable to read FlightData application metadata"};
         }
       } break;
       case pb::FlightData::kDataBodyFieldNumber: {
         if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &out->body)) {
-          return ::grpc::Status(::grpc::StatusCode::INTERNAL,
-                                "Unable to read FlightData body");
+          return {::grpc::StatusCode::INTERNAL, "Unable to read FlightData body"};
         }
       } break;
-      default:
-        DCHECK(false) << "cannot happen";
+      default: {
+        // Unknown field. We should skip it for compatibility.
+        if (!WireFormatLite::SkipField(&pb_stream, tag)) {
+          return {::grpc::StatusCode::INTERNAL,
+                  "Could not skip unknown field tag in FlightData"};
+        }
+        break;
+      }
     }
   }
   buffer->Clear();

Reply via email to