This is an automated email from the ASF dual-hosted git repository.
raulcd pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new b3120c1dc7 GH-49548: [C++][FlightRPC] Decouple Flight
Serialize/Deserialize from gRPC transport (#49549)
b3120c1dc7 is described below
commit b3120c1dc7c53af99b022039d117e3d8bf174103
Author: Raúl Cumplido <[email protected]>
AuthorDate: Tue Mar 31 16:38:11 2026 +0200
GH-49548: [C++][FlightRPC] Decouple Flight Serialize/Deserialize from gRPC
transport (#49549)
### Rationale for this change
Currently the Serialize/Deserialize APIs are gRPC-dependent. This means
that any code that needs to encode or decode Flight data must depend on gRPC
C++ internals. After some discussion while trying to build a PoC using gRPC's
generic API with gRPC's BidiReactor, we agreed that these primitives should
be made gRPC-agnostic.
### What changes are included in this PR?
- Move the serialization/deserialization logic from
`cpp/src/arrow/flight/transport/grpc/serialization_internal.cc` to
`cpp/src/arrow/flight/serialization_internal.{h,cc}`
- Add a new gRPC-agnostic function `arrow::Result<arrow::BufferVector>
SerializePayloadToBuffers(const FlightPayload& msg)`.
- Add a new gRPC-agnostic function `arrow::Result<arrow::flight::internal::FlightData>
DeserializeFlightData(const std::shared_ptr<arrow::Buffer>& buffer)`.
- Keep the existing gRPC serialize/deserialize functions as thin
wrappers over the new serialization functions that only handle the
`grpc::ByteBuffer` and `grpc::Slice` details.
- Add the utility method `arrow::Result<BufferVector> SerializeToBuffers() const;`
to the `FlightPayload` struct.
- Add serialize/deserialize roundtrip tests.
### Are these changes tested?
Yes, by both existing and new tests.
### Are there any user-facing changes?
No
* GitHub Issue: #49548
Authored-by: Raúl Cumplido <[email protected]>
Signed-off-by: Raúl Cumplido <[email protected]>
---
cpp/src/arrow/flight/flight_internals_test.cc | 73 ++++++
cpp/src/arrow/flight/serialization_internal.cc | 232 ++++++++++++++++++++
cpp/src/arrow/flight/serialization_internal.h | 9 +
cpp/src/arrow/flight/transport.h | 2 +-
.../transport/grpc/serialization_internal.cc | 244 ++-------------------
cpp/src/arrow/flight/types.cc | 4 +
cpp/src/arrow/flight/types.h | 3 +
7 files changed, 340 insertions(+), 227 deletions(-)
diff --git a/cpp/src/arrow/flight/flight_internals_test.cc
b/cpp/src/arrow/flight/flight_internals_test.cc
index bb14ddd665..a66ac210b8 100644
--- a/cpp/src/arrow/flight/flight_internals_test.cc
+++ b/cpp/src/arrow/flight/flight_internals_test.cc
@@ -23,13 +23,18 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
+#include "arrow/buffer.h"
#include "arrow/flight/client_cookie_middleware.h"
#include "arrow/flight/client_middleware.h"
#include "arrow/flight/cookie_internal.h"
#include "arrow/flight/serialization_internal.h"
+#include "arrow/flight/server.h"
#include "arrow/flight/test_util.h"
+#include "arrow/flight/transport.h"
#include "arrow/flight/transport/grpc/util_internal.h"
#include "arrow/flight/types.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/record_batch.h"
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/string.h"
@@ -730,6 +735,74 @@ TEST(GrpcTransport, FlightDataDeserialize) {
#endif
}
+// ----------------------------------------------------------------------
+// Transport-agnostic serialization roundtrip tests
+
+TEST(FlightSerialization, RoundtripPayloadWithBody) {
+  // Use RecordBatchStream to generate FlightPayloads
+  auto schema = arrow::schema({arrow::field("a", arrow::int32())});
+  auto arr = ArrayFromJSON(arrow::int32(), "[1, 2, 3]");
+  auto batch = RecordBatch::Make(schema, 3, {arr});
+  // ASSERT_OK_AND_ASSIGN reports a failed Make() as a test failure instead of
+  // aborting the whole test binary the way ValueOrDie() would.
+  ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make({batch}));
+  RecordBatchStream stream(std::move(reader));
+
+  // Get a FlightPayload from the stream. The schema payload itself is not
+  // checked; NOTE(review): it is presumably requested first because
+  // RecordBatchStream expects GetSchemaPayload() before Next() -- confirm.
+  ASSERT_OK_AND_ASSIGN(auto schema_payload, stream.GetSchemaPayload());
+  ASSERT_OK_AND_ASSIGN(auto flight_payload, stream.Next());
+
+  // Add app_metadata to the flight payload
+  flight_payload.app_metadata = Buffer::FromString("test-metadata");
+
+  // Serialize FlightPayload to BufferVector
+  ASSERT_OK_AND_ASSIGN(auto buffers, internal::SerializePayloadToBuffers(flight_payload));
+  ASSERT_GT(buffers.size(), 0);
+
+  // Concatenate to a single buffer for deserialization and deserialize.
+  ASSERT_OK_AND_ASSIGN(auto concat, ConcatenateBuffers(buffers));
+  ASSERT_OK_AND_ASSIGN(auto data, internal::DeserializeFlightData(concat));
+
+  // Verify IPC metadata (data_header) is present
+  ASSERT_NE(data.metadata, nullptr);
+  ASSERT_GT(data.metadata->size(), 0);
+
+  // Verify app_metadata
+  ASSERT_NE(data.app_metadata, nullptr);
+  ASSERT_EQ(data.app_metadata->ToString(), "test-metadata");
+
+  // Verify body and message are present
+  ASSERT_NE(data.body, nullptr);
+  ASSERT_GT(data.body->size(), 0);
+  ASSERT_OK_AND_ASSIGN(auto message, data.OpenMessage());
+  ASSERT_NE(message, nullptr);
+  // Also verify the RecordBatch roundtrips correctly
+  ipc::DictionaryMemo dict_memo;
+  ASSERT_OK_AND_ASSIGN(auto result_batch,
+                       ipc::ReadRecordBatch(*message, schema, &dict_memo,
+                                            ipc::IpcReadOptions::Defaults()));
+  ASSERT_TRUE(result_batch->Equals(*batch));
+}
+
+TEST(FlightSerialization, RoundtripMetadataOnly) {
+  // A metadata-only payload (no IPC body, no descriptor)
+  auto app_meta = Buffer::FromString("metadata-only-message");
+
+  FlightPayload payload;
+  payload.app_metadata = std::move(app_meta);
+
+  // Serialize. Without an IPC body only the protobuf header buffer is produced.
+  ASSERT_OK_AND_ASSIGN(auto buffers, internal::SerializePayloadToBuffers(payload));
+  ASSERT_EQ(buffers.size(), 1u);
+  ASSERT_OK_AND_ASSIGN(auto concat, ConcatenateBuffers(buffers));
+
+  // Deserialize
+  ASSERT_OK_AND_ASSIGN(auto data, internal::DeserializeFlightData(concat));
+
+  // Verify: no descriptor, no IPC metadata, just app_metadata
+  ASSERT_EQ(data.descriptor, nullptr);
+  ASSERT_EQ(data.metadata, nullptr);
+  ASSERT_NE(data.app_metadata, nullptr);
+  ASSERT_EQ(data.app_metadata->ToString(), "metadata-only-message");
+
+  // With no data_body field present, DeserializeFlightData substitutes an
+  // empty, non-null body buffer.
+  ASSERT_NE(data.body, nullptr);
+  ASSERT_EQ(data.body->size(), 0);
+}
+
// ----------------------------------------------------------------------
// Transport abstraction tests
diff --git a/cpp/src/arrow/flight/serialization_internal.cc
b/cpp/src/arrow/flight/serialization_internal.cc
index 604375311d..a090075814 100644
--- a/cpp/src/arrow/flight/serialization_internal.cc
+++ b/cpp/src/arrow/flight/serialization_internal.cc
@@ -17,18 +17,24 @@
#include "arrow/flight/serialization_internal.h"
+#include <limits>
#include <memory>
#include <string>
+#include <utility>
#include <google/protobuf/any.pb.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+#include <google/protobuf/wire_format_lite.h>
#include "arrow/buffer.h"
#include "arrow/flight/protocol_internal.h"
#include "arrow/io/memory.h"
+#include "arrow/ipc/message.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/writer.h"
#include "arrow/result.h"
#include "arrow/status.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging_internal.h"
// Lambda helper & CTAD
template <class... Ts>
@@ -612,6 +618,232 @@ Status ToProto(const CloseSessionResult& result,
pb::CloseSessionResult* pb_resu
return Status::OK();
}
+namespace {
+using google::protobuf::internal::WireFormatLite;
+using google::protobuf::io::ArrayOutputStream;
+using google::protobuf::io::CodedInputStream;
+using google::protobuf::io::CodedOutputStream;
+static constexpr int64_t kInt32Max = std::numeric_limits<int32_t>::max();
+// Zero bytes used to pad IPC body buffers up to a multiple of 8 bytes.
+const uint8_t kPaddingBytes[8] = {0, 0, 0, 0, 0, 0, 0, 0};
+
+// Add to *header_size the bytes needed for the data_header field and, when
+// has_body is set, for the data_body tag and length prefix (but not the body
+// bytes, which are emitted as separate buffers). Stores the IPC metadata
+// length in *metadata_size.
+arrow::Status IpcMessageHeaderSize(const arrow::ipc::IpcPayload& ipc_msg, bool has_body,
+                                   size_t* header_size, int32_t* metadata_size) {
+  DCHECK_LE(ipc_msg.metadata->size(), kInt32Max);
+  *metadata_size = static_cast<int32_t>(ipc_msg.metadata->size());
+
+  // 1 byte for metadata tag
+  *header_size += 1 + WireFormatLite::LengthDelimitedSize(*metadata_size);
+
+  // 2 bytes for body tag
+  if (has_body) {
+    // We write the body tag in the header but not the actual body data
+    *header_size += 2 + WireFormatLite::LengthDelimitedSize(ipc_msg.body_length) -
+                    ipc_msg.body_length;
+  }
+
+  return arrow::Status::OK();
+}
+
+// Read a varint-length-prefixed bytes field from `input` and expose it as a
+// zero-copy slice of `source_data`. Returns false if the length prefix is
+// malformed or fewer than `length` bytes remain; `*out` is only assigned
+// once the whole field has been consumed successfully.
+bool ReadBytesZeroCopy(const std::shared_ptr<Buffer>& source_data,
+                       CodedInputStream* input, std::shared_ptr<Buffer>* out) {
+  uint32_t length;
+  if (!input->ReadVarint32(&length)) {
+    return false;
+  }
+  const int position = input->CurrentPosition();
+  // Validate the field before slicing so a truncated message never produces
+  // a view that extends past the end of source_data.
+  if (length > static_cast<uint32_t>(std::numeric_limits<int>::max()) ||
+      !input->Skip(static_cast<int>(length))) {
+    return false;
+  }
+  *out = SliceBuffer(source_data, position, static_cast<int64_t>(length));
+  return true;
+}
+
+}  // namespace
+
+// Serialize `msg` as a FlightData protobuf message split across buffers,
+// without copying the IPC body.
+//
+// The first returned buffer holds the protobuf "header": the
+// flight_descriptor, data_header and app_metadata fields (when present) and,
+// for messages with a body, the data_body tag and length prefix. The IPC body
+// buffers follow as-is, each padded to a multiple of 8 bytes with a view of a
+// static zero-filled array. Concatenating the buffers gives the serialized
+// message that DeserializeFlightData reads back.
+arrow::Result<arrow::BufferVector> SerializePayloadToBuffers(const FlightPayload& msg) {
+  // Size of the IPC body (protobuf: data_body)
+  size_t body_size = 0;
+  // Size of the Protobuf "header" (everything except for the body)
+  size_t header_size = 0;
+  // Size of IPC header metadata (protobuf: data_header)
+  int32_t metadata_size = 0;
+
+  // Account for the descriptor if present
+  int32_t descriptor_size = 0;
+  if (msg.descriptor != nullptr) {
+    DCHECK_LE(msg.descriptor->size(), kInt32Max);
+    descriptor_size = static_cast<int32_t>(msg.descriptor->size());
+    header_size += 1 + WireFormatLite::LengthDelimitedSize(descriptor_size);
+  }
+
+  // App metadata tag if appropriate
+  int32_t app_metadata_size = 0;
+  if (msg.app_metadata && msg.app_metadata->size() > 0) {
+    DCHECK_LE(msg.app_metadata->size(), kInt32Max);
+    app_metadata_size = static_cast<int32_t>(msg.app_metadata->size());
+    header_size += 1 + WireFormatLite::LengthDelimitedSize(app_metadata_size);
+  }
+
+  const arrow::ipc::IpcPayload& ipc_msg = msg.ipc_message;
+  // MessageType::NONE means there is no IPC data in this payload (metadata-only).
+  const bool has_ipc = ipc_msg.type != ipc::MessageType::NONE;
+  const bool has_body = has_ipc && ipc::Message::HasBody(ipc_msg.type);
+
+  if (has_ipc) {
+    DCHECK(has_body || ipc_msg.body_length == 0);
+    ARROW_RETURN_NOT_OK(
+        IpcMessageHeaderSize(ipc_msg, has_body, &header_size, &metadata_size));
+    body_size = static_cast<size_t>(ipc_msg.body_length);
+  }
+
+  // TODO(wesm): messages over 2GB unlikely to be yet supported
+  // Validated in WritePayload since returning error here causes gRPC to fail
+  // an assertion
+  DCHECK_LE(body_size, kInt32Max);
+
+  ARROW_ASSIGN_OR_RAISE(auto header_buf, arrow::AllocateBuffer(header_size));
+
+  // Scope the streams so header_stream is destroyed, which flushes all of its
+  // bytes into header_buf, before the buffer is handed out.
+  {
+    ArrayOutputStream header_writer(header_buf->mutable_data(),
+                                    static_cast<int>(header_size));
+    CodedOutputStream header_stream(&header_writer);
+
+    // Write descriptor
+    if (msg.descriptor != nullptr) {
+      WireFormatLite::WriteTag(pb::FlightData::kFlightDescriptorFieldNumber,
+                               WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream);
+      header_stream.WriteVarint32(descriptor_size);
+      header_stream.WriteRawMaybeAliased(msg.descriptor->data(),
+                                         static_cast<int>(msg.descriptor->size()));
+    }
+
+    // Write IPC metadata (data_header)
+    if (has_ipc) {
+      WireFormatLite::WriteTag(pb::FlightData::kDataHeaderFieldNumber,
+                               WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream);
+      header_stream.WriteVarint32(metadata_size);
+      header_stream.WriteRawMaybeAliased(ipc_msg.metadata->data(),
+                                         static_cast<int>(ipc_msg.metadata->size()));
+    }
+
+    // Write app metadata
+    if (app_metadata_size > 0) {
+      WireFormatLite::WriteTag(pb::FlightData::kAppMetadataFieldNumber,
+                               WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream);
+      header_stream.WriteVarint32(app_metadata_size);
+      header_stream.WriteRawMaybeAliased(msg.app_metadata->data(),
+                                         static_cast<int>(msg.app_metadata->size()));
+    }
+
+    // Write only the data_body tag and length; the body bytes themselves are
+    // appended below as separate buffers.
+    if (has_body) {
+      WireFormatLite::WriteTag(pb::FlightData::kDataBodyFieldNumber,
+                               WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream);
+      header_stream.WriteVarint32(static_cast<uint32_t>(body_size));
+    }
+
+    DCHECK_EQ(static_cast<int>(header_size), header_stream.ByteCount());
+  }
+
+  arrow::BufferVector buffers;
+  // Header first, then at most one data and one padding buffer per body buffer.
+  buffers.reserve(1 + (has_body ? 2 * ipc_msg.body_buffers.size() : 0));
+  buffers.push_back(std::move(header_buf));
+
+  if (has_body) {
+    // Enqueue body buffers for writing, without copying
+    for (const auto& buffer : ipc_msg.body_buffers) {
+      // Buffer may be null when the row length is zero, or when all
+      // entries are invalid.
+      if (!buffer || buffer->size() == 0) continue;
+      buffers.push_back(buffer);
+
+      // Write padding if not multiple of 8. The padding buffer is a
+      // non-owning view of the static kPaddingBytes array.
+      const auto remainder = static_cast<int>(
+          bit_util::RoundUpToMultipleOf8(buffer->size()) - buffer->size());
+      if (remainder) {
+        buffers.push_back(std::make_shared<arrow::Buffer>(kPaddingBytes, remainder));
+      }
+    }
+  }
+
+  return buffers;
+}
+
+// Read internal::FlightData from an arrow::Buffer containing a serialized
+// FlightData protobuf message. The data_header, app_metadata and data_body
+// fields are returned as zero-copy slices of `buffer`; the FlightDescriptor is
+// copied out and parsed. Unknown fields are skipped for compatibility.
+//
+// Returns Invalid if `buffer` is null, too large for CodedInputStream's int
+// length, or contains a field that cannot be read.
+arrow::Result<arrow::flight::internal::FlightData> DeserializeFlightData(
+    const std::shared_ptr<arrow::Buffer>& buffer) {
+  if (!buffer) {
+    return Status::Invalid("No payload");
+  }
+  // CodedInputStream is constructed with an int length; reject buffers whose
+  // size would otherwise be silently truncated by the narrowing cast below.
+  if (buffer->size() > kInt32Max) {
+    return Status::Invalid("FlightData payload too large to deserialize: ",
+                           buffer->size(), " bytes");
+  }
+
+  arrow::flight::internal::FlightData out;
+
+  const auto buffer_length = static_cast<int>(buffer->size());
+  CodedInputStream pb_stream(buffer->data(), buffer_length);
+
+  pb_stream.SetTotalBytesLimit(buffer_length);
+
+  // This is the bytes remaining when using CodedInputStream like this
+  while (pb_stream.BytesUntilTotalBytesLimit()) {
+    const uint32_t tag = pb_stream.ReadTag();
+    const int field_number = WireFormatLite::GetTagFieldNumber(tag);
+    switch (field_number) {
+      case pb::FlightData::kFlightDescriptorFieldNumber: {
+        pb::FlightDescriptor pb_descriptor;
+        uint32_t length;
+        if (!pb_stream.ReadVarint32(&length)) {
+          return Status::Invalid("Unable to parse length of FlightDescriptor");
+        }
+        // Can't use ParseFromCodedStream as this reads the entire
+        // rest of the stream into the descriptor command field.
+        // (Named so it does not shadow the `buffer` parameter used below.)
+        std::string descriptor_bytes;
+        if (!pb_stream.ReadString(&descriptor_bytes, length)) {
+          return Status::Invalid("Unable to read FlightDescriptor from protobuf");
+        }
+        if (!pb_descriptor.ParseFromString(descriptor_bytes)) {
+          return Status::Invalid("Unable to parse FlightDescriptor");
+        }
+        arrow::flight::FlightDescriptor descriptor;
+        ARROW_RETURN_NOT_OK(
+            arrow::flight::internal::FromProto(pb_descriptor, &descriptor));
+        out.descriptor =
+            std::make_unique<arrow::flight::FlightDescriptor>(std::move(descriptor));
+      } break;
+      case pb::FlightData::kDataHeaderFieldNumber: {
+        if (!ReadBytesZeroCopy(buffer, &pb_stream, &out.metadata)) {
+          return Status::Invalid("Unable to read FlightData metadata");
+        }
+      } break;
+      case pb::FlightData::kAppMetadataFieldNumber: {
+        if (!ReadBytesZeroCopy(buffer, &pb_stream, &out.app_metadata)) {
+          return Status::Invalid("Unable to read FlightData application metadata");
+        }
+      } break;
+      case pb::FlightData::kDataBodyFieldNumber: {
+        if (!ReadBytesZeroCopy(buffer, &pb_stream, &out.body)) {
+          return Status::Invalid("Unable to read FlightData body");
+        }
+      } break;
+      default: {
+        // Unknown field. We should skip it for compatibility.
+        if (!WireFormatLite::SkipField(&pb_stream, tag)) {
+          return Status::Invalid("Could not skip unknown field tag in FlightData");
+        }
+        break;
+      }
+    }
+  }
+
+  // TODO(wesm): Where and when should we verify that the FlightData is not
+  // malformed?
+
+  // Set the default value for an unspecified FlightData body. The other
+  // fields can be null if they're unspecified.
+  if (out.body == nullptr) {
+    out.body = std::make_shared<Buffer>(nullptr, 0);
+  }
+
+  return out;
+}
+
} // namespace internal
} // namespace flight
} // namespace arrow
diff --git a/cpp/src/arrow/flight/serialization_internal.h
b/cpp/src/arrow/flight/serialization_internal.h
index 4d07efad81..896ca5b675 100644
--- a/cpp/src/arrow/flight/serialization_internal.h
+++ b/cpp/src/arrow/flight/serialization_internal.h
@@ -182,6 +182,15 @@ ARROW_FLIGHT_EXPORT Status ToProto(const
CloseSessionResult& result,
Status ToPayload(const FlightDescriptor& descr, std::shared_ptr<Buffer>* out);
+/// \brief Serialize a FlightPayload to a vector of buffers.
+ARROW_FLIGHT_EXPORT
+arrow::Result<arrow::BufferVector> SerializePayloadToBuffers(const
FlightPayload& msg);
+
+/// \brief Deserialize FlightData from a contiguous buffer.
+ARROW_FLIGHT_EXPORT
+arrow::Result<internal::FlightData> DeserializeFlightData(
+ const std::shared_ptr<arrow::Buffer>& buffer);
+
// We want to reuse RecordBatchStreamReader's implementation while
// (1) Adapting it to the Flight message format
// (2) Allowing pure-metadata messages before data is sent
diff --git a/cpp/src/arrow/flight/transport.h b/cpp/src/arrow/flight/transport.h
index 4ce5053402..6b96943189 100644
--- a/cpp/src/arrow/flight/transport.h
+++ b/cpp/src/arrow/flight/transport.h
@@ -76,7 +76,7 @@ class FlightStatusDetail;
namespace internal {
/// Internal, not user-visible type used for memory-efficient reads
-struct FlightData {
+struct ARROW_FLIGHT_EXPORT FlightData {
/// Used only for puts, may be null
std::unique_ptr<FlightDescriptor> descriptor;
diff --git a/cpp/src/arrow/flight/transport/grpc/serialization_internal.cc
b/cpp/src/arrow/flight/transport/grpc/serialization_internal.cc
index d4848c5077..ee13a8e202 100644
--- a/cpp/src/arrow/flight/transport/grpc/serialization_internal.cc
+++ b/cpp/src/arrow/flight/transport/grpc/serialization_internal.cc
@@ -17,10 +17,7 @@
#include "arrow/flight/transport/grpc/serialization_internal.h"
-// todo cleanup includes
-
#include <cstdint>
-#include <limits>
#include <memory>
#include <string>
#include <vector>
@@ -32,10 +29,6 @@
# pragma warning(disable : 4267)
#endif
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
-#include <google/protobuf/wire_format_lite.h>
-
#include <grpc/byte_buffer_reader.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/impl/codegen/proto_utils.h>
@@ -49,9 +42,7 @@
#include "arrow/flight/serialization_internal.h"
#include "arrow/flight/transport.h"
#include "arrow/flight/transport/grpc/util_internal.h"
-#include "arrow/ipc/message.h"
#include "arrow/ipc/writer.h"
-#include "arrow/util/bit_util.h"
#include "arrow/util/logging_internal.h"
namespace arrow {
@@ -61,28 +52,10 @@ namespace grpc {
namespace pb = arrow::flight::protocol;
-static constexpr int64_t kInt32Max = std::numeric_limits<int32_t>::max();
-using google::protobuf::internal::WireFormatLite;
-using google::protobuf::io::ArrayOutputStream;
-using google::protobuf::io::CodedInputStream;
-using google::protobuf::io::CodedOutputStream;
-
using ::grpc::ByteBuffer;
namespace {
-bool ReadBytesZeroCopy(const std::shared_ptr<Buffer>& source_data,
- CodedInputStream* input, std::shared_ptr<Buffer>* out) {
- uint32_t length;
- if (!input->ReadVarint32(&length)) {
- return false;
- }
- auto buf =
- SliceBuffer(source_data, input->CurrentPosition(),
static_cast<int64_t>(length));
- *out = buf;
- return input->Skip(static_cast<int>(length));
-}
-
// Internal wrapper for gRPC ByteBuffer so its memory can be exposed to Arrow
// consumers with zero-copy
class GrpcBuffer : public MutableBuffer {
@@ -176,142 +149,29 @@ arrow::Result<::grpc::Slice> SliceFromBuffer(const
std::shared_ptr<Buffer>& buf)
return slice;
}
-const uint8_t kPaddingBytes[8] = {0, 0, 0, 0, 0, 0, 0, 0};
-
-// Update the sizes of our Protobuf fields based on the given IPC payload.
-::grpc::Status IpcMessageHeaderSize(const arrow::ipc::IpcPayload& ipc_msg,
bool has_body,
- size_t* header_size, int32_t*
metadata_size) {
- DCHECK_LE(ipc_msg.metadata->size(), kInt32Max);
- *metadata_size = static_cast<int32_t>(ipc_msg.metadata->size());
-
- // 1 byte for metadata tag
- *header_size += 1 + WireFormatLite::LengthDelimitedSize(*metadata_size);
-
- // 2 bytes for body tag
- if (has_body) {
- // We write the body tag in the header but not the actual body data
- *header_size += 2 +
WireFormatLite::LengthDelimitedSize(ipc_msg.body_length) -
- ipc_msg.body_length;
- }
-
- return ::grpc::Status::OK;
-}
-
} // namespace
::grpc::Status FlightDataSerialize(const FlightPayload& msg, ByteBuffer* out,
bool* own_buffer) {
- // Size of the IPC body (protobuf: data_body)
- size_t body_size = 0;
- // Size of the Protobuf "header" (everything except for the body)
- size_t header_size = 0;
- // Size of IPC header metadata (protobuf: data_header)
- int32_t metadata_size = 0;
-
- // Write the descriptor if present
- int32_t descriptor_size = 0;
- if (msg.descriptor != nullptr) {
- DCHECK_LE(msg.descriptor->size(), kInt32Max);
- descriptor_size = static_cast<int32_t>(msg.descriptor->size());
- header_size += 1 + WireFormatLite::LengthDelimitedSize(descriptor_size);
+ // Retrieve BufferVector from the transport-agnostic serialization.
+ auto buffers_result =
arrow::flight::internal::SerializePayloadToBuffers(msg);
+ if (!buffers_result.ok()) {
+ return ToGrpcStatus(buffers_result.status());
}
- // App metadata tag if appropriate
- int32_t app_metadata_size = 0;
- if (msg.app_metadata && msg.app_metadata->size() > 0) {
- DCHECK_LE(msg.app_metadata->size(), kInt32Max);
- app_metadata_size = static_cast<int32_t>(msg.app_metadata->size());
- header_size += 1 + WireFormatLite::LengthDelimitedSize(app_metadata_size);
- }
-
- const arrow::ipc::IpcPayload& ipc_msg = msg.ipc_message;
- // No data in this payload (metadata-only).
- bool has_ipc = ipc_msg.type != ipc::MessageType::NONE;
- bool has_body = has_ipc ? ipc::Message::HasBody(ipc_msg.type) : false;
-
- if (has_ipc) {
- DCHECK(has_body || ipc_msg.body_length == 0);
- GRPC_RETURN_NOT_GRPC_OK(
- IpcMessageHeaderSize(ipc_msg, has_body, &header_size, &metadata_size));
- body_size = static_cast<size_t>(ipc_msg.body_length);
- }
-
- // TODO(wesm): messages over 2GB unlikely to be yet supported
- // Validated in WritePayload since returning error here causes gRPC to fail
an assertion
- DCHECK_LE(body_size, kInt32Max);
-
- // Allocate and initialize slices
std::vector<::grpc::Slice> slices;
- slices.emplace_back(header_size);
-
- // Force the header_stream to be destructed, which actually flushes
- // the data into the slice.
- {
- ArrayOutputStream header_writer(const_cast<uint8_t*>(slices[0].begin()),
- static_cast<int>(slices[0].size()));
- CodedOutputStream header_stream(&header_writer);
-
- // Write descriptor
- if (msg.descriptor != nullptr) {
- WireFormatLite::WriteTag(pb::FlightData::kFlightDescriptorFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED,
&header_stream);
- header_stream.WriteVarint32(descriptor_size);
- header_stream.WriteRawMaybeAliased(msg.descriptor->data(),
-
static_cast<int>(msg.descriptor->size()));
- }
-
- // Write header
- if (has_ipc) {
- WireFormatLite::WriteTag(pb::FlightData::kDataHeaderFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED,
&header_stream);
- header_stream.WriteVarint32(metadata_size);
- header_stream.WriteRawMaybeAliased(ipc_msg.metadata->data(),
-
static_cast<int>(ipc_msg.metadata->size()));
- }
-
- // Write app metadata
- if (app_metadata_size > 0) {
- WireFormatLite::WriteTag(pb::FlightData::kAppMetadataFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED,
&header_stream);
- header_stream.WriteVarint32(app_metadata_size);
- header_stream.WriteRawMaybeAliased(msg.app_metadata->data(),
-
static_cast<int>(msg.app_metadata->size()));
- }
-
- if (has_body) {
- // Write body tag
- WireFormatLite::WriteTag(pb::FlightData::kDataBodyFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED,
&header_stream);
- header_stream.WriteVarint32(static_cast<uint32_t>(body_size));
-
- // Enqueue body buffers for writing, without copying
- for (const auto& buffer : ipc_msg.body_buffers) {
- // Buffer may be null when the row length is zero, or when all
- // entries are invalid.
- if (!buffer || buffer->size() == 0) continue;
-
- ::grpc::Slice slice;
- auto status = SliceFromBuffer(buffer).Value(&slice);
- if (ARROW_PREDICT_FALSE(!status.ok())) {
- // This will likely lead to abort as gRPC cannot recover from an
error here
- return ToGrpcStatus(status);
- }
- slices.push_back(std::move(slice));
-
- // Write padding if not multiple of 8
- const auto remainder = static_cast<int>(
- bit_util::RoundUpToMultipleOf8(buffer->size()) - buffer->size());
- if (remainder) {
- slices.emplace_back(kPaddingBytes, remainder);
- }
- }
+ slices.reserve(buffers_result->size());
+ for (const auto& buffer : *buffers_result) {
+ ::grpc::Slice slice;
+ auto status = SliceFromBuffer(buffer).Value(&slice);
+ if (ARROW_PREDICT_FALSE(!status.ok())) {
+ // This will likely lead to abort as gRPC cannot recover from an error
here
+ return ToGrpcStatus(status);
}
-
- DCHECK_EQ(static_cast<int>(header_size), header_stream.ByteCount());
+ slices.push_back(std::move(slice));
}
- // Hand off the slices to the returned ByteBuffer
- *out = ::grpc::ByteBuffer(slices.data(), slices.size());
+ *out = ByteBuffer(slices.data(), slices.size());
*own_buffer = true;
return ::grpc::Status::OK;
}
@@ -324,84 +184,16 @@ const uint8_t kPaddingBytes[8] = {0, 0, 0, 0, 0, 0, 0, 0};
return {::grpc::StatusCode::INTERNAL, "No payload"};
}
- // Reset fields in case the caller reuses a single allocation
- out->descriptor = nullptr;
- out->app_metadata = nullptr;
- out->metadata = nullptr;
- out->body = nullptr;
-
std::shared_ptr<arrow::Buffer> wrapped_buffer;
GRPC_RETURN_NOT_OK(GrpcBuffer::Wrap(buffer, &wrapped_buffer));
-
- auto buffer_length = static_cast<int>(wrapped_buffer->size());
- CodedInputStream pb_stream(wrapped_buffer->data(), buffer_length);
-
- pb_stream.SetTotalBytesLimit(buffer_length);
-
- // This is the bytes remaining when using CodedInputStream like this
- while (pb_stream.BytesUntilTotalBytesLimit()) {
- const uint32_t tag = pb_stream.ReadTag();
- const int field_number = WireFormatLite::GetTagFieldNumber(tag);
- switch (field_number) {
- case pb::FlightData::kFlightDescriptorFieldNumber: {
- pb::FlightDescriptor pb_descriptor;
- uint32_t length;
- if (!pb_stream.ReadVarint32(&length)) {
- return {::grpc::StatusCode::INTERNAL,
- "Unable to parse length of FlightDescriptor"};
- }
- // Can't use ParseFromCodedStream as this reads the entire
- // rest of the stream into the descriptor command field.
- std::string buffer;
- if (!pb_stream.ReadString(&buffer, length)) {
- return {::grpc::StatusCode::INTERNAL,
- "Unable to read FlightDescriptor from protobuf"};
- }
- if (!pb_descriptor.ParseFromString(buffer)) {
- return {::grpc::StatusCode::INTERNAL, "Unable to parse
FlightDescriptor"};
- }
- arrow::flight::FlightDescriptor descriptor;
- GRPC_RETURN_NOT_OK(
- arrow::flight::internal::FromProto(pb_descriptor, &descriptor));
- out->descriptor =
std::make_unique<arrow::flight::FlightDescriptor>(descriptor);
- } break;
- case pb::FlightData::kDataHeaderFieldNumber: {
- if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &out->metadata)) {
- return {::grpc::StatusCode::INTERNAL, "Unable to read FlightData
metadata"};
- }
- } break;
- case pb::FlightData::kAppMetadataFieldNumber: {
- if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream,
&out->app_metadata)) {
- return {::grpc::StatusCode::INTERNAL,
- "Unable to read FlightData application metadata"};
- }
- } break;
- case pb::FlightData::kDataBodyFieldNumber: {
- if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &out->body)) {
- return {::grpc::StatusCode::INTERNAL, "Unable to read FlightData
body"};
- }
- } break;
- default: {
- // Unknown field. We should skip it for compatibility.
- if (!WireFormatLite::SkipField(&pb_stream, tag)) {
- return {::grpc::StatusCode::INTERNAL,
- "Could not skip unknown field tag in FlightData"};
- }
- break;
- }
- }
- }
+ // Release gRPC memory now that Arrow Buffer holds its own reference.
buffer->Clear();
- // TODO(wesm): Where and when should we verify that the FlightData is not
- // malformed?
-
- // Set the default value for an unspecified FlightData body. The other
- // fields can be null if they're unspecified.
- if (out->body == nullptr) {
- out->body = std::make_shared<Buffer>(nullptr, 0);
+ auto result = arrow::flight::internal::DeserializeFlightData(wrapped_buffer);
+ if (!result.ok()) {
+ return ToGrpcStatus(result.status());
}
-
+ *out = result.MoveValueUnsafe();
return ::grpc::Status::OK;
}
diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc
index 8166513d4e..495425c4ae 100644
--- a/cpp/src/arrow/flight/types.cc
+++ b/cpp/src/arrow/flight/types.cc
@@ -886,6 +886,10 @@ Status FlightPayload::Validate() const {
return Status::OK();
}
+arrow::Result<BufferVector> FlightPayload::SerializeToBuffers() const {
+ return internal::SerializePayloadToBuffers(*this);
+}
+
std::string ActionType::ToString() const {
return arrow::internal::JoinToString("<ActionType type='", type, "'
description='",
description, "'>");
diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h
index d498ac67f7..fdd1881a63 100644
--- a/cpp/src/arrow/flight/types.h
+++ b/cpp/src/arrow/flight/types.h
@@ -904,6 +904,9 @@ struct ARROW_FLIGHT_EXPORT FlightPayload {
/// \brief Check that the payload can be written to the wire.
Status Validate() const;
+
+ /// \brief Serialize this payload to a vector of buffers.
+ arrow::Result<BufferVector> SerializeToBuffers() const;
};
// A wrapper around arrow.flight.protocol.PutResult is not defined