pitrou commented on a change in pull request #12465:
URL: https://github.com/apache/arrow/pull/12465#discussion_r823816775
##########
File path: cpp/src/arrow/flight/serialization_internal.h
##########
@@ -15,69 +15,61 @@
// specific language governing permissions and limitations
// under the License.
-// (De)serialization utilities that hook into gRPC, efficiently
-// handling Arrow-encoded data in a gRPC call.
+// Generic Flight I/O utilities.
#pragma once
-#include <memory>
-
-#include "arrow/flight/internal.h"
+#include "arrow/flight/protocol_internal.h" // IWYU pragma: keep
+#include "arrow/flight/transport.h"
#include "arrow/flight/types.h"
-#include "arrow/ipc/message.h"
-#include "arrow/result.h"
+#include "arrow/util/macros.h"
namespace arrow {
-class Buffer;
+class Schema;
+class Status;
+
+namespace ipc {
+class Message;
+} // namespace ipc
namespace flight {
+namespace pb = arrow::flight::protocol;
namespace internal {
-/// Internal, not user-visible type used for memory-efficient reads from gRPC
-/// stream
-struct FlightData {
- /// Used only for puts, may be null
- std::unique_ptr<FlightDescriptor> descriptor;
-
- /// Non-length-prefixed Message header as described in format/Message.fbs
- std::shared_ptr<Buffer> metadata;
-
- /// Application-defined metadata
- std::shared_ptr<Buffer> app_metadata;
-
- /// Message body
- std::shared_ptr<Buffer> body;
-
- /// Open IPC message from the metadata and body
- ::arrow::Result<std::unique_ptr<ipc::Message>> OpenMessage();
-};
-
-/// Write Flight message on gRPC stream with zero-copy optimizations.
-// Returns Invalid if the payload is ill-formed
-// Returns IOError if gRPC did not write the message (note this is not
-// necessarily an error - the client may simply have gone away)
-Status WritePayload(const FlightPayload& payload,
- grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>*
writer);
-Status WritePayload(const FlightPayload& payload,
- grpc::ClientReaderWriter<pb::FlightData, pb::FlightData>*
writer);
-Status WritePayload(const FlightPayload& payload,
- grpc::ServerReaderWriter<pb::FlightData, pb::FlightData>*
writer);
-Status WritePayload(const FlightPayload& payload,
- grpc::ServerWriter<pb::FlightData>* writer);
-
-/// Read Flight message from gRPC stream with zero-copy optimizations.
-/// True is returned on success, false if stream ended.
-bool ReadPayload(grpc::ClientReader<pb::FlightData>* reader, FlightData* data);
-bool ReadPayload(grpc::ClientReaderWriter<pb::FlightData, pb::FlightData>*
reader,
- FlightData* data);
-bool ReadPayload(grpc::ServerReaderWriter<pb::PutResult, pb::FlightData>*
reader,
- FlightData* data);
-bool ReadPayload(grpc::ServerReaderWriter<pb::FlightData, pb::FlightData>*
reader,
- FlightData* data);
-// Overload to make genericity easier in DoPutPayloadWriter
-bool ReadPayload(grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>*
reader,
- pb::PutResult* data);
+/// \brief The header used for transmitting authentication/authorization data.
+static constexpr char kAuthHeader[] = "authorization";
+
+ARROW_FLIGHT_EXPORT
+Status SchemaToString(const Schema& schema, std::string* out);
+
+// These functions depend on protobuf types which are not exported in the
Flight DLL.
+
+Status FromProto(const pb::ActionType& pb_type, ActionType* type);
+Status FromProto(const pb::Action& pb_action, Action* action);
+Status FromProto(const pb::Result& pb_result, Result* result);
+Status FromProto(const pb::Criteria& pb_criteria, Criteria* criteria);
+Status FromProto(const pb::Location& pb_location, Location* location);
+Status FromProto(const pb::Ticket& pb_ticket, Ticket* ticket);
+Status FromProto(const pb::FlightData& pb_data, FlightDescriptor* descriptor,
+ std::unique_ptr<ipc::Message>* message);
+Status FromProto(const pb::FlightDescriptor& pb_descr, FlightDescriptor*
descr);
+Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint*
endpoint);
+Status FromProto(const pb::FlightInfo& pb_info, FlightInfo::Data* info);
+Status FromProto(const pb::SchemaResult& pb_result, std::string* result);
+Status FromProto(const pb::BasicAuth& pb_basic_auth, BasicAuth* info);
+
+Status ToProto(const FlightDescriptor& descr, pb::FlightDescriptor* pb_descr);
+Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info);
+Status ToProto(const ActionType& type, pb::ActionType* pb_type);
+Status ToProto(const Action& action, pb::Action* pb_action);
+Status ToProto(const Result& result, pb::Result* pb_result);
+Status ToProto(const Criteria& criteria, pb::Criteria* pb_criteria);
+Status ToProto(const SchemaResult& result, pb::SchemaResult* pb_result);
+void ToProto(const Ticket& ticket, pb::Ticket* pb_ticket);
Review comment:
Why does this one specifically return `void`?
##########
File path: cpp/src/arrow/flight/transport/grpc/util_internal.cc
##########
@@ -0,0 +1,283 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/transport/grpc/util_internal.h"
+
+#include <cstdlib>
+#include <map>
+#include <memory>
+#include <sstream>
+#include <string>
+
+#ifdef GRPCPP_PP_INCLUDE
+#include <grpcpp/grpcpp.h>
+#else
+#include <grpc++/grpc++.h>
+#endif
+
+#include "arrow/flight/types.h"
+#include "arrow/status.h"
+
+namespace arrow {
+namespace flight {
+namespace transport {
+namespace grpc {
+
+const char* kGrpcAuthHeader = "auth-token-bin";
+const char* kGrpcStatusCodeHeader = "x-arrow-status";
+const char* kGrpcStatusMessageHeader = "x-arrow-status-message-bin";
+const char* kGrpcStatusDetailHeader = "x-arrow-status-detail-bin";
+const char* kBinaryErrorDetailsKey = "grpc-status-details-bin";
+
+static Status StatusCodeFromString(const ::grpc::string_ref& code_ref,
StatusCode* code) {
+ // Bounce through std::string to get a proper null-terminated C string
+ const auto code_int = std::atoi(std::string(code_ref.data(),
code_ref.size()).c_str());
+ switch (code_int) {
+ case static_cast<int>(StatusCode::OutOfMemory):
+ case static_cast<int>(StatusCode::KeyError):
+ case static_cast<int>(StatusCode::TypeError):
+ case static_cast<int>(StatusCode::Invalid):
+ case static_cast<int>(StatusCode::IOError):
+ case static_cast<int>(StatusCode::CapacityError):
+ case static_cast<int>(StatusCode::IndexError):
+ case static_cast<int>(StatusCode::UnknownError):
+ case static_cast<int>(StatusCode::NotImplemented):
+ case static_cast<int>(StatusCode::SerializationError):
+ case static_cast<int>(StatusCode::RError):
+ case static_cast<int>(StatusCode::CodeGenError):
+ case static_cast<int>(StatusCode::ExpressionValidationError):
+ case static_cast<int>(StatusCode::ExecutionError):
+ case static_cast<int>(StatusCode::AlreadyExists): {
+ *code = static_cast<StatusCode>(code_int);
+ return Status::OK();
+ }
+ default:
+ // Code is invalid
+ return Status::UnknownError("Unknown Arrow status code", code_ref);
+ }
+}
+
+/// Try to extract a status from gRPC trailers.
+/// Return Status::OK if found, an error otherwise.
+static Status FromGrpcContext(const ::grpc::ClientContext& ctx, Status* status,
+ std::shared_ptr<FlightStatusDetail>
flightStatusDetail) {
Review comment:
`flight_status_detail` perhaps
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]