This is an automated email from the ASF dual-hosted git repository.

raulcd pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new b3120c1dc7 GH-49548: [C++][FlightRPC] Decouple Flight 
Serialize/Deserialize from gRPC transport (#49549)
b3120c1dc7 is described below

commit b3120c1dc7c53af99b022039d117e3d8bf174103
Author: Raúl Cumplido <[email protected]>
AuthorDate: Tue Mar 31 16:38:11 2026 +0200

    GH-49548: [C++][FlightRPC] Decouple Flight Serialize/Deserialize from gRPC 
transport (#49549)
    
    ### Rationale for this change
    
    Currently the Serialize/Deserialize APIs are gRPC-dependent. This means 
that any code that needs to encode or decode Flight data must depend on gRPC 
C++ internals. After some discussions around trying to build a PoC using gRPC's 
generic API with gRPC's BidiReactor, we concluded that these primitives should 
be made gRPC-agnostic.
    
    ### What changes are included in this PR?
    
    - Move the serialization/deserialization logic from 
`cpp/src/arrow/flight/transport/grpc/serialization_internal.{h,cc}` to 
`cpp/src/arrow/flight/serialization_internal.cc`
    - Create new `arrow::Result<arrow::BufferVector> 
SerializePayloadToBuffers(const FlightPayload& msg)` gRPC agnostic function.
    - Create new `arrow::Result<arrow::flight::internal::FlightData> 
DeserializeFlightData(const std::shared_ptr<arrow::Buffer>& buffer)` gRPC 
agnostic function.
    - Keep the existing serialize/deserialize functions for gRPC as simple 
wrappers on top of the new serialization functions to implement the 
`grpc::ByteBuffer` and `grpc::Slice` details.
    - Add utility `arrow::Result<BufferVector> SerializeToBuffers() const;` to 
`FlightPayload` struct.
    - Add roundtrip tests for serialization/deserialization
    
    ### Are these changes tested?
    
    Yes, both by existing tests and new tests.
    
    ### Are there any user-facing changes?
    
    No
    
    * GitHub Issue: #49548
    
    Authored-by: Raúl Cumplido <[email protected]>
    Signed-off-by: Raúl Cumplido <[email protected]>
---
 cpp/src/arrow/flight/flight_internals_test.cc      |  73 ++++++
 cpp/src/arrow/flight/serialization_internal.cc     | 232 ++++++++++++++++++++
 cpp/src/arrow/flight/serialization_internal.h      |   9 +
 cpp/src/arrow/flight/transport.h                   |   2 +-
 .../transport/grpc/serialization_internal.cc       | 244 ++-------------------
 cpp/src/arrow/flight/types.cc                      |   4 +
 cpp/src/arrow/flight/types.h                       |   3 +
 7 files changed, 340 insertions(+), 227 deletions(-)

diff --git a/cpp/src/arrow/flight/flight_internals_test.cc 
b/cpp/src/arrow/flight/flight_internals_test.cc
index bb14ddd665..a66ac210b8 100644
--- a/cpp/src/arrow/flight/flight_internals_test.cc
+++ b/cpp/src/arrow/flight/flight_internals_test.cc
@@ -23,13 +23,18 @@
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
+#include "arrow/buffer.h"
 #include "arrow/flight/client_cookie_middleware.h"
 #include "arrow/flight/client_middleware.h"
 #include "arrow/flight/cookie_internal.h"
 #include "arrow/flight/serialization_internal.h"
+#include "arrow/flight/server.h"
 #include "arrow/flight/test_util.h"
+#include "arrow/flight/transport.h"
 #include "arrow/flight/transport/grpc/util_internal.h"
 #include "arrow/flight/types.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/util/string.h"
@@ -730,6 +735,74 @@ TEST(GrpcTransport, FlightDataDeserialize) {
 #endif
 }
 
+// ----------------------------------------------------------------------
+// Transport-agnostic serialization roundtrip tests
+
+TEST(FlightSerialization, RoundtripPayloadWithBody) {
+  // Use RecordBatchStream to generate FlightPayloads
+  auto schema = arrow::schema({arrow::field("a", arrow::int32())});
+  auto arr = ArrayFromJSON(arrow::int32(), "[1, 2, 3]");
+  auto batch = RecordBatch::Make(schema, 3, {arr});
+  auto reader = RecordBatchReader::Make({batch}).ValueOrDie();
+  RecordBatchStream stream(std::move(reader));
+
+  // Get a FlightPayload from the stream
+  ASSERT_OK_AND_ASSIGN(auto schema_payload, stream.GetSchemaPayload());
+  ASSERT_OK_AND_ASSIGN(auto flight_payload, stream.Next());
+
+  // Add app_metadata to the flight payload
+  flight_payload.app_metadata = Buffer::FromString("test-metadata");
+
+  // Serialize FlightPayload to BufferVector
+  ASSERT_OK_AND_ASSIGN(auto buffers, 
internal::SerializePayloadToBuffers(flight_payload));
+  ASSERT_GT(buffers.size(), 0);
+
+  // Concatenate to a single buffer for deserialization and deserialize.
+  ASSERT_OK_AND_ASSIGN(auto concat, ConcatenateBuffers(buffers));
+  ASSERT_OK_AND_ASSIGN(auto data, internal::DeserializeFlightData(concat));
+
+  // Verify IPC metadata (data_header) is present
+  ASSERT_NE(data.metadata, nullptr);
+  ASSERT_GT(data.metadata->size(), 0);
+
+  // Verify app_metadata
+  ASSERT_NE(data.app_metadata, nullptr);
+  ASSERT_EQ(data.app_metadata->ToString(), "test-metadata");
+
+  // Verify body and message are present
+  ASSERT_NE(data.body, nullptr);
+  ASSERT_GT(data.body->size(), 0);
+  ASSERT_OK_AND_ASSIGN(auto message, data.OpenMessage());
+  ASSERT_NE(message, nullptr);
+  // Also verify the RecordBatch roundtrips correctly
+  ipc::DictionaryMemo dict_memo;
+  ASSERT_OK_AND_ASSIGN(auto result_batch,
+                       ipc::ReadRecordBatch(*message, schema, &dict_memo,
+                                            ipc::IpcReadOptions::Defaults()));
+  ASSERT_TRUE(result_batch->Equals(*batch));
+}
+
+TEST(FlightSerialization, RoundtripMetadataOnly) {
+  // A metadata-only payload (no IPC body, no descriptor)
+  auto app_meta = Buffer::FromString("metadata-only-message");
+
+  FlightPayload payload;
+  payload.app_metadata = std::move(app_meta);
+
+  // Serialize
+  ASSERT_OK_AND_ASSIGN(auto buffers, 
internal::SerializePayloadToBuffers(payload));
+  ASSERT_OK_AND_ASSIGN(auto concat, ConcatenateBuffers(buffers));
+
+  // Deserialize
+  ASSERT_OK_AND_ASSIGN(auto data, internal::DeserializeFlightData(concat));
+
+  // Verify: no descriptor, no IPC metadata, just app_metadata
+  ASSERT_EQ(data.descriptor, nullptr);
+  ASSERT_EQ(data.metadata, nullptr);
+  ASSERT_NE(data.app_metadata, nullptr);
+  ASSERT_EQ(data.app_metadata->ToString(), "metadata-only-message");
+}
+
 // ----------------------------------------------------------------------
 // Transport abstraction tests
 
diff --git a/cpp/src/arrow/flight/serialization_internal.cc 
b/cpp/src/arrow/flight/serialization_internal.cc
index 604375311d..a090075814 100644
--- a/cpp/src/arrow/flight/serialization_internal.cc
+++ b/cpp/src/arrow/flight/serialization_internal.cc
@@ -17,18 +17,24 @@
 
 #include "arrow/flight/serialization_internal.h"
 
+#include <limits>
 #include <memory>
 #include <string>
 
 #include <google/protobuf/any.pb.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+#include <google/protobuf/wire_format_lite.h>
 
 #include "arrow/buffer.h"
 #include "arrow/flight/protocol_internal.h"
 #include "arrow/io/memory.h"
+#include "arrow/ipc/message.h"
 #include "arrow/ipc/reader.h"
 #include "arrow/ipc/writer.h"
 #include "arrow/result.h"
 #include "arrow/status.h"
+#include "arrow/util/logging_internal.h"
 
 // Lambda helper & CTAD
 template <class... Ts>
@@ -612,6 +618,232 @@ Status ToProto(const CloseSessionResult& result, 
pb::CloseSessionResult* pb_resu
   return Status::OK();
 }
 
+namespace {
+using google::protobuf::internal::WireFormatLite;
+using google::protobuf::io::ArrayOutputStream;
+using google::protobuf::io::CodedInputStream;
+using google::protobuf::io::CodedOutputStream;
+static constexpr int64_t kInt32Max = std::numeric_limits<int32_t>::max();
+const uint8_t kPaddingBytes[8] = {0, 0, 0, 0, 0, 0, 0, 0};
+
+// Update the sizes of our Protobuf fields based on the given IPC payload.
+arrow::Status IpcMessageHeaderSize(const arrow::ipc::IpcPayload& ipc_msg, bool 
has_body,
+                                   size_t* header_size, int32_t* 
metadata_size) {
+  DCHECK_LE(ipc_msg.metadata->size(), kInt32Max);
+  *metadata_size = static_cast<int32_t>(ipc_msg.metadata->size());
+
+  // 1 byte for metadata tag
+  *header_size += 1 + WireFormatLite::LengthDelimitedSize(*metadata_size);
+
+  // 2 bytes for body tag
+  if (has_body) {
+    // We write the body tag in the header but not the actual body data
+    *header_size += 2 + 
WireFormatLite::LengthDelimitedSize(ipc_msg.body_length) -
+                    ipc_msg.body_length;
+  }
+
+  return arrow::Status::OK();
+}
+
+bool ReadBytesZeroCopy(const std::shared_ptr<Buffer>& source_data,
+                       CodedInputStream* input, std::shared_ptr<Buffer>* out) {
+  uint32_t length;
+  if (!input->ReadVarint32(&length)) {
+    return false;
+  }
+  auto buf =
+      SliceBuffer(source_data, input->CurrentPosition(), 
static_cast<int64_t>(length));
+  *out = buf;
+  return input->Skip(static_cast<int>(length));
+}
+
+}  // namespace
+
+arrow::Result<arrow::BufferVector> SerializePayloadToBuffers(const 
FlightPayload& msg) {
+  // Size of the IPC body (protobuf: data_body)
+  size_t body_size = 0;
+  // Size of the Protobuf "header" (everything except for the body)
+  size_t header_size = 0;
+  // Size of IPC header metadata (protobuf: data_header)
+  int32_t metadata_size = 0;
+
+  // Write the descriptor if present
+  int32_t descriptor_size = 0;
+  if (msg.descriptor != nullptr) {
+    DCHECK_LE(msg.descriptor->size(), kInt32Max);
+    descriptor_size = static_cast<int32_t>(msg.descriptor->size());
+    header_size += 1 + WireFormatLite::LengthDelimitedSize(descriptor_size);
+  }
+
+  // App metadata tag if appropriate
+  int32_t app_metadata_size = 0;
+  if (msg.app_metadata && msg.app_metadata->size() > 0) {
+    DCHECK_LE(msg.app_metadata->size(), kInt32Max);
+    app_metadata_size = static_cast<int32_t>(msg.app_metadata->size());
+    header_size += 1 + WireFormatLite::LengthDelimitedSize(app_metadata_size);
+  }
+
+  const arrow::ipc::IpcPayload& ipc_msg = msg.ipc_message;
+  // No data in this payload (metadata-only).
+  bool has_ipc = ipc_msg.type != ipc::MessageType::NONE;
+  bool has_body = has_ipc ? ipc::Message::HasBody(ipc_msg.type) : false;
+
+  if (has_ipc) {
+    DCHECK(has_body || ipc_msg.body_length == 0);
+    ARROW_RETURN_NOT_OK(
+        IpcMessageHeaderSize(ipc_msg, has_body, &header_size, &metadata_size));
+    body_size = static_cast<size_t>(ipc_msg.body_length);
+  }
+
+  // TODO(wesm): messages over 2GB unlikely to be yet supported
+  // Validated in WritePayload since returning error here causes gRPC to fail 
an assertion
+  DCHECK_LE(body_size, kInt32Max);
+
+  // Allocate and initialize buffers
+  arrow::BufferVector buffers;
+  ARROW_ASSIGN_OR_RAISE(auto header_buf, arrow::AllocateBuffer(header_size));
+
+  // Force the header_stream to be destructed, which actually flushes
+  // the data into the header buffer.
+  {
+    ArrayOutputStream 
header_writer(const_cast<uint8_t*>(header_buf->mutable_data()),
+                                    static_cast<int>(header_size));
+    CodedOutputStream header_stream(&header_writer);
+
+    // Write descriptor
+    if (msg.descriptor != nullptr) {
+      WireFormatLite::WriteTag(pb::FlightData::kFlightDescriptorFieldNumber,
+                               WireFormatLite::WIRETYPE_LENGTH_DELIMITED, 
&header_stream);
+      header_stream.WriteVarint32(descriptor_size);
+      header_stream.WriteRawMaybeAliased(msg.descriptor->data(),
+                                         
static_cast<int>(msg.descriptor->size()));
+    }
+
+    // Write header
+    if (has_ipc) {
+      WireFormatLite::WriteTag(pb::FlightData::kDataHeaderFieldNumber,
+                               WireFormatLite::WIRETYPE_LENGTH_DELIMITED, 
&header_stream);
+      header_stream.WriteVarint32(metadata_size);
+      header_stream.WriteRawMaybeAliased(ipc_msg.metadata->data(),
+                                         
static_cast<int>(ipc_msg.metadata->size()));
+    }
+
+    // Write app metadata
+    if (app_metadata_size > 0) {
+      WireFormatLite::WriteTag(pb::FlightData::kAppMetadataFieldNumber,
+                               WireFormatLite::WIRETYPE_LENGTH_DELIMITED, 
&header_stream);
+      header_stream.WriteVarint32(app_metadata_size);
+      header_stream.WriteRawMaybeAliased(msg.app_metadata->data(),
+                                         
static_cast<int>(msg.app_metadata->size()));
+    }
+
+    if (has_body) {
+      // Write body tag
+      WireFormatLite::WriteTag(pb::FlightData::kDataBodyFieldNumber,
+                               WireFormatLite::WIRETYPE_LENGTH_DELIMITED, 
&header_stream);
+      header_stream.WriteVarint32(static_cast<uint32_t>(body_size));
+
+      // Enqueue body buffers for writing, without copying
+      for (const auto& buffer : ipc_msg.body_buffers) {
+        // Buffer may be null when the row length is zero, or when all
+        // entries are invalid.
+        if (!buffer || buffer->size() == 0) continue;
+        buffers.push_back(buffer);
+
+        // Write padding if not multiple of 8
+        const auto remainder = static_cast<int>(
+            bit_util::RoundUpToMultipleOf8(buffer->size()) - buffer->size());
+        if (remainder) {
+          buffers.push_back(std::make_shared<arrow::Buffer>(kPaddingBytes, 
remainder));
+        }
+      }
+    }
+
+    DCHECK_EQ(static_cast<int>(header_size), header_stream.ByteCount());
+  }
+  // Once header is written we add it as the first buffer in the output vector.
+  buffers.insert(buffers.begin(), std::move(header_buf));
+
+  return buffers;
+}
+
+// Read internal::FlightData from arrow::Buffer containing FlightData
+// protobuf without copying
+arrow::Result<arrow::flight::internal::FlightData> DeserializeFlightData(
+    const std::shared_ptr<arrow::Buffer>& buffer) {
+  if (!buffer) {
+    return Status::Invalid("No payload");
+  }
+
+  arrow::flight::internal::FlightData out;
+
+  auto buffer_length = static_cast<int>(buffer->size());
+  CodedInputStream pb_stream(buffer->data(), buffer_length);
+
+  pb_stream.SetTotalBytesLimit(buffer_length);
+
+  // This is the bytes remaining when using CodedInputStream like this
+  while (pb_stream.BytesUntilTotalBytesLimit()) {
+    const uint32_t tag = pb_stream.ReadTag();
+    const int field_number = WireFormatLite::GetTagFieldNumber(tag);
+    switch (field_number) {
+      case pb::FlightData::kFlightDescriptorFieldNumber: {
+        pb::FlightDescriptor pb_descriptor;
+        uint32_t length;
+        if (!pb_stream.ReadVarint32(&length)) {
+          return Status::Invalid("Unable to parse length of FlightDescriptor");
+        }
+        // Can't use ParseFromCodedStream as this reads the entire
+        // rest of the stream into the descriptor command field.
+        std::string buffer;
+        if (!pb_stream.ReadString(&buffer, length)) {
+          return Status::Invalid("Unable to read FlightDescriptor from 
protobuf");
+        }
+        if (!pb_descriptor.ParseFromString(buffer)) {
+          return Status::Invalid("Unable to parse FlightDescriptor");
+        }
+        arrow::flight::FlightDescriptor descriptor;
+        ARROW_RETURN_NOT_OK(
+            arrow::flight::internal::FromProto(pb_descriptor, &descriptor));
+        out.descriptor = 
std::make_unique<arrow::flight::FlightDescriptor>(descriptor);
+      } break;
+      case pb::FlightData::kDataHeaderFieldNumber: {
+        if (!ReadBytesZeroCopy(buffer, &pb_stream, &out.metadata)) {
+          return Status::Invalid("Unable to read FlightData metadata");
+        }
+      } break;
+      case pb::FlightData::kAppMetadataFieldNumber: {
+        if (!ReadBytesZeroCopy(buffer, &pb_stream, &out.app_metadata)) {
+          return Status::Invalid("Unable to read FlightData application 
metadata");
+        }
+      } break;
+      case pb::FlightData::kDataBodyFieldNumber: {
+        if (!ReadBytesZeroCopy(buffer, &pb_stream, &out.body)) {
+          return Status::Invalid("Unable to read FlightData body");
+        }
+      } break;
+      default: {
+        // Unknown field. We should skip it for compatibility.
+        if (!WireFormatLite::SkipField(&pb_stream, tag)) {
+          return Status::Invalid("Could not skip unknown field tag in 
FlightData");
+        }
+        break;
+      }
+    }
+  }
+
+  // TODO(wesm): Where and when should we verify that the FlightData is not
+  // malformed?
+
+  // Set the default value for an unspecified FlightData body. The other
+  // fields can be null if they're unspecified.
+  if (out.body == nullptr) {
+    out.body = std::make_shared<Buffer>(nullptr, 0);
+  }
+
+  return out;
+}
+
 }  // namespace internal
 }  // namespace flight
 }  // namespace arrow
diff --git a/cpp/src/arrow/flight/serialization_internal.h 
b/cpp/src/arrow/flight/serialization_internal.h
index 4d07efad81..896ca5b675 100644
--- a/cpp/src/arrow/flight/serialization_internal.h
+++ b/cpp/src/arrow/flight/serialization_internal.h
@@ -182,6 +182,15 @@ ARROW_FLIGHT_EXPORT Status ToProto(const 
CloseSessionResult& result,
 
 Status ToPayload(const FlightDescriptor& descr, std::shared_ptr<Buffer>* out);
 
+/// \brief Serialize a FlightPayload to a vector of buffers.
+ARROW_FLIGHT_EXPORT
+arrow::Result<arrow::BufferVector> SerializePayloadToBuffers(const 
FlightPayload& msg);
+
+/// \brief Deserialize FlightData from a contiguous buffer.
+ARROW_FLIGHT_EXPORT
+arrow::Result<internal::FlightData> DeserializeFlightData(
+    const std::shared_ptr<arrow::Buffer>& buffer);
+
 // We want to reuse RecordBatchStreamReader's implementation while
 // (1) Adapting it to the Flight message format
 // (2) Allowing pure-metadata messages before data is sent
diff --git a/cpp/src/arrow/flight/transport.h b/cpp/src/arrow/flight/transport.h
index 4ce5053402..6b96943189 100644
--- a/cpp/src/arrow/flight/transport.h
+++ b/cpp/src/arrow/flight/transport.h
@@ -76,7 +76,7 @@ class FlightStatusDetail;
 namespace internal {
 
 /// Internal, not user-visible type used for memory-efficient reads
-struct FlightData {
+struct ARROW_FLIGHT_EXPORT FlightData {
   /// Used only for puts, may be null
   std::unique_ptr<FlightDescriptor> descriptor;
 
diff --git a/cpp/src/arrow/flight/transport/grpc/serialization_internal.cc 
b/cpp/src/arrow/flight/transport/grpc/serialization_internal.cc
index d4848c5077..ee13a8e202 100644
--- a/cpp/src/arrow/flight/transport/grpc/serialization_internal.cc
+++ b/cpp/src/arrow/flight/transport/grpc/serialization_internal.cc
@@ -17,10 +17,7 @@
 
 #include "arrow/flight/transport/grpc/serialization_internal.h"
 
-// todo cleanup includes
-
 #include <cstdint>
-#include <limits>
 #include <memory>
 #include <string>
 #include <vector>
@@ -32,10 +29,6 @@
 #  pragma warning(disable : 4267)
 #endif
 
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
-#include <google/protobuf/wire_format_lite.h>
-
 #include <grpc/byte_buffer_reader.h>
 #include <grpcpp/grpcpp.h>
 #include <grpcpp/impl/codegen/proto_utils.h>
@@ -49,9 +42,7 @@
 #include "arrow/flight/serialization_internal.h"
 #include "arrow/flight/transport.h"
 #include "arrow/flight/transport/grpc/util_internal.h"
-#include "arrow/ipc/message.h"
 #include "arrow/ipc/writer.h"
-#include "arrow/util/bit_util.h"
 #include "arrow/util/logging_internal.h"
 
 namespace arrow {
@@ -61,28 +52,10 @@ namespace grpc {
 
 namespace pb = arrow::flight::protocol;
 
-static constexpr int64_t kInt32Max = std::numeric_limits<int32_t>::max();
-using google::protobuf::internal::WireFormatLite;
-using google::protobuf::io::ArrayOutputStream;
-using google::protobuf::io::CodedInputStream;
-using google::protobuf::io::CodedOutputStream;
-
 using ::grpc::ByteBuffer;
 
 namespace {
 
-bool ReadBytesZeroCopy(const std::shared_ptr<Buffer>& source_data,
-                       CodedInputStream* input, std::shared_ptr<Buffer>* out) {
-  uint32_t length;
-  if (!input->ReadVarint32(&length)) {
-    return false;
-  }
-  auto buf =
-      SliceBuffer(source_data, input->CurrentPosition(), 
static_cast<int64_t>(length));
-  *out = buf;
-  return input->Skip(static_cast<int>(length));
-}
-
 // Internal wrapper for gRPC ByteBuffer so its memory can be exposed to Arrow
 // consumers with zero-copy
 class GrpcBuffer : public MutableBuffer {
@@ -176,142 +149,29 @@ arrow::Result<::grpc::Slice> SliceFromBuffer(const 
std::shared_ptr<Buffer>& buf)
   return slice;
 }
 
-const uint8_t kPaddingBytes[8] = {0, 0, 0, 0, 0, 0, 0, 0};
-
-// Update the sizes of our Protobuf fields based on the given IPC payload.
-::grpc::Status IpcMessageHeaderSize(const arrow::ipc::IpcPayload& ipc_msg, 
bool has_body,
-                                    size_t* header_size, int32_t* 
metadata_size) {
-  DCHECK_LE(ipc_msg.metadata->size(), kInt32Max);
-  *metadata_size = static_cast<int32_t>(ipc_msg.metadata->size());
-
-  // 1 byte for metadata tag
-  *header_size += 1 + WireFormatLite::LengthDelimitedSize(*metadata_size);
-
-  // 2 bytes for body tag
-  if (has_body) {
-    // We write the body tag in the header but not the actual body data
-    *header_size += 2 + 
WireFormatLite::LengthDelimitedSize(ipc_msg.body_length) -
-                    ipc_msg.body_length;
-  }
-
-  return ::grpc::Status::OK;
-}
-
 }  // namespace
 
 ::grpc::Status FlightDataSerialize(const FlightPayload& msg, ByteBuffer* out,
                                    bool* own_buffer) {
-  // Size of the IPC body (protobuf: data_body)
-  size_t body_size = 0;
-  // Size of the Protobuf "header" (everything except for the body)
-  size_t header_size = 0;
-  // Size of IPC header metadata (protobuf: data_header)
-  int32_t metadata_size = 0;
-
-  // Write the descriptor if present
-  int32_t descriptor_size = 0;
-  if (msg.descriptor != nullptr) {
-    DCHECK_LE(msg.descriptor->size(), kInt32Max);
-    descriptor_size = static_cast<int32_t>(msg.descriptor->size());
-    header_size += 1 + WireFormatLite::LengthDelimitedSize(descriptor_size);
+  // Retrieve BufferVector from the transport-agnostic serialization.
+  auto buffers_result = 
arrow::flight::internal::SerializePayloadToBuffers(msg);
+  if (!buffers_result.ok()) {
+    return ToGrpcStatus(buffers_result.status());
   }
 
-  // App metadata tag if appropriate
-  int32_t app_metadata_size = 0;
-  if (msg.app_metadata && msg.app_metadata->size() > 0) {
-    DCHECK_LE(msg.app_metadata->size(), kInt32Max);
-    app_metadata_size = static_cast<int32_t>(msg.app_metadata->size());
-    header_size += 1 + WireFormatLite::LengthDelimitedSize(app_metadata_size);
-  }
-
-  const arrow::ipc::IpcPayload& ipc_msg = msg.ipc_message;
-  // No data in this payload (metadata-only).
-  bool has_ipc = ipc_msg.type != ipc::MessageType::NONE;
-  bool has_body = has_ipc ? ipc::Message::HasBody(ipc_msg.type) : false;
-
-  if (has_ipc) {
-    DCHECK(has_body || ipc_msg.body_length == 0);
-    GRPC_RETURN_NOT_GRPC_OK(
-        IpcMessageHeaderSize(ipc_msg, has_body, &header_size, &metadata_size));
-    body_size = static_cast<size_t>(ipc_msg.body_length);
-  }
-
-  // TODO(wesm): messages over 2GB unlikely to be yet supported
-  // Validated in WritePayload since returning error here causes gRPC to fail 
an assertion
-  DCHECK_LE(body_size, kInt32Max);
-
-  // Allocate and initialize slices
   std::vector<::grpc::Slice> slices;
-  slices.emplace_back(header_size);
-
-  // Force the header_stream to be destructed, which actually flushes
-  // the data into the slice.
-  {
-    ArrayOutputStream header_writer(const_cast<uint8_t*>(slices[0].begin()),
-                                    static_cast<int>(slices[0].size()));
-    CodedOutputStream header_stream(&header_writer);
-
-    // Write descriptor
-    if (msg.descriptor != nullptr) {
-      WireFormatLite::WriteTag(pb::FlightData::kFlightDescriptorFieldNumber,
-                               WireFormatLite::WIRETYPE_LENGTH_DELIMITED, 
&header_stream);
-      header_stream.WriteVarint32(descriptor_size);
-      header_stream.WriteRawMaybeAliased(msg.descriptor->data(),
-                                         
static_cast<int>(msg.descriptor->size()));
-    }
-
-    // Write header
-    if (has_ipc) {
-      WireFormatLite::WriteTag(pb::FlightData::kDataHeaderFieldNumber,
-                               WireFormatLite::WIRETYPE_LENGTH_DELIMITED, 
&header_stream);
-      header_stream.WriteVarint32(metadata_size);
-      header_stream.WriteRawMaybeAliased(ipc_msg.metadata->data(),
-                                         
static_cast<int>(ipc_msg.metadata->size()));
-    }
-
-    // Write app metadata
-    if (app_metadata_size > 0) {
-      WireFormatLite::WriteTag(pb::FlightData::kAppMetadataFieldNumber,
-                               WireFormatLite::WIRETYPE_LENGTH_DELIMITED, 
&header_stream);
-      header_stream.WriteVarint32(app_metadata_size);
-      header_stream.WriteRawMaybeAliased(msg.app_metadata->data(),
-                                         
static_cast<int>(msg.app_metadata->size()));
-    }
-
-    if (has_body) {
-      // Write body tag
-      WireFormatLite::WriteTag(pb::FlightData::kDataBodyFieldNumber,
-                               WireFormatLite::WIRETYPE_LENGTH_DELIMITED, 
&header_stream);
-      header_stream.WriteVarint32(static_cast<uint32_t>(body_size));
-
-      // Enqueue body buffers for writing, without copying
-      for (const auto& buffer : ipc_msg.body_buffers) {
-        // Buffer may be null when the row length is zero, or when all
-        // entries are invalid.
-        if (!buffer || buffer->size() == 0) continue;
-
-        ::grpc::Slice slice;
-        auto status = SliceFromBuffer(buffer).Value(&slice);
-        if (ARROW_PREDICT_FALSE(!status.ok())) {
-          // This will likely lead to abort as gRPC cannot recover from an 
error here
-          return ToGrpcStatus(status);
-        }
-        slices.push_back(std::move(slice));
-
-        // Write padding if not multiple of 8
-        const auto remainder = static_cast<int>(
-            bit_util::RoundUpToMultipleOf8(buffer->size()) - buffer->size());
-        if (remainder) {
-          slices.emplace_back(kPaddingBytes, remainder);
-        }
-      }
+  slices.reserve(buffers_result->size());
+  for (const auto& buffer : *buffers_result) {
+    ::grpc::Slice slice;
+    auto status = SliceFromBuffer(buffer).Value(&slice);
+    if (ARROW_PREDICT_FALSE(!status.ok())) {
+      // This will likely lead to abort as gRPC cannot recover from an error 
here
+      return ToGrpcStatus(status);
     }
-
-    DCHECK_EQ(static_cast<int>(header_size), header_stream.ByteCount());
+    slices.push_back(std::move(slice));
   }
 
-  // Hand off the slices to the returned ByteBuffer
-  *out = ::grpc::ByteBuffer(slices.data(), slices.size());
+  *out = ByteBuffer(slices.data(), slices.size());
   *own_buffer = true;
   return ::grpc::Status::OK;
 }
@@ -324,84 +184,16 @@ const uint8_t kPaddingBytes[8] = {0, 0, 0, 0, 0, 0, 0, 0};
     return {::grpc::StatusCode::INTERNAL, "No payload"};
   }
 
-  // Reset fields in case the caller reuses a single allocation
-  out->descriptor = nullptr;
-  out->app_metadata = nullptr;
-  out->metadata = nullptr;
-  out->body = nullptr;
-
   std::shared_ptr<arrow::Buffer> wrapped_buffer;
   GRPC_RETURN_NOT_OK(GrpcBuffer::Wrap(buffer, &wrapped_buffer));
-
-  auto buffer_length = static_cast<int>(wrapped_buffer->size());
-  CodedInputStream pb_stream(wrapped_buffer->data(), buffer_length);
-
-  pb_stream.SetTotalBytesLimit(buffer_length);
-
-  // This is the bytes remaining when using CodedInputStream like this
-  while (pb_stream.BytesUntilTotalBytesLimit()) {
-    const uint32_t tag = pb_stream.ReadTag();
-    const int field_number = WireFormatLite::GetTagFieldNumber(tag);
-    switch (field_number) {
-      case pb::FlightData::kFlightDescriptorFieldNumber: {
-        pb::FlightDescriptor pb_descriptor;
-        uint32_t length;
-        if (!pb_stream.ReadVarint32(&length)) {
-          return {::grpc::StatusCode::INTERNAL,
-                  "Unable to parse length of FlightDescriptor"};
-        }
-        // Can't use ParseFromCodedStream as this reads the entire
-        // rest of the stream into the descriptor command field.
-        std::string buffer;
-        if (!pb_stream.ReadString(&buffer, length)) {
-          return {::grpc::StatusCode::INTERNAL,
-                  "Unable to read FlightDescriptor from protobuf"};
-        }
-        if (!pb_descriptor.ParseFromString(buffer)) {
-          return {::grpc::StatusCode::INTERNAL, "Unable to parse 
FlightDescriptor"};
-        }
-        arrow::flight::FlightDescriptor descriptor;
-        GRPC_RETURN_NOT_OK(
-            arrow::flight::internal::FromProto(pb_descriptor, &descriptor));
-        out->descriptor = 
std::make_unique<arrow::flight::FlightDescriptor>(descriptor);
-      } break;
-      case pb::FlightData::kDataHeaderFieldNumber: {
-        if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &out->metadata)) {
-          return {::grpc::StatusCode::INTERNAL, "Unable to read FlightData 
metadata"};
-        }
-      } break;
-      case pb::FlightData::kAppMetadataFieldNumber: {
-        if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, 
&out->app_metadata)) {
-          return {::grpc::StatusCode::INTERNAL,
-                  "Unable to read FlightData application metadata"};
-        }
-      } break;
-      case pb::FlightData::kDataBodyFieldNumber: {
-        if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &out->body)) {
-          return {::grpc::StatusCode::INTERNAL, "Unable to read FlightData 
body"};
-        }
-      } break;
-      default: {
-        // Unknown field. We should skip it for compatibility.
-        if (!WireFormatLite::SkipField(&pb_stream, tag)) {
-          return {::grpc::StatusCode::INTERNAL,
-                  "Could not skip unknown field tag in FlightData"};
-        }
-        break;
-      }
-    }
-  }
+  // Release gRPC memory now that Arrow Buffer holds its own reference.
   buffer->Clear();
 
-  // TODO(wesm): Where and when should we verify that the FlightData is not
-  // malformed?
-
-  // Set the default value for an unspecified FlightData body. The other
-  // fields can be null if they're unspecified.
-  if (out->body == nullptr) {
-    out->body = std::make_shared<Buffer>(nullptr, 0);
+  auto result = arrow::flight::internal::DeserializeFlightData(wrapped_buffer);
+  if (!result.ok()) {
+    return ToGrpcStatus(result.status());
   }
-
+  *out = result.MoveValueUnsafe();
   return ::grpc::Status::OK;
 }
 
diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc
index 8166513d4e..495425c4ae 100644
--- a/cpp/src/arrow/flight/types.cc
+++ b/cpp/src/arrow/flight/types.cc
@@ -886,6 +886,10 @@ Status FlightPayload::Validate() const {
   return Status::OK();
 }
 
+arrow::Result<BufferVector> FlightPayload::SerializeToBuffers() const {
+  return internal::SerializePayloadToBuffers(*this);
+}
+
 std::string ActionType::ToString() const {
   return arrow::internal::JoinToString("<ActionType type='", type, "' 
description='",
                                        description, "'>");
diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h
index d498ac67f7..fdd1881a63 100644
--- a/cpp/src/arrow/flight/types.h
+++ b/cpp/src/arrow/flight/types.h
@@ -904,6 +904,9 @@ struct ARROW_FLIGHT_EXPORT FlightPayload {
 
   /// \brief Check that the payload can be written to the wire.
   Status Validate() const;
+
+  /// \brief Serialize this payload to a vector of buffers.
+  arrow::Result<BufferVector> SerializeToBuffers() const;
 };
 
 // A wrapper around arrow.flight.protocol.PutResult is not defined

Reply via email to