This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new b73ddc3336 GH-34852: [C++][Go][Java][FlightRPC] Add support for
ordered data (#35178)
b73ddc3336 is described below
commit b73ddc3336f9834ae61c14a285e8fd243e979222
Author: Sutou Kouhei <[email protected]>
AuthorDate: Tue May 9 10:03:03 2023 +0900
GH-34852: [C++][Go][Java][FlightRPC] Add support for ordered data (#35178)
### Rationale for this change
No ordering is unnecessarily limiting. Systems can and do implement
distributed sorts, but they can’t reflect this in Flight RPC.
### What changes are included in this PR?
These changes add `FlightInfo.ordered`.
### Are these changes tested?
Yes.
### Are there any user-facing changes?
Yes.
**This PR includes breaking changes to public APIs.**
* Closes: #34852
* Closes: #35085
Lead-authored-by: Sutou Kouhei <[email protected]>
Co-authored-by: David Li <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
---
cpp/src/arrow/flight/flight_internals_test.cc | 21 +-
.../integration_tests/flight_integration_test.cc | 2 +
.../flight/integration_tests/test_integration.cc | 156 ++++++++++++-
cpp/src/arrow/flight/perf_server.cc | 2 +-
cpp/src/arrow/flight/serialization_internal.cc | 2 +
cpp/src/arrow/flight/sql/example/acero_server.cc | 7 +-
cpp/src/arrow/flight/sql/example/sqlite_server.cc | 11 +-
cpp/src/arrow/flight/sql/server.cc | 5 +-
cpp/src/arrow/flight/test_util.cc | 12 +-
cpp/src/arrow/flight/test_util.h | 2 +-
cpp/src/arrow/flight/types.cc | 8 +-
cpp/src/arrow/flight/types.h | 7 +-
dev/archery/archery/integration/runner.py | 5 +
docs/source/format/Flight.rst | 24 +-
format/Flight.proto | 22 +-
go/arrow/flight/internal/flight/Flight.pb.go | 248 ++++++++++-----------
go/arrow/flight/internal/flight/Flight_grpc.pb.go | 46 +---
go/arrow/flight/server.go | 7 +-
go/arrow/internal/flight_integration/scenario.go | 164 ++++++++++++++
.../java/org/apache/arrow/flight/FlightInfo.java | 32 ++-
.../apache/arrow/flight/TestBasicOperation.java | 15 ++
.../flight/integration/tests/OrderedScenario.java | 162 ++++++++++++++
.../arrow/flight/integration/tests/Scenarios.java | 1 +
.../flight/integration/tests/IntegrationTest.java | 5 +
24 files changed, 751 insertions(+), 215 deletions(-)
diff --git a/cpp/src/arrow/flight/flight_internals_test.cc
b/cpp/src/arrow/flight/flight_internals_test.cc
index 3fef31b0ea..187f1207bf 100644
--- a/cpp/src/arrow/flight/flight_internals_test.cc
+++ b/cpp/src/arrow/flight/flight_internals_test.cc
@@ -213,26 +213,27 @@ TEST(FlightTypes, FlightInfo) {
auto endpoint1 = FlightEndpoint{Ticket{"foo"}, {}};
auto endpoint2 = FlightEndpoint{Ticket{"foo"}, {location}};
std::vector<FlightInfo> values = {
- MakeFlightInfo(schema1, desc1, {}, -1, -1),
- MakeFlightInfo(schema1, desc2, {}, -1, -1),
- MakeFlightInfo(schema2, desc1, {}, -1, -1),
- MakeFlightInfo(schema1, desc1, {endpoint1}, -1, 42),
- MakeFlightInfo(schema1, desc2, {endpoint1, endpoint2}, 64, -1),
+ MakeFlightInfo(schema1, desc1, {}, -1, -1, false),
+ MakeFlightInfo(schema1, desc2, {}, -1, -1, true),
+ MakeFlightInfo(schema2, desc1, {}, -1, -1, false),
+ MakeFlightInfo(schema1, desc1, {endpoint1}, -1, 42, true),
+ MakeFlightInfo(schema1, desc2, {endpoint1, endpoint2}, 64, -1, false),
};
std::vector<std::string> reprs = {
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'>
"
- "endpoints=[] total_records=-1 total_bytes=-1>",
+ "endpoints=[] total_records=-1 total_bytes=-1 ordered=false>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='bar'>
"
- "endpoints=[] total_records=-1 total_bytes=-1>",
+ "endpoints=[] total_records=-1 total_bytes=-1 ordered=true>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'>
"
- "endpoints=[] total_records=-1 total_bytes=-1>",
+ "endpoints=[] total_records=-1 total_bytes=-1 ordered=false>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'>
"
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[]>] "
- "total_records=-1 total_bytes=42>",
+ "total_records=-1 total_bytes=42 ordered=true>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='bar'>
"
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[]>, "
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations="
- "[grpc+tcp://localhost:1234]>] total_records=64 total_bytes=-1>",
+ "[grpc+tcp://localhost:1234]>] total_records=64 total_bytes=-1 "
+ "ordered=false>",
};
ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::FlightInfo>(values, reprs));
diff --git a/cpp/src/arrow/flight/integration_tests/flight_integration_test.cc
b/cpp/src/arrow/flight/integration_tests/flight_integration_test.cc
index e29a281f32..2e5fefb853 100644
--- a/cpp/src/arrow/flight/integration_tests/flight_integration_test.cc
+++ b/cpp/src/arrow/flight/integration_tests/flight_integration_test.cc
@@ -53,6 +53,8 @@ TEST(FlightIntegration, AuthBasicProto) {
ASSERT_OK(RunScenario("auth:basic_prot
TEST(FlightIntegration, Middleware) { ASSERT_OK(RunScenario("middleware")); }
+TEST(FlightIntegration, Ordered) { ASSERT_OK(RunScenario("ordered")); }
+
TEST(FlightIntegration, FlightSql) { ASSERT_OK(RunScenario("flight_sql")); }
TEST(FlightIntegration, FlightSqlExtension) {
diff --git a/cpp/src/arrow/flight/integration_tests/test_integration.cc
b/cpp/src/arrow/flight/integration_tests/test_integration.cc
index 9a300d1bd2..fa92a53cd3 100644
--- a/cpp/src/arrow/flight/integration_tests/test_integration.cc
+++ b/cpp/src/arrow/flight/integration_tests/test_integration.cc
@@ -27,6 +27,7 @@
#include "arrow/array/array_binary.h"
#include "arrow/array/array_nested.h"
#include "arrow/array/array_primitive.h"
+#include "arrow/array/builder_primitive.h"
#include "arrow/flight/client_middleware.h"
#include "arrow/flight/server_middleware.h"
#include "arrow/flight/sql/client.h"
@@ -37,6 +38,8 @@
#include "arrow/flight/types.h"
#include "arrow/ipc/dictionary.h"
#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/table_builder.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/checked_cast.h"
@@ -210,8 +213,8 @@ class MiddlewareServer : public FlightServerBase {
// Return a fake location - the test doesn't read it
ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost",
10010));
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{"foo"},
{location}}};
- ARROW_ASSIGN_OR_RAISE(auto info,
- FlightInfo::Make(*schema, descriptor, endpoints,
-1, -1));
+ ARROW_ASSIGN_OR_RAISE(
+ auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1,
false));
*result = std::make_unique<FlightInfo>(info);
return Status::OK();
}
@@ -271,6 +274,142 @@ class MiddlewareScenario : public Scenario {
std::shared_ptr<TestClientMiddlewareFactory> client_middleware_;
};
+/// \brief The server used for testing FlightInfo.ordered.
+///
+/// If the given command is "ordered", the server sets
+/// FlightInfo.ordered. The client that supports FlightInfo.ordered
+/// must read data from endpoints from front to back. The client that
+/// doesn't support FlightInfo.ordered may read data from endpoints in
+/// random order.
+///
+/// This scenario is passed only when the client supports
+/// FlightInfo.ordered.
+class OrderedServer : public FlightServerBase {
+ Status GetFlightInfo(const ServerCallContext& context,
+ const FlightDescriptor& descriptor,
+ std::unique_ptr<FlightInfo>* result) override {
+ const auto ordered = (descriptor.type ==
FlightDescriptor::DescriptorType::CMD &&
+ descriptor.cmd == "ordered");
+ auto schema = BuildSchema();
+ std::vector<FlightEndpoint> endpoints;
+ if (ordered) {
+ endpoints.push_back(FlightEndpoint{{"1"}, {}});
+ endpoints.push_back(FlightEndpoint{{"2"}, {}});
+ endpoints.push_back(FlightEndpoint{{"3"}, {}});
+ } else {
+ endpoints.push_back(FlightEndpoint{{"1"}, {}});
+ endpoints.push_back(FlightEndpoint{{"3"}, {}});
+ endpoints.push_back(FlightEndpoint{{"2"}, {}});
+ }
+ ARROW_ASSIGN_OR_RAISE(
+ auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1,
ordered));
+ *result = std::make_unique<FlightInfo>(info);
+ return Status::OK();
+ }
+
+ Status DoGet(const ServerCallContext& context, const Ticket& request,
+ std::unique_ptr<FlightDataStream>* stream) override {
+ ARROW_ASSIGN_OR_RAISE(auto builder, RecordBatchBuilder::Make(
+ BuildSchema(),
arrow::default_memory_pool()));
+ auto number_builder = builder->GetFieldAs<Int32Builder>(0);
+ if (request.ticket == "1") {
+ ARROW_RETURN_NOT_OK(number_builder->Append(1));
+ ARROW_RETURN_NOT_OK(number_builder->Append(2));
+ ARROW_RETURN_NOT_OK(number_builder->Append(3));
+ } else if (request.ticket == "2") {
+ ARROW_RETURN_NOT_OK(number_builder->Append(10));
+ ARROW_RETURN_NOT_OK(number_builder->Append(20));
+ ARROW_RETURN_NOT_OK(number_builder->Append(30));
+ } else if (request.ticket == "3") {
+ ARROW_RETURN_NOT_OK(number_builder->Append(100));
+ ARROW_RETURN_NOT_OK(number_builder->Append(200));
+ ARROW_RETURN_NOT_OK(number_builder->Append(300));
+ } else {
+ return Status::KeyError("Could not find flight: ", request.ticket);
+ }
+ ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush());
+ std::vector<std::shared_ptr<RecordBatch>> record_batches{record_batch};
+ ARROW_ASSIGN_OR_RAISE(auto record_batch_reader,
+ RecordBatchReader::Make(record_batches));
+ *stream = std::make_unique<RecordBatchStream>(record_batch_reader);
+ return Status::OK();
+ }
+
+ private:
+ std::shared_ptr<Schema> BuildSchema() {
+ return arrow::schema({arrow::field("number", arrow::int32(), false)});
+ }
+};
+
+/// \brief The ordered scenario.
+///
+/// This tests that the server and client get expected header values.
+class OrderedScenario : public Scenario {
+ Status MakeServer(std::unique_ptr<FlightServerBase>* server,
+ FlightServerOptions* options) override {
+ server->reset(new OrderedServer());
+ return Status::OK();
+ }
+
+ Status MakeClient(FlightClientOptions* options) override { return
Status::OK(); }
+
+ Status RunClient(std::unique_ptr<FlightClient> client) override {
+ ARROW_ASSIGN_OR_RAISE(auto info,
+
client->GetFlightInfo(FlightDescriptor::Command("ordered")));
+ if (!info->ordered()) {
+ return Status::Invalid("Server must return FlightInfo.ordered = true");
+ }
+ std::vector<std::shared_ptr<arrow::Table>> tables;
+ for (const auto& endpoint : info->endpoints()) {
+ if (!endpoint.locations.empty()) {
+ std::stringstream ss;
+ ss << "[";
+ for (const auto& location : endpoint.locations) {
+ if (ss.str().size() != 1) {
+ ss << ", ";
+ }
+ ss << location.ToString();
+ }
+ ss << "]";
+ return Status::Invalid(
+ "Expected to receive empty locations to use the original service:
",
+ ss.str());
+ }
+ ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(endpoint.ticket));
+ ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
+ tables.push_back(table);
+ }
+ ARROW_ASSIGN_OR_RAISE(auto table, ConcatenateTables(tables));
+
+ // Build expected table
+ auto schema = arrow::schema({arrow::field("number", arrow::int32(),
false)});
+ ARROW_ASSIGN_OR_RAISE(auto builder,
+ RecordBatchBuilder::Make(schema,
arrow::default_memory_pool()));
+ auto number_builder = builder->GetFieldAs<Int32Builder>(0);
+ ARROW_RETURN_NOT_OK(number_builder->Append(1));
+ ARROW_RETURN_NOT_OK(number_builder->Append(2));
+ ARROW_RETURN_NOT_OK(number_builder->Append(3));
+ ARROW_RETURN_NOT_OK(number_builder->Append(10));
+ ARROW_RETURN_NOT_OK(number_builder->Append(20));
+ ARROW_RETURN_NOT_OK(number_builder->Append(30));
+ ARROW_RETURN_NOT_OK(number_builder->Append(100));
+ ARROW_RETURN_NOT_OK(number_builder->Append(200));
+ ARROW_RETURN_NOT_OK(number_builder->Append(300));
+ ARROW_ASSIGN_OR_RAISE(auto expected_record_batch, builder->Flush());
+ std::vector<std::shared_ptr<RecordBatch>> expected_record_batches{
+ expected_record_batch};
+ ARROW_ASSIGN_OR_RAISE(auto expected_table,
+ Table::FromRecordBatches(expected_record_batches));
+
+ // Check read data
+ if (!table->Equals(*expected_table)) {
+ return Status::Invalid("Read data isn't expected\n", "Expected:\n",
+ expected_table->ToString(), "Actual:\n",
table->ToString());
+ }
+ return Status::OK();
+ }
+};
+
/// \brief Schema to be returned for mocking the statement/prepared statement
results.
///
/// Must be the same across all languages.
@@ -382,8 +521,8 @@ class FlightSqlScenarioServer : public
sql::FlightSqlServerBase {
}
ARROW_ASSIGN_OR_RAISE(auto handle,
sql::CreateStatementQueryTicket(ticket));
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{handle}, {}}};
- ARROW_ASSIGN_OR_RAISE(auto result,
- FlightInfo::Make(*schema, descriptor, endpoints, -1,
-1));
+ ARROW_ASSIGN_OR_RAISE(
+ auto result, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1,
false));
return std::make_unique<FlightInfo>(result);
}
@@ -407,8 +546,8 @@ class FlightSqlScenarioServer : public
sql::FlightSqlServerBase {
}
ARROW_ASSIGN_OR_RAISE(auto handle,
sql::CreateStatementQueryTicket(ticket));
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{handle}, {}}};
- ARROW_ASSIGN_OR_RAISE(auto result,
- FlightInfo::Make(*schema, descriptor, endpoints, -1,
-1));
+ ARROW_ASSIGN_OR_RAISE(
+ auto result, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1,
false));
return std::make_unique<FlightInfo>(result);
}
@@ -851,7 +990,7 @@ class FlightSqlScenarioServer : public
sql::FlightSqlServerBase {
const FlightDescriptor& descriptor, const std::shared_ptr<Schema>&
schema) {
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{descriptor.cmd},
{}}};
ARROW_ASSIGN_OR_RAISE(auto result,
- FlightInfo::Make(*schema, descriptor, endpoints, -1,
-1))
+ FlightInfo::Make(*schema, descriptor, endpoints, -1,
-1, false))
return std::make_unique<FlightInfo>(result);
}
@@ -1330,6 +1469,9 @@ Status GetScenario(const std::string& scenario_name,
std::shared_ptr<Scenario>*
} else if (scenario_name == "middleware") {
*out = std::make_shared<MiddlewareScenario>();
return Status::OK();
+ } else if (scenario_name == "ordered") {
+ *out = std::make_shared<OrderedScenario>();
+ return Status::OK();
} else if (scenario_name == "flight_sql") {
*out = std::make_shared<FlightSqlScenario>();
return Status::OK();
diff --git a/cpp/src/arrow/flight/perf_server.cc
b/cpp/src/arrow/flight/perf_server.cc
index db3e8b150e..7e7882955e 100644
--- a/cpp/src/arrow/flight/perf_server.cc
+++ b/cpp/src/arrow/flight/perf_server.cc
@@ -196,7 +196,7 @@ class FlightPerfServer : public FlightServerBase {
perf_request.stream_count() * perf_request.records_per_stream();
*info = std::make_unique<FlightInfo>(
- MakeFlightInfo(*perf_schema_, request, endpoints, total_records, -1));
+ MakeFlightInfo(*perf_schema_, request, endpoints, total_records, -1,
false));
return Status::OK();
}
diff --git a/cpp/src/arrow/flight/serialization_internal.cc
b/cpp/src/arrow/flight/serialization_internal.cc
index fa21a934bd..dae547c893 100644
--- a/cpp/src/arrow/flight/serialization_internal.cc
+++ b/cpp/src/arrow/flight/serialization_internal.cc
@@ -196,6 +196,7 @@ Status FromProto(const pb::FlightInfo& pb_info,
FlightInfo::Data* info) {
info->total_records = pb_info.total_records();
info->total_bytes = pb_info.total_bytes();
+ info->ordered = pb_info.ordered();
return Status::OK();
}
@@ -236,6 +237,7 @@ Status ToProto(const FlightInfo& info, pb::FlightInfo*
pb_info) {
pb_info->set_total_records(info.total_records());
pb_info->set_total_bytes(info.total_bytes());
+ pb_info->set_ordered(info.ordered());
return Status::OK();
}
diff --git a/cpp/src/arrow/flight/sql/example/acero_server.cc
b/cpp/src/arrow/flight/sql/example/acero_server.cc
index 2f1e48b0be..7c1d9f867a 100644
--- a/cpp/src/arrow/flight/sql/example/acero_server.cc
+++ b/cpp/src/arrow/flight/sql/example/acero_server.cc
@@ -167,9 +167,10 @@ class AceroFlightSqlServer : public FlightSqlServerBase {
ARROW_ASSIGN_OR_RAISE(auto ticket, CreateStatementQueryTicket(plan));
std::vector<FlightEndpoint> endpoints{
FlightEndpoint{Ticket{std::move(ticket)}, /*locations=*/{}}};
- ARROW_ASSIGN_OR_RAISE(auto info,
- FlightInfo::Make(schema, descriptor,
std::move(endpoints),
- /*total_records=*/-1,
/*total_bytes=*/-1));
+ ARROW_ASSIGN_OR_RAISE(
+ auto info,
+ FlightInfo::Make(schema, descriptor, std::move(endpoints),
+ /*total_records=*/-1, /*total_bytes=*/-1,
/*ordered=*/false));
return std::make_unique<FlightInfo>(std::move(info));
}
diff --git a/cpp/src/arrow/flight/sql/example/sqlite_server.cc
b/cpp/src/arrow/flight/sql/example/sqlite_server.cc
index 9a80dd56f1..dccbfe3beb 100644
--- a/cpp/src/arrow/flight/sql/example/sqlite_server.cc
+++ b/cpp/src/arrow/flight/sql/example/sqlite_server.cc
@@ -127,7 +127,7 @@ arrow::Result<std::unique_ptr<FlightInfo>>
GetFlightInfoForCommand(
const FlightDescriptor& descriptor, const std::shared_ptr<Schema>& schema)
{
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{descriptor.cmd}, {}}};
ARROW_ASSIGN_OR_RAISE(auto result,
- FlightInfo::Make(*schema, descriptor, endpoints, -1,
-1))
+ FlightInfo::Make(*schema, descriptor, endpoints, -1,
-1, false))
return std::make_unique<FlightInfo>(result);
}
@@ -304,8 +304,11 @@ class SQLiteFlightSqlServer::Impl {
ARROW_ASSIGN_OR_RAISE(auto ticket,
EncodeTransactionQuery(query,
command.transaction_id));
std::vector<FlightEndpoint> endpoints{FlightEndpoint{std::move(ticket),
{}}};
- ARROW_ASSIGN_OR_RAISE(auto result,
- FlightInfo::Make(*schema, descriptor, endpoints, -1,
-1))
+ // TODO: Set true only when "ORDER BY" is used in a main "SELECT"
+ // in the given query.
+ const bool ordered = false;
+ ARROW_ASSIGN_OR_RAISE(
+ auto result, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1,
ordered));
return std::make_unique<FlightInfo>(result);
}
@@ -392,7 +395,7 @@ class SQLiteFlightSqlServer::Impl {
auto result,
FlightInfo::Make(include_schema ?
*SqlSchema::GetTablesSchemaWithIncludedSchema()
: *SqlSchema::GetTablesSchema(),
- descriptor, endpoints, -1, -1))
+ descriptor, endpoints, -1, -1, false))
return std::make_unique<FlightInfo>(std::move(result));
}
diff --git a/cpp/src/arrow/flight/sql/server.cc
b/cpp/src/arrow/flight/sql/server.cc
index 7f6d9b75a8..7621711308 100644
--- a/cpp/src/arrow/flight/sql/server.cc
+++ b/cpp/src/arrow/flight/sql/server.cc
@@ -890,8 +890,9 @@ arrow::Result<std::unique_ptr<FlightInfo>>
FlightSqlServerBase::GetFlightInfoSql
}
std::vector<FlightEndpoint> endpoints{FlightEndpoint{{descriptor.cmd}, {}}};
- ARROW_ASSIGN_OR_RAISE(auto result,
FlightInfo::Make(*SqlSchema::GetSqlInfoSchema(),
- descriptor, endpoints,
-1, -1))
+ ARROW_ASSIGN_OR_RAISE(
+ auto result, FlightInfo::Make(*SqlSchema::GetSqlInfoSchema(),
descriptor, endpoints,
+ -1, -1, false))
return std::make_unique<FlightInfo>(result);
}
diff --git a/cpp/src/arrow/flight/test_util.cc
b/cpp/src/arrow/flight/test_util.cc
index 92496d9eae..81db0ff1aa 100644
--- a/cpp/src/arrow/flight/test_util.cc
+++ b/cpp/src/arrow/flight/test_util.cc
@@ -531,9 +531,9 @@ std::unique_ptr<FlightServerBase> ExampleTestServer() {
FlightInfo MakeFlightInfo(const Schema& schema, const FlightDescriptor&
descriptor,
const std::vector<FlightEndpoint>& endpoints,
- int64_t total_records, int64_t total_bytes) {
+ int64_t total_records, int64_t total_bytes, bool
ordered) {
EXPECT_OK_AND_ASSIGN(auto info, FlightInfo::Make(schema, descriptor,
endpoints,
- total_records,
total_bytes));
+ total_records, total_bytes,
ordered));
return info;
}
@@ -619,10 +619,10 @@ std::vector<FlightInfo> ExampleFlightInfo() {
auto schema4 = ExampleFloatSchema();
return {
- MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000, 100000),
- MakeFlightInfo(*schema2, descr2, {endpoint3}, 1000, 100000),
- MakeFlightInfo(*schema3, descr3, {endpoint4}, -1, -1),
- MakeFlightInfo(*schema4, descr4, {endpoint5}, 1000, 100000),
+ MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000, 100000,
false),
+ MakeFlightInfo(*schema2, descr2, {endpoint3}, 1000, 100000, false),
+ MakeFlightInfo(*schema3, descr3, {endpoint4}, -1, -1, false),
+ MakeFlightInfo(*schema4, descr4, {endpoint5}, 1000, 100000, false),
};
}
diff --git a/cpp/src/arrow/flight/test_util.h b/cpp/src/arrow/flight/test_util.h
index 96d2e2e341..f299d358c5 100644
--- a/cpp/src/arrow/flight/test_util.h
+++ b/cpp/src/arrow/flight/test_util.h
@@ -192,7 +192,7 @@ std::vector<ActionType> ExampleActionTypes();
ARROW_FLIGHT_EXPORT
FlightInfo MakeFlightInfo(const Schema& schema, const FlightDescriptor&
descriptor,
const std::vector<FlightEndpoint>& endpoints,
- int64_t total_records, int64_t total_bytes);
+ int64_t total_records, int64_t total_bytes, bool
ordered);
// ----------------------------------------------------------------------
// A pair of authentication handlers that check for a predefined password
diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc
index b051ec7081..806e616fda 100644
--- a/cpp/src/arrow/flight/types.cc
+++ b/cpp/src/arrow/flight/types.cc
@@ -275,12 +275,14 @@ Status Ticket::Deserialize(const std::string& serialized,
Ticket* out) {
arrow::Result<FlightInfo> FlightInfo::Make(const Schema& schema,
const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>&
endpoints,
- int64_t total_records, int64_t
total_bytes) {
+ int64_t total_records, int64_t
total_bytes,
+ bool ordered) {
FlightInfo::Data data;
data.descriptor = descriptor;
data.endpoints = endpoints;
data.total_records = total_records;
data.total_bytes = total_bytes;
+ data.ordered = ordered;
RETURN_NOT_OK(internal::SchemaToString(schema, &data.schema));
return FlightInfo(data);
}
@@ -355,6 +357,7 @@ std::string FlightInfo::ToString() const {
}
ss << "] total_records=" << data_.total_records;
ss << " total_bytes=" << data_.total_bytes;
+ ss << " ordered=" << (data_.ordered ? "true" : "false");
ss << '>';
return ss.str();
}
@@ -364,7 +367,8 @@ bool FlightInfo::Equals(const FlightInfo& other) const {
data_.descriptor == other.data_.descriptor &&
data_.endpoints == other.data_.endpoints &&
data_.total_records == other.data_.total_records &&
- data_.total_bytes == other.data_.total_bytes;
+ data_.total_bytes == other.data_.total_bytes &&
+ data_.ordered == other.data_.ordered;
}
Location::Location() { uri_ = std::make_shared<arrow::internal::Uri>(); }
diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h
index 9d92f0be95..09edc75a85 100644
--- a/cpp/src/arrow/flight/types.h
+++ b/cpp/src/arrow/flight/types.h
@@ -522,6 +522,7 @@ class ARROW_FLIGHT_EXPORT FlightInfo {
std::vector<FlightEndpoint> endpoints;
int64_t total_records;
int64_t total_bytes;
+ bool ordered;
};
explicit FlightInfo(Data data) : data_(std::move(data)),
reconstructed_schema_(false) {}
@@ -530,7 +531,8 @@ class ARROW_FLIGHT_EXPORT FlightInfo {
static arrow::Result<FlightInfo> Make(const Schema& schema,
const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>&
endpoints,
- int64_t total_records, int64_t
total_bytes);
+ int64_t total_records, int64_t
total_bytes,
+ bool ordered = false);
/// \brief Deserialize the Arrow schema of the dataset. Populate any
/// dictionary encoded fields into a DictionaryMemo for
@@ -560,6 +562,9 @@ class ARROW_FLIGHT_EXPORT FlightInfo {
/// The total number of bytes in the dataset. If unknown, set to -1
int64_t total_bytes() const { return data_.total_bytes; }
+ /// Whether endpoints are in the same order as the data.
+ bool ordered() const { return data_.ordered; }
+
/// \brief Get the wire-format representation of this type.
///
/// Useful when interoperating with non-Flight systems (e.g. REST
diff --git a/dev/archery/archery/integration/runner.py
b/dev/archery/archery/integration/runner.py
index 7d26b7a51c..b733d32916 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -430,6 +430,11 @@ def run_all_tests(with_cpp=True, with_java=True,
with_js=True,
"middleware",
description="Ensure headers are propagated via middleware.",
),
+ Scenario(
+ "ordered",
+ description="Ensure FlightInfo.ordered is supported.",
+ skip={"JS", "C#", "Rust"},
+ ),
Scenario(
"flight_sql",
description="Ensure Flight SQL protocol is working as expected.",
diff --git a/docs/source/format/Flight.rst b/docs/source/format/Flight.rst
index c7cfcea277..c21f13b1f9 100644
--- a/docs/source/format/Flight.rst
+++ b/docs/source/format/Flight.rst
@@ -90,9 +90,21 @@ A client that wishes to download the data would:
An endpoint contains a list of locations (server addresses) where
this data can be retrieved from, and a ``Ticket``, an opaque binary
token that the server will use to identify the data being
- requested. There is no ordering defined on endpoints or the data
- within, so if the dataset is sorted, applications should return
- data in a single endpoint.
+ requested.
+
+ If ``FlightInfo.ordered`` is true, this signals there is some order
+ between data from different endpoints. Clients should produce the
+ same results as if the data returned from each of the endpoints was
+ concatenated, in order, from front to back.
+
+ If ``FlightInfo.ordered`` is false, the client may return data
+ from any of the endpoints in arbitrary order. Data from any
+ specific endpoint must be returned in order, but the data from
+ different endpoints may be interleaved to allow parallel fetches.
+
+ Note that since some clients may ignore ``FlightInfo.ordered``, if
+ ordering is important and client support can not be ensured,
+ servers should return a single endpoint.
The response also contains other metadata, like the schema, and
optionally an estimate of the dataset size.
@@ -117,7 +129,9 @@ A client that wishes to download the data would:
The client must consume all endpoints to retrieve the complete data
set. The client can consume endpoints in any order, or even in
parallel, or distribute the endpoints among multiple machines for
- consumption; this is up to the application to implement.
+ consumption; this is up to the application to implement. The client
+ can also use ``FlightInfo.ordered``. See the previous item for
+ details of ``FlightInfo.ordered``.
Uploading Data
--------------
@@ -216,7 +230,7 @@ Flight is primarily defined in terms of its Protobuf and
gRPC
specification below, but Arrow implementations may also support
alternative transports (see :ref:`status-flight-rpc`). In that case,
implementations should use the following URI schemes for the given
-transport implemenatations:
+transport implementations:
+----------------------------+----------------------------+
| Transport | URI Scheme |
diff --git a/format/Flight.proto b/format/Flight.proto
index 635b1793d2..8d1187976c 100644
--- a/format/Flight.proto
+++ b/format/Flight.proto
@@ -266,14 +266,32 @@ message FlightInfo {
* In other words, an application can use multiple endpoints to
* represent partitioned data.
*
- * There is no ordering defined on endpoints. Hence, if the returned
- * data has an ordering, it should be returned in a single endpoint.
+ * If the returned data has an ordering, an application can use
+ * "FlightInfo.ordered = true" or should return the all data in a
+ * single endpoint. Otherwise, there is no ordering defined on
+ * endpoints or the data within.
+ *
+ * A client can read ordered data by reading data from returned
+ * endpoints, in order, from front to back.
+ *
+ * Note that a client may ignore "FlightInfo.ordered = true". If an
+ * ordering is important for an application, an application must
+ * choose one of them:
+ *
+ * * An application requires that all clients must read data in
+ * returned endpoints order.
+ * * An application must return the all data in a single endpoint.
*/
repeated FlightEndpoint endpoint = 3;
// Set these to -1 if unknown.
int64 total_records = 4;
int64 total_bytes = 5;
+
+ /*
+ * FlightEndpoints are in the same order as the data.
+ */
+ bool ordered = 6;
}
/*
diff --git a/go/arrow/flight/internal/flight/Flight.pb.go
b/go/arrow/flight/internal/flight/Flight.pb.go
index b7be492acd..c0f4ad9e85 100644
--- a/go/arrow/flight/internal/flight/Flight.pb.go
+++ b/go/arrow/flight/internal/flight/Flight.pb.go
@@ -18,7 +18,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
-// protoc v3.12.4
+// protoc v3.21.12
// source: Flight.proto
package flight
@@ -37,19 +37,17 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
-//
// Describes what type of descriptor is defined.
type FlightDescriptor_DescriptorType int32
const (
// Protobuf pattern, not used.
FlightDescriptor_UNKNOWN FlightDescriptor_DescriptorType = 0
- //
// A named path that identifies a dataset. A path is composed of a
string
// or list of strings describing a particular dataset. This is
conceptually
- // similar to a path inside a filesystem.
- FlightDescriptor_PATH FlightDescriptor_DescriptorType = 1
//
+ // similar to a path inside a filesystem.
+ FlightDescriptor_PATH FlightDescriptor_DescriptorType = 1
// An opaque command to generate a dataset.
FlightDescriptor_CMD FlightDescriptor_DescriptorType = 2
)
@@ -95,17 +93,14 @@ func (FlightDescriptor_DescriptorType) EnumDescriptor()
([]byte, []int) {
return file_Flight_proto_rawDescGZIP(), []int{9, 0}
}
-//
// The request that a client provides to a server on handshake.
type HandshakeRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- //
// A defined protocol version
ProtocolVersion uint64
`protobuf:"varint,1,opt,name=protocol_version,json=protocolVersion,proto3"
json:"protocol_version,omitempty"`
- //
// Arbitrary auth/handshake info.
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3"
json:"payload,omitempty"`
}
@@ -161,10 +156,8 @@ type HandshakeResponse struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- //
// A defined protocol version
ProtocolVersion uint64
`protobuf:"varint,1,opt,name=protocol_version,json=protocolVersion,proto3"
json:"protocol_version,omitempty"`
- //
// Arbitrary auth/handshake info.
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3"
json:"payload,omitempty"`
}
@@ -215,7 +208,6 @@ func (x *HandshakeResponse) GetPayload() []byte {
return nil
}
-//
// A message for doing simple auth.
type BasicAuth struct {
state protoimpl.MessageState
@@ -310,7 +302,6 @@ func (*Empty) Descriptor() ([]byte, []int) {
return file_Flight_proto_rawDescGZIP(), []int{3}
}
-//
// Describes an available action, including both the name used for execution
// along with a short description of the purpose of the action.
type ActionType struct {
@@ -368,7 +359,6 @@ func (x *ActionType) GetDescription() string {
return ""
}
-//
// A service specific expression that can be used to return a limited set
// of available Arrow Flight streams.
type Criteria struct {
@@ -418,7 +408,6 @@ func (x *Criteria) GetExpression() []byte {
return nil
}
-//
// An opaque action specific for the service.
type Action struct {
state protoimpl.MessageState
@@ -475,7 +464,6 @@ func (x *Action) GetBody() []byte {
return nil
}
-//
// An opaque result returned after executing an action.
type Result struct {
state protoimpl.MessageState
@@ -524,7 +512,6 @@ func (x *Result) GetBody() []byte {
return nil
}
-//
// Wrap the result of a getSchema call
type SchemaResult struct {
state protoimpl.MessageState
@@ -532,9 +519,10 @@ type SchemaResult struct {
unknownFields protoimpl.UnknownFields
// The schema of the dataset in its IPC form:
- // 4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
- // 4 bytes - the byte length of the payload
- // a flatbuffer Message whose header is the Schema
+ //
+ // 4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
+ // 4 bytes - the byte length of the payload
+ // a flatbuffer Message whose header is the Schema
Schema []byte `protobuf:"bytes,1,opt,name=schema,proto3"
json:"schema,omitempty"`
}
@@ -577,7 +565,6 @@ func (x *SchemaResult) GetSchema() []byte {
return nil
}
-//
// The name or tag for a Flight. May be used as a way to retrieve or generate
// a flight or be used to expose a set of previously defined flights.
type FlightDescriptor struct {
@@ -586,11 +573,9 @@ type FlightDescriptor struct {
unknownFields protoimpl.UnknownFields
Type FlightDescriptor_DescriptorType
`protobuf:"varint,1,opt,name=type,proto3,enum=arrow.flight.protocol.FlightDescriptor_DescriptorType"
json:"type,omitempty"`
- //
// Opaque value used to express a command. Should only be defined when
// type = CMD.
Cmd []byte `protobuf:"bytes,2,opt,name=cmd,proto3" json:"cmd,omitempty"`
- //
// List of strings identifying a particular dataset. Should only be
defined
// when type = PATH.
Path []string `protobuf:"bytes,3,rep,name=path,proto3"
json:"path,omitempty"`
@@ -649,7 +634,6 @@ func (x *FlightDescriptor) GetPath() []string {
return nil
}
-//
// The access coordinates for retrieval of a dataset. With a FlightInfo, a
// consumer is able to determine how to retrieve a dataset.
type FlightInfo struct {
@@ -658,14 +642,13 @@ type FlightInfo struct {
unknownFields protoimpl.UnknownFields
// The schema of the dataset in its IPC form:
- // 4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
- // 4 bytes - the byte length of the payload
- // a flatbuffer Message whose header is the Schema
- Schema []byte `protobuf:"bytes,1,opt,name=schema,proto3"
json:"schema,omitempty"`
//
+ // 4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
+ // 4 bytes - the byte length of the payload
+ // a flatbuffer Message whose header is the Schema
+ Schema []byte `protobuf:"bytes,1,opt,name=schema,proto3"
json:"schema,omitempty"`
// The descriptor associated with this info.
FlightDescriptor *FlightDescriptor
`protobuf:"bytes,2,opt,name=flight_descriptor,json=flightDescriptor,proto3"
json:"flight_descriptor,omitempty"`
- //
// A list of endpoints associated with the flight. To consume the
// whole flight, all endpoints (and hence all Tickets) must be
// consumed. Endpoints can be consumed in any order.
@@ -673,12 +656,27 @@ type FlightInfo struct {
// In other words, an application can use multiple endpoints to
// represent partitioned data.
//
- // There is no ordering defined on endpoints. Hence, if the returned
- // data has an ordering, it should be returned in a single endpoint.
+ // If the returned data has an ordering, an application can use
+ // "FlightInfo.ordered = true" or should return the all data in a
+ // single endpoint. Otherwise, there is no ordering defined on
+ // endpoints or the data within.
+ //
+ // A client can read ordered data by reading data from returned
+ // endpoints in order from front to back.
+ //
+ // Note that a client may ignore "FlightInfo.ordered = true". If an
+ // ordering is important for an application, an application must
+ // choose one of them:
+ //
+ // - An application requires that all clients must read data in
+ // returned endpoints order.
+ // - An application must return the all data in a single endpoint.
Endpoint []*FlightEndpoint `protobuf:"bytes,3,rep,name=endpoint,proto3"
json:"endpoint,omitempty"`
// Set these to -1 if unknown.
TotalRecords int64
`protobuf:"varint,4,opt,name=total_records,json=totalRecords,proto3"
json:"total_records,omitempty"`
TotalBytes int64
`protobuf:"varint,5,opt,name=total_bytes,json=totalBytes,proto3"
json:"total_bytes,omitempty"`
+ // FlightEndpoints are in the same order as the data.
+ Ordered bool `protobuf:"varint,6,opt,name=ordered,proto3"
json:"ordered,omitempty"`
}
func (x *FlightInfo) Reset() {
@@ -748,17 +746,21 @@ func (x *FlightInfo) GetTotalBytes() int64 {
return 0
}
-//
+func (x *FlightInfo) GetOrdered() bool {
+ if x != nil {
+ return x.Ordered
+ }
+ return false
+}
+
// A particular stream or split associated with a flight.
type FlightEndpoint struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- //
// Token used to retrieve this stream.
Ticket *Ticket `protobuf:"bytes,1,opt,name=ticket,proto3"
json:"ticket,omitempty"`
- //
// A list of URIs where this ticket can be redeemed via DoGet().
//
// If the list is empty, the expectation is that the ticket can only
@@ -822,7 +824,6 @@ func (x *FlightEndpoint) GetLocation() []*Location {
return nil
}
-//
// A location where a Flight service will accept retrieval of a particular
// stream given a ticket.
type Location struct {
@@ -872,7 +873,6 @@ func (x *Location) GetUri() string {
return ""
}
-//
// An opaque identifier that the service can use to retrieve a particular
// portion of a stream.
//
@@ -925,24 +925,19 @@ func (x *Ticket) GetTicket() []byte {
return nil
}
-//
// A batch of Arrow data as part of a stream of batches.
type FlightData struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- //
// The descriptor of the data. This is only relevant when a client is
// starting a new DoPut stream.
FlightDescriptor *FlightDescriptor
`protobuf:"bytes,1,opt,name=flight_descriptor,json=flightDescriptor,proto3"
json:"flight_descriptor,omitempty"`
- //
// Header for message data as described in Message.fbs::Message.
DataHeader []byte
`protobuf:"bytes,2,opt,name=data_header,json=dataHeader,proto3"
json:"data_header,omitempty"`
- //
// Application-defined metadata.
AppMetadata []byte
`protobuf:"bytes,3,opt,name=app_metadata,json=appMetadata,proto3"
json:"app_metadata,omitempty"`
- //
// The actual batch of Arrow data. Preferably handled with
minimal-copies
// coming last in the definition to help with sidecar patterns (it is
// expected that some implementations will fetch this field off the wire
@@ -1010,7 +1005,7 @@ func (x *FlightData) GetDataBody() []byte {
return nil
}
-//*
+// *
// The response message associated with the submission of a DoPut.
type PutResult struct {
state protoimpl.MessageState
@@ -1106,7 +1101,7 @@ var file_Flight_proto_rawDesc = []byte{
0x04, 0x70, 0x61, 0x74, 0x68, 0x22, 0x30, 0x0a, 0x0e, 0x44, 0x65, 0x73,
0x63, 0x72, 0x69, 0x70,
0x74, 0x6f, 0x72, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x55,
0x4e, 0x4b, 0x4e, 0x4f,
0x57, 0x4e, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x50, 0x41, 0x54, 0x48,
0x10, 0x01, 0x12, 0x07,
- 0x0a, 0x03, 0x43, 0x4d, 0x44, 0x10, 0x02, 0x22, 0x83, 0x02, 0x0a, 0x0a,
0x46, 0x6c, 0x69, 0x67,
+ 0x0a, 0x03, 0x43, 0x4d, 0x44, 0x10, 0x02, 0x22, 0x9d, 0x02, 0x0a, 0x0a,
0x46, 0x6c, 0x69, 0x67,
0x68, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x63,
0x68, 0x65, 0x6d, 0x61,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x63, 0x68, 0x65,
0x6d, 0x61, 0x12, 0x54,
0x0a, 0x11, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x5f, 0x64, 0x65, 0x73,
0x63, 0x72, 0x69, 0x70,
@@ -1122,94 +1117,95 @@ var file_Flight_proto_rawDesc = []byte{
0x5f, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x18, 0x04, 0x20, 0x01,
0x28, 0x03, 0x52, 0x0c,
0x74, 0x6f, 0x74, 0x61, 0x6c, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73,
0x12, 0x1f, 0x0a, 0x0b,
0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18,
0x05, 0x20, 0x01, 0x28,
- 0x03, 0x52, 0x0a, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x42, 0x79, 0x74, 0x65,
0x73, 0x22, 0x84, 0x01,
- 0x0a, 0x0e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x45, 0x6e, 0x64, 0x70,
0x6f, 0x69, 0x6e, 0x74,
- 0x12, 0x35, 0x0a, 0x06, 0x74, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x01,
0x20, 0x01, 0x28, 0x0b,
- 0x32, 0x1d, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69,
0x67, 0x68, 0x74, 0x2e,
- 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x54, 0x69, 0x63,
0x6b, 0x65, 0x74, 0x52,
- 0x06, 0x74, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x3b, 0x0a, 0x08, 0x6c,
0x6f, 0x63, 0x61, 0x74,
- 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1f, 0x2e,
0x61, 0x72, 0x72, 0x6f,
- 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x63, 0x6f,
- 0x6c, 0x2e, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08,
0x6c, 0x6f, 0x63, 0x61,
- 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x1c, 0x0a, 0x08, 0x4c, 0x6f, 0x63, 0x61,
0x74, 0x69, 0x6f, 0x6e,
- 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x69, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x03, 0x75,
- 0x72, 0x69, 0x22, 0x20, 0x0a, 0x06, 0x54, 0x69, 0x63, 0x6b, 0x65, 0x74,
0x12, 0x16, 0x0a, 0x06,
- 0x74, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c,
0x52, 0x06, 0x74, 0x69,
- 0x63, 0x6b, 0x65, 0x74, 0x22, 0xc4, 0x01, 0x0a, 0x0a, 0x46, 0x6c, 0x69,
0x67, 0x68, 0x74, 0x44,
- 0x61, 0x74, 0x61, 0x12, 0x54, 0x0a, 0x11, 0x66, 0x6c, 0x69, 0x67, 0x68,
0x74, 0x5f, 0x64, 0x65,
- 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01,
0x28, 0x0b, 0x32, 0x27,
- 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68,
0x74, 0x2e, 0x70, 0x72,
- 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68,
0x74, 0x44, 0x65, 0x73,
- 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x52, 0x10, 0x66, 0x6c, 0x69,
0x67, 0x68, 0x74, 0x44,
- 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x12, 0x1f, 0x0a,
0x0b, 0x64, 0x61, 0x74,
- 0x61, 0x5f, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01,
0x28, 0x0c, 0x52, 0x0a,
- 0x64, 0x61, 0x74, 0x61, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x21,
0x0a, 0x0c, 0x61, 0x70,
- 0x70, 0x5f, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03,
0x20, 0x01, 0x28, 0x0c,
- 0x52, 0x0b, 0x61, 0x70, 0x70, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74,
0x61, 0x12, 0x1c, 0x0a,
- 0x09, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x62, 0x6f, 0x64, 0x79, 0x18, 0xe8,
0x07, 0x20, 0x01, 0x28,
- 0x0c, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x42, 0x6f, 0x64, 0x79, 0x22,
0x2e, 0x0a, 0x09, 0x50,
- 0x75, 0x74, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x21, 0x0a, 0x0c,
0x61, 0x70, 0x70, 0x5f,
- 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01,
0x28, 0x0c, 0x52, 0x0b,
- 0x61, 0x70, 0x70, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x32,
0xa7, 0x06, 0x0a, 0x0d,
- 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63,
0x65, 0x12, 0x64, 0x0a,
- 0x09, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x27,
0x2e, 0x61, 0x72, 0x72,
+ 0x03, 0x52, 0x0a, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x42, 0x79, 0x74, 0x65,
0x73, 0x12, 0x18, 0x0a,
+ 0x07, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01,
0x28, 0x08, 0x52, 0x07,
+ 0x6f, 0x72, 0x64, 0x65, 0x72, 0x65, 0x64, 0x22, 0x84, 0x01, 0x0a, 0x0e,
0x46, 0x6c, 0x69, 0x67,
+ 0x68, 0x74, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x35,
0x0a, 0x06, 0x74, 0x69,
+ 0x63, 0x6b, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d,
0x2e, 0x61, 0x72, 0x72,
0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x63,
- 0x6f, 0x6c, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65,
0x52, 0x65, 0x71, 0x75,
- 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e,
0x66, 0x6c, 0x69, 0x67,
- 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e,
0x48, 0x61, 0x6e, 0x64,
- 0x73, 0x68, 0x61, 0x6b, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x22, 0x00, 0x28,
- 0x01, 0x30, 0x01, 0x12, 0x55, 0x0a, 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x46,
0x6c, 0x69, 0x67, 0x68,
- 0x74, 0x73, 0x12, 0x1f, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66,
0x6c, 0x69, 0x67, 0x68,
- 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x43,
0x72, 0x69, 0x74, 0x65,
- 0x72, 0x69, 0x61, 0x1a, 0x21, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e,
0x66, 0x6c, 0x69, 0x67,
- 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e,
0x46, 0x6c, 0x69, 0x67,
- 0x68, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x00, 0x30, 0x01, 0x12, 0x5d,
0x0a, 0x0d, 0x47, 0x65,
- 0x74, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12,
0x27, 0x2e, 0x61, 0x72,
+ 0x6f, 0x6c, 0x2e, 0x54, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x52, 0x06, 0x74,
0x69, 0x63, 0x6b, 0x65,
+ 0x74, 0x12, 0x3b, 0x0a, 0x08, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f,
0x6e, 0x18, 0x02, 0x20,
+ 0x03, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e,
0x66, 0x6c, 0x69, 0x67,
+ 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e,
0x4c, 0x6f, 0x63, 0x61,
+ 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x69,
0x6f, 0x6e, 0x22, 0x1c,
+ 0x0a, 0x08, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x10,
0x0a, 0x03, 0x75, 0x72,
+ 0x69, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x69,
0x22, 0x20, 0x0a, 0x06,
+ 0x54, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x69,
0x63, 0x6b, 0x65, 0x74,
+ 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x74, 0x69, 0x63, 0x6b,
0x65, 0x74, 0x22, 0xc4,
+ 0x01, 0x0a, 0x0a, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x61, 0x74,
0x61, 0x12, 0x54, 0x0a,
+ 0x11, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x5f, 0x64, 0x65, 0x73, 0x63,
0x72, 0x69, 0x70, 0x74,
+ 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x61,
0x72, 0x72, 0x6f, 0x77,
+ 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x63, 0x6f, 0x6c,
+ 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x65, 0x73, 0x63, 0x72,
0x69, 0x70, 0x74, 0x6f,
+ 0x72, 0x52, 0x10, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x65, 0x73,
0x63, 0x72, 0x69, 0x70,
+ 0x74, 0x6f, 0x72, 0x12, 0x1f, 0x0a, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x5f,
0x68, 0x65, 0x61, 0x64,
+ 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x64, 0x61,
0x74, 0x61, 0x48, 0x65,
+ 0x61, 0x64, 0x65, 0x72, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x70, 0x70, 0x5f,
0x6d, 0x65, 0x74, 0x61,
+ 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b,
0x61, 0x70, 0x70, 0x4d,
+ 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1c, 0x0a, 0x09, 0x64,
0x61, 0x74, 0x61, 0x5f,
+ 0x62, 0x6f, 0x64, 0x79, 0x18, 0xe8, 0x07, 0x20, 0x01, 0x28, 0x0c, 0x52,
0x08, 0x64, 0x61, 0x74,
+ 0x61, 0x42, 0x6f, 0x64, 0x79, 0x22, 0x2e, 0x0a, 0x09, 0x50, 0x75, 0x74,
0x52, 0x65, 0x73, 0x75,
+ 0x6c, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x70, 0x70, 0x5f, 0x6d, 0x65,
0x74, 0x61, 0x64, 0x61,
+ 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x61, 0x70,
0x70, 0x4d, 0x65, 0x74,
+ 0x61, 0x64, 0x61, 0x74, 0x61, 0x32, 0xa7, 0x06, 0x0a, 0x0d, 0x46, 0x6c,
0x69, 0x67, 0x68, 0x74,
+ 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x64, 0x0a, 0x09, 0x48,
0x61, 0x6e, 0x64, 0x73,
+ 0x68, 0x61, 0x6b, 0x65, 0x12, 0x27, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77,
0x2e, 0x66, 0x6c, 0x69,
+ 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c,
0x2e, 0x48, 0x61, 0x6e,
+ 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x28, 0x2e,
+ 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74,
0x2e, 0x70, 0x72, 0x6f,
+ 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68,
0x61, 0x6b, 0x65, 0x52,
+ 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30,
0x01, 0x12, 0x55, 0x0a,
+ 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x73,
0x12, 0x1f, 0x2e, 0x61,
+ 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e,
0x70, 0x72, 0x6f, 0x74,
+ 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x43, 0x72, 0x69, 0x74, 0x65, 0x72, 0x69,
0x61, 0x1a, 0x21, 0x2e,
+ 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74,
0x2e, 0x70, 0x72, 0x6f,
+ 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74,
0x49, 0x6e, 0x66, 0x6f,
+ 0x22, 0x00, 0x30, 0x01, 0x12, 0x5d, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x46,
0x6c, 0x69, 0x67, 0x68,
+ 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x27, 0x2e, 0x61, 0x72, 0x72, 0x6f,
0x77, 0x2e, 0x66, 0x6c,
+ 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f,
0x6c, 0x2e, 0x46, 0x6c,
+ 0x69, 0x67, 0x68, 0x74, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74,
0x6f, 0x72, 0x1a, 0x21,
+ 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68,
0x74, 0x2e, 0x70, 0x72,
+ 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68,
0x74, 0x49, 0x6e, 0x66,
+ 0x6f, 0x22, 0x00, 0x12, 0x5b, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x53, 0x63,
0x68, 0x65, 0x6d, 0x61,
+ 0x12, 0x27, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69,
0x67, 0x68, 0x74, 0x2e,
+ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69,
0x67, 0x68, 0x74, 0x44,
+ 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x1a, 0x23, 0x2e,
0x61, 0x72, 0x72, 0x6f,
+ 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x63, 0x6f,
+ 0x6c, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x75,
0x6c, 0x74, 0x22, 0x00,
+ 0x12, 0x4d, 0x0a, 0x05, 0x44, 0x6f, 0x47, 0x65, 0x74, 0x12, 0x1d, 0x2e,
0x61, 0x72, 0x72, 0x6f,
+ 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x63, 0x6f,
+ 0x6c, 0x2e, 0x54, 0x69, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x21, 0x2e, 0x61,
0x72, 0x72, 0x6f, 0x77,
+ 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x63, 0x6f, 0x6c,
+ 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x61, 0x74, 0x61, 0x22,
0x00, 0x30, 0x01, 0x12,
+ 0x52, 0x0a, 0x05, 0x44, 0x6f, 0x50, 0x75, 0x74, 0x12, 0x21, 0x2e, 0x61,
0x72, 0x72, 0x6f, 0x77,
+ 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x63, 0x6f, 0x6c,
+ 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x61, 0x74, 0x61, 0x1a,
0x20, 0x2e, 0x61, 0x72,
0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f,
- 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x65,
0x73, 0x63, 0x72, 0x69,
- 0x70, 0x74, 0x6f, 0x72, 0x1a, 0x21, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77,
0x2e, 0x66, 0x6c, 0x69,
+ 0x63, 0x6f, 0x6c, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, 0x75, 0x6c,
0x74, 0x22, 0x00, 0x28,
+ 0x01, 0x30, 0x01, 0x12, 0x58, 0x0a, 0x0a, 0x44, 0x6f, 0x45, 0x78, 0x63,
0x68, 0x61, 0x6e, 0x67,
+ 0x65, 0x12, 0x21, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c,
0x69, 0x67, 0x68, 0x74,
+ 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c,
0x69, 0x67, 0x68, 0x74,
+ 0x44, 0x61, 0x74, 0x61, 0x1a, 0x21, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77,
0x2e, 0x66, 0x6c, 0x69,
0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c,
0x2e, 0x46, 0x6c, 0x69,
- 0x67, 0x68, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x00, 0x12, 0x5b, 0x0a,
0x09, 0x47, 0x65, 0x74,
- 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, 0x27, 0x2e, 0x61, 0x72, 0x72,
0x6f, 0x77, 0x2e, 0x66,
- 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63,
0x6f, 0x6c, 0x2e, 0x46,
- 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70,
0x74, 0x6f, 0x72, 0x1a,
- 0x23, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67,
0x68, 0x74, 0x2e, 0x70,
- 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x53, 0x63, 0x68, 0x65,
0x6d, 0x61, 0x52, 0x65,
- 0x73, 0x75, 0x6c, 0x74, 0x22, 0x00, 0x12, 0x4d, 0x0a, 0x05, 0x44, 0x6f,
0x47, 0x65, 0x74, 0x12,
- 0x1d, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67,
0x68, 0x74, 0x2e, 0x70,
- 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x54, 0x69, 0x63, 0x6b,
0x65, 0x74, 0x1a, 0x21,
- 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68,
0x74, 0x2e, 0x70, 0x72,
- 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68,
0x74, 0x44, 0x61, 0x74,
- 0x61, 0x22, 0x00, 0x30, 0x01, 0x12, 0x52, 0x0a, 0x05, 0x44, 0x6f, 0x50,
0x75, 0x74, 0x12, 0x21,
- 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68,
0x74, 0x2e, 0x70, 0x72,
- 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68,
0x74, 0x44, 0x61, 0x74,
- 0x61, 0x1a, 0x20, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c,
0x69, 0x67, 0x68, 0x74,
- 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x50, 0x75,
0x74, 0x52, 0x65, 0x73,
- 0x75, 0x6c, 0x74, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x58, 0x0a,
0x0a, 0x44, 0x6f, 0x45,
- 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x21, 0x2e, 0x61, 0x72,
0x72, 0x6f, 0x77, 0x2e,
- 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x63, 0x6f, 0x6c, 0x2e,
- 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x21,
0x2e, 0x61, 0x72, 0x72,
+ 0x67, 0x68, 0x74, 0x44, 0x61, 0x74, 0x61, 0x22, 0x00, 0x28, 0x01, 0x30,
0x01, 0x12, 0x4c, 0x0a,
+ 0x08, 0x44, 0x6f, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1d, 0x2e,
0x61, 0x72, 0x72, 0x6f,
+ 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x63, 0x6f,
+ 0x6c, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x1d, 0x2e, 0x61,
0x72, 0x72, 0x6f, 0x77,
+ 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x63, 0x6f, 0x6c,
+ 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x00, 0x30, 0x01, 0x12,
0x52, 0x0a, 0x0b, 0x4c,
+ 0x69, 0x73, 0x74, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1c,
0x2e, 0x61, 0x72, 0x72,
0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x63,
- 0x6f, 0x6c, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x44, 0x61, 0x74,
0x61, 0x22, 0x00, 0x28,
- 0x01, 0x30, 0x01, 0x12, 0x4c, 0x0a, 0x08, 0x44, 0x6f, 0x41, 0x63, 0x74,
0x69, 0x6f, 0x6e, 0x12,
- 0x1d, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67,
0x68, 0x74, 0x2e, 0x70,
- 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x41, 0x63, 0x74, 0x69,
0x6f, 0x6e, 0x1a, 0x1d,
- 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68,
0x74, 0x2e, 0x70, 0x72,
- 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c,
0x74, 0x22, 0x00, 0x30,
- 0x01, 0x12, 0x52, 0x0a, 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x41, 0x63, 0x74,
0x69, 0x6f, 0x6e, 0x73,
- 0x12, 0x1c, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69,
0x67, 0x68, 0x74, 0x2e,
- 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x45, 0x6d, 0x70,
0x74, 0x79, 0x1a, 0x21,
- 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68,
0x74, 0x2e, 0x70, 0x72,
- 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f,
0x6e, 0x54, 0x79, 0x70,
- 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x76, 0x0a, 0x1c, 0x6f, 0x72, 0x67,
0x2e, 0x61, 0x70, 0x61,
- 0x63, 0x68, 0x65, 0x2e, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c,
0x69, 0x67, 0x68, 0x74,
- 0x2e, 0x69, 0x6d, 0x70, 0x6c, 0x5a, 0x37, 0x67, 0x69, 0x74, 0x68, 0x75,
0x62, 0x2e, 0x63, 0x6f,
- 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x61, 0x72, 0x72,
0x6f, 0x77, 0x2f, 0x67,
- 0x6f, 0x2f, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2f, 0x66, 0x6c, 0x69, 0x67,
0x68, 0x74, 0x2f, 0x69,
- 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x66, 0x6c, 0x69, 0x67,
0x68, 0x74, 0xaa, 0x02,
- 0x1c, 0x41, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x41, 0x72, 0x72, 0x6f,
0x77, 0x2e, 0x46, 0x6c,
- 0x69, 0x67, 0x68, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f,
0x6c, 0x62, 0x06, 0x70,
- 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x6f, 0x6c, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x21, 0x2e, 0x61,
0x72, 0x72, 0x6f, 0x77,
+ 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x63, 0x6f, 0x6c,
+ 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x22,
0x00, 0x30, 0x01, 0x42,
+ 0x76, 0x0a, 0x1c, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68,
0x65, 0x2e, 0x61, 0x72,
+ 0x72, 0x6f, 0x77, 0x2e, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x69,
0x6d, 0x70, 0x6c, 0x5a,
+ 0x37, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
0x61, 0x70, 0x61, 0x63,
+ 0x68, 0x65, 0x2f, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x2f, 0x67, 0x6f, 0x2f,
0x61, 0x72, 0x72, 0x6f,
+ 0x77, 0x2f, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2f, 0x69, 0x6e, 0x74,
0x65, 0x72, 0x6e, 0x61,
+ 0x6c, 0x2f, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0xaa, 0x02, 0x1c, 0x41,
0x70, 0x61, 0x63, 0x68,
+ 0x65, 0x2e, 0x41, 0x72, 0x72, 0x6f, 0x77, 0x2e, 0x46, 0x6c, 0x69, 0x67,
0x68, 0x74, 0x2e, 0x50,
+ 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x33,
}
var (
diff --git a/go/arrow/flight/internal/flight/Flight_grpc.pb.go
b/go/arrow/flight/internal/flight/Flight_grpc.pb.go
index 9613114448..10fd285a5c 100644
--- a/go/arrow/flight/internal/flight/Flight_grpc.pb.go
+++ b/go/arrow/flight/internal/flight/Flight_grpc.pb.go
@@ -1,8 +1,4 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
-// versions:
-// - protoc-gen-go-grpc v1.2.0
-// - protoc v3.12.4
-// source: Flight.proto
package flight
@@ -15,20 +11,17 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
-// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// FlightServiceClient is the client API for FlightService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please
refer to
https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type FlightServiceClient interface {
- //
// Handshake between client and server. Depending on the server, the
// handshake may be required to determine the token that should be used
for
// future operations. Both request and response are streams to allow
multiple
// round-trips depending on auth mechanism.
Handshake(ctx context.Context, opts ...grpc.CallOption)
(FlightService_HandshakeClient, error)
- //
// Get a list of available streams given a particular criteria. Most
flight
// services will expose one or more streams that are readily available
for
// retrieval. This api allows listing the streams available for
@@ -36,7 +29,6 @@ type FlightServiceClient interface {
// the subset of streams that can be listed via this interface. Each
flight
// service allows its own definition of how to consume criteria.
ListFlights(ctx context.Context, in *Criteria, opts ...grpc.CallOption)
(FlightService_ListFlightsClient, error)
- //
// For a given FlightDescriptor, get information about how the flight
can be
// consumed. This is a useful interface if the consumer of the interface
// already can identify the specific flight to consume. This interface
can
@@ -48,19 +40,16 @@ type FlightServiceClient interface {
// available for consumption for the duration defined by the specific
flight
// service.
GetFlightInfo(ctx context.Context, in *FlightDescriptor, opts
...grpc.CallOption) (*FlightInfo, error)
- //
// For a given FlightDescriptor, get the Schema as described in
Schema.fbs::Schema
// This is used when a consumer needs the Schema of flight stream.
Similar to
// GetFlightInfo this interface may generate a new flight that was not
previously
// available in ListFlights.
GetSchema(ctx context.Context, in *FlightDescriptor, opts
...grpc.CallOption) (*SchemaResult, error)
- //
// Retrieve a single stream associated with a particular descriptor
// associated with the referenced ticket. A Flight can be composed of
one or
// more streams where each stream can be retrieved using a separate
opaque
// ticket that the flight service uses for managing a collection of
streams.
DoGet(ctx context.Context, in *Ticket, opts ...grpc.CallOption)
(FlightService_DoGetClient, error)
- //
// Push a stream to the flight service associated with a particular
// flight stream. This allows a client of a flight service to upload a
stream
// of data. Depending on the particular flight service, a client
consumer
@@ -68,14 +57,12 @@ type FlightServiceClient interface {
// number. In the latter, the service might implement a 'seal' action
that
// can be applied to a descriptor once all streams are uploaded.
DoPut(ctx context.Context, opts ...grpc.CallOption)
(FlightService_DoPutClient, error)
- //
// Open a bidirectional data channel for a given descriptor. This
// allows clients to send and receive arbitrary Arrow data and
// application-specific metadata in a single logical stream. In
// contrast to DoGet/DoPut, this is more suited for clients
// offloading computation (rather than storage) to a Flight service.
DoExchange(ctx context.Context, opts ...grpc.CallOption)
(FlightService_DoExchangeClient, error)
- //
// Flight services can support an arbitrary number of simple actions in
// addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut
// operations that are potentially available. DoAction allows a flight
client
@@ -83,7 +70,6 @@ type FlightServiceClient interface {
// opaque request and response objects that are specific to the type
action
// being undertaken.
DoAction(ctx context.Context, in *Action, opts ...grpc.CallOption)
(FlightService_DoActionClient, error)
- //
// A flight service exposes all of the available action types that it
has
// along with descriptions. This allows different flight consumers to
// understand the capabilities of the flight service.
@@ -99,7 +85,7 @@ func NewFlightServiceClient(cc grpc.ClientConnInterface)
FlightServiceClient {
}
func (c *flightServiceClient) Handshake(ctx context.Context, opts
...grpc.CallOption) (FlightService_HandshakeClient, error) {
- stream, err := c.cc.NewStream(ctx,
&FlightService_ServiceDesc.Streams[0],
"/arrow.flight.protocol.FlightService/Handshake", opts...)
+ stream, err := c.cc.NewStream(ctx,
&_FlightService_serviceDesc.Streams[0],
"/arrow.flight.protocol.FlightService/Handshake", opts...)
if err != nil {
return nil, err
}
@@ -130,7 +116,7 @@ func (x *flightServiceHandshakeClient) Recv()
(*HandshakeResponse, error) {
}
func (c *flightServiceClient) ListFlights(ctx context.Context, in *Criteria,
opts ...grpc.CallOption) (FlightService_ListFlightsClient, error) {
- stream, err := c.cc.NewStream(ctx,
&FlightService_ServiceDesc.Streams[1],
"/arrow.flight.protocol.FlightService/ListFlights", opts...)
+ stream, err := c.cc.NewStream(ctx,
&_FlightService_serviceDesc.Streams[1],
"/arrow.flight.protocol.FlightService/ListFlights", opts...)
if err != nil {
return nil, err
}
@@ -180,7 +166,7 @@ func (c *flightServiceClient) GetSchema(ctx
context.Context, in *FlightDescripto
}
func (c *flightServiceClient) DoGet(ctx context.Context, in *Ticket, opts
...grpc.CallOption) (FlightService_DoGetClient, error) {
- stream, err := c.cc.NewStream(ctx,
&FlightService_ServiceDesc.Streams[2],
"/arrow.flight.protocol.FlightService/DoGet", opts...)
+ stream, err := c.cc.NewStream(ctx,
&_FlightService_serviceDesc.Streams[2],
"/arrow.flight.protocol.FlightService/DoGet", opts...)
if err != nil {
return nil, err
}
@@ -212,7 +198,7 @@ func (x *flightServiceDoGetClient) Recv() (*FlightData,
error) {
}
func (c *flightServiceClient) DoPut(ctx context.Context, opts
...grpc.CallOption) (FlightService_DoPutClient, error) {
- stream, err := c.cc.NewStream(ctx,
&FlightService_ServiceDesc.Streams[3],
"/arrow.flight.protocol.FlightService/DoPut", opts...)
+ stream, err := c.cc.NewStream(ctx,
&_FlightService_serviceDesc.Streams[3],
"/arrow.flight.protocol.FlightService/DoPut", opts...)
if err != nil {
return nil, err
}
@@ -243,7 +229,7 @@ func (x *flightServiceDoPutClient) Recv() (*PutResult,
error) {
}
func (c *flightServiceClient) DoExchange(ctx context.Context, opts
...grpc.CallOption) (FlightService_DoExchangeClient, error) {
- stream, err := c.cc.NewStream(ctx,
&FlightService_ServiceDesc.Streams[4],
"/arrow.flight.protocol.FlightService/DoExchange", opts...)
+ stream, err := c.cc.NewStream(ctx,
&_FlightService_serviceDesc.Streams[4],
"/arrow.flight.protocol.FlightService/DoExchange", opts...)
if err != nil {
return nil, err
}
@@ -274,7 +260,7 @@ func (x *flightServiceDoExchangeClient) Recv()
(*FlightData, error) {
}
func (c *flightServiceClient) DoAction(ctx context.Context, in *Action, opts
...grpc.CallOption) (FlightService_DoActionClient, error) {
- stream, err := c.cc.NewStream(ctx,
&FlightService_ServiceDesc.Streams[5],
"/arrow.flight.protocol.FlightService/DoAction", opts...)
+ stream, err := c.cc.NewStream(ctx,
&_FlightService_serviceDesc.Streams[5],
"/arrow.flight.protocol.FlightService/DoAction", opts...)
if err != nil {
return nil, err
}
@@ -306,7 +292,7 @@ func (x *flightServiceDoActionClient) Recv() (*Result,
error) {
}
func (c *flightServiceClient) ListActions(ctx context.Context, in *Empty, opts
...grpc.CallOption) (FlightService_ListActionsClient, error) {
- stream, err := c.cc.NewStream(ctx,
&FlightService_ServiceDesc.Streams[6],
"/arrow.flight.protocol.FlightService/ListActions", opts...)
+ stream, err := c.cc.NewStream(ctx,
&_FlightService_serviceDesc.Streams[6],
"/arrow.flight.protocol.FlightService/ListActions", opts...)
if err != nil {
return nil, err
}
@@ -341,13 +327,11 @@ func (x *flightServiceListActionsClient) Recv()
(*ActionType, error) {
// All implementations must embed UnimplementedFlightServiceServer
// for forward compatibility
type FlightServiceServer interface {
- //
// Handshake between client and server. Depending on the server, the
// handshake may be required to determine the token that should be used
for
// future operations. Both request and response are streams to allow
multiple
// round-trips depending on auth mechanism.
Handshake(FlightService_HandshakeServer) error
- //
// Get a list of available streams given a particular criteria. Most
flight
// services will expose one or more streams that are readily available
for
// retrieval. This api allows listing the streams available for
@@ -355,7 +339,6 @@ type FlightServiceServer interface {
// the subset of streams that can be listed via this interface. Each
flight
// service allows its own definition of how to consume criteria.
ListFlights(*Criteria, FlightService_ListFlightsServer) error
- //
// For a given FlightDescriptor, get information about how the flight
can be
// consumed. This is a useful interface if the consumer of the interface
// already can identify the specific flight to consume. This interface
can
@@ -367,19 +350,16 @@ type FlightServiceServer interface {
// available for consumption for the duration defined by the specific
flight
// service.
GetFlightInfo(context.Context, *FlightDescriptor) (*FlightInfo, error)
- //
// For a given FlightDescriptor, get the Schema as described in
Schema.fbs::Schema
// This is used when a consumer needs the Schema of flight stream.
Similar to
// GetFlightInfo this interface may generate a new flight that was not
previously
// available in ListFlights.
GetSchema(context.Context, *FlightDescriptor) (*SchemaResult, error)
- //
// Retrieve a single stream associated with a particular descriptor
// associated with the referenced ticket. A Flight can be composed of
one or
// more streams where each stream can be retrieved using a separate
opaque
// ticket that the flight service uses for managing a collection of
streams.
DoGet(*Ticket, FlightService_DoGetServer) error
- //
// Push a stream to the flight service associated with a particular
// flight stream. This allows a client of a flight service to upload a
stream
// of data. Depending on the particular flight service, a client
consumer
@@ -387,14 +367,12 @@ type FlightServiceServer interface {
// number. In the latter, the service might implement a 'seal' action
that
// can be applied to a descriptor once all streams are uploaded.
DoPut(FlightService_DoPutServer) error
- //
// Open a bidirectional data channel for a given descriptor. This
// allows clients to send and receive arbitrary Arrow data and
// application-specific metadata in a single logical stream. In
// contrast to DoGet/DoPut, this is more suited for clients
// offloading computation (rather than storage) to a Flight service.
DoExchange(FlightService_DoExchangeServer) error
- //
// Flight services can support an arbitrary number of simple actions in
// addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut
// operations that are potentially available. DoAction allows a flight
client
@@ -402,7 +380,6 @@ type FlightServiceServer interface {
// opaque request and response objects that are specific to the type
action
// being undertaken.
DoAction(*Action, FlightService_DoActionServer) error
- //
// A flight service exposes all of the available action types that it
has
// along with descriptions. This allows different flight consumers to
// understand the capabilities of the flight service.
@@ -450,8 +427,8 @@ type UnsafeFlightServiceServer interface {
mustEmbedUnimplementedFlightServiceServer()
}
-func RegisterFlightServiceServer(s grpc.ServiceRegistrar, srv
FlightServiceServer) {
- s.RegisterService(&FlightService_ServiceDesc, srv)
+func RegisterFlightServiceServer(s *grpc.Server, srv FlightServiceServer) {
+ s.RegisterService(&_FlightService_serviceDesc, srv)
}
func _FlightService_Handshake_Handler(srv interface{}, stream
grpc.ServerStream) error {
@@ -652,10 +629,7 @@ func (x *flightServiceListActionsServer) Send(m
*ActionType) error {
return x.ServerStream.SendMsg(m)
}
-// FlightService_ServiceDesc is the grpc.ServiceDesc for FlightService service.
-// It's only intended for direct use with grpc.RegisterService,
-// and not to be introspected or modified (even as a copy)
-var FlightService_ServiceDesc = grpc.ServiceDesc{
+var _FlightService_serviceDesc = grpc.ServiceDesc{
ServiceName: "arrow.flight.protocol.FlightService",
HandlerType: (*FlightServiceServer)(nil),
Methods: []grpc.MethodDesc{
diff --git a/go/arrow/flight/server.go b/go/arrow/flight/server.go
index 9be0e47029..d8e9e5ff8e 100644
--- a/go/arrow/flight/server.go
+++ b/go/arrow/flight/server.go
@@ -52,14 +52,9 @@ type (
Empty = flight.Empty
)
-// FlightService_ServiceDesc is the grpc.ServiceDesc for the FlightService
-// server. It should only be used for direct call of grpc.RegisterService,
-// and not introspected or modified (even as a copy).
-var FlightService_ServiceDesc = flight.FlightService_ServiceDesc
-
// RegisterFlightServiceServer registers an existing flight server onto an
// existing grpc server, or anything that is a grpc service registrar.
-func RegisterFlightServiceServer(s grpc.ServiceRegistrar, srv FlightServer) {
+func RegisterFlightServiceServer(s *grpc.Server, srv FlightServer) {
flight.RegisterFlightServiceServer(s, srv)
}
diff --git a/go/arrow/internal/flight_integration/scenario.go
b/go/arrow/internal/flight_integration/scenario.go
index 55e2b8a994..c26bc7843d 100644
--- a/go/arrow/internal/flight_integration/scenario.go
+++ b/go/arrow/internal/flight_integration/scenario.go
@@ -55,6 +55,8 @@ func GetScenario(name string, args ...string) Scenario {
return &authBasicProtoTester{}
case "middleware":
return &middlewareScenarioTester{}
+ case "ordered":
+ return &orderedScenarioTester{}
case "flight_sql":
return &flightSqlScenarioTester{}
case "flight_sql:extension":
@@ -526,6 +528,168 @@ func (m *middlewareScenarioTester) GetFlightInfo(ctx
context.Context, desc *flig
}, nil
}
+type orderedScenarioTester struct {
+ flight.BaseFlightServer
+}
+
+func (m *orderedScenarioTester) RunClient(addr string, opts
...grpc.DialOption) error {
+ client, err := flight.NewClientWithMiddleware(addr, nil, nil, opts...)
+ if err != nil {
+ return err
+ }
+ defer client.Close()
+
+ ctx := context.Background()
+ info, err := client.GetFlightInfo(ctx, &flight.FlightDescriptor{Type:
flight.DescriptorCMD, Cmd: []byte("ordered")})
+ if err != nil {
+ return err
+ }
+
+ if !info.GetOrdered() {
+ return fmt.Errorf("expected to server return FlightInfo.ordered
= true")
+ }
+
+ recs := make([]arrow.Record, len(info.Endpoint))
+ for i, ep := range info.Endpoint {
+ if len(ep.Location) != 0 {
+ return fmt.Errorf("expected to receive empty locations
to use the original service: %s",
+ ep.Location)
+ }
+
+ stream, err := client.DoGet(ctx, ep.Ticket)
+ if err != nil {
+ return err
+ }
+
+ rdr, err := flight.NewRecordReader(stream)
+ if err != nil {
+ return err
+ }
+ defer rdr.Release()
+
+ for rdr.Next() {
+ record := rdr.Record()
+ record.Retain()
+ defer record.Release()
+ recs[i] = record
+ }
+ if rdr.Err() != nil {
+ return rdr.Err()
+ }
+ }
+
+ // Build expected records
+ mem := memory.DefaultAllocator
+ schema := arrow.NewSchema(
+ []arrow.Field{
+ {Name: "number", Type: arrow.PrimitiveTypes.Int32},
+ },
+ nil,
+ )
+ expected_table, _ := array.TableFromJSON(mem, schema, []string{
+ `[
+ {"number": 1},
+ {"number": 2},
+ {"number": 3}
+ ]`,
+ `[
+ {"number": 10},
+ {"number": 20},
+ {"number": 30}
+ ]`,
+ `[
+ {"number": 100},
+ {"number": 200},
+ {"number": 300}
+ ]`,
+ })
+ defer expected_table.Release()
+
+ table := array.NewTableFromRecords(schema, recs)
+ defer table.Release()
+ if !array.TableEqual(table, expected_table) {
+ return fmt.Errorf("read data isn't expected\n" +
+ "Expected:\n" +
+ "%s\n" +
+ "num-rows: %d\n" +
+ "num-cols: %d\n" +
+ "Actual:\n" +
+ "%s\n" +
+ "num-rows: %d\n" +
+ "num-cols: %d",
+ expected_table.Schema(),
+ expected_table.NumRows(),
+ expected_table.NumCols(),
+ table.Schema(),
+ table.NumRows(),
+ table.NumCols())
+ }
+
+ return nil
+}
+
+func (m *orderedScenarioTester) MakeServer(port int) flight.Server {
+ srv := flight.NewServerWithMiddleware(nil)
+ srv.RegisterFlightService(m)
+ initServer(port, srv)
+ return srv
+}
+
+func (m *orderedScenarioTester) GetFlightInfo(ctx context.Context, desc
*flight.FlightDescriptor) (*flight.FlightInfo, error) {
+ ordered := desc.Type == flight.DescriptorCMD && string(desc.Cmd) ==
"ordered"
+ schema := arrow.NewSchema(
+ []arrow.Field{
+ {Name: "number", Type: arrow.PrimitiveTypes.Int32},
+ },
+ nil,
+ )
+ return &flight.FlightInfo{
+ Schema: flight.SerializeSchema(schema,
memory.DefaultAllocator),
+ FlightDescriptor: desc,
+ Endpoint: []*flight.FlightEndpoint{
+ {
+ Ticket: &flight.Ticket{Ticket: []byte("1")},
+ Location: []*flight.Location{},
+ },
+ {
+ Ticket: &flight.Ticket{Ticket: []byte("2")},
+ Location: []*flight.Location{},
+ },
+ {
+ Ticket: &flight.Ticket{Ticket: []byte("3")},
+ Location: []*flight.Location{},
+ },
+ },
+ TotalRecords: -1,
+ TotalBytes: -1,
+ Ordered: ordered,
+ }, nil
+}
+
+func (m *orderedScenarioTester) DoGet(tkt *flight.Ticket, fs
flight.FlightService_DoGetServer) error {
+ schema := arrow.NewSchema(
+ []arrow.Field{
+ {Name: "number", Type: arrow.PrimitiveTypes.Int32},
+ },
+ nil,
+ )
+ b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+ defer b.Release()
+ if string(tkt.GetTicket()) == "1" {
+ b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3},
nil)
+ } else if string(tkt.GetTicket()) == "2" {
+ b.Field(0).(*array.Int32Builder).AppendValues([]int32{10, 20,
30}, nil)
+ } else if string(tkt.GetTicket()) == "3" {
+ b.Field(0).(*array.Int32Builder).AppendValues([]int32{100, 200,
300}, nil)
+ }
+ w := flight.NewRecordWriter(fs, ipc.WithSchema(schema))
+ rec := b.NewRecord()
+ defer rec.Release()
+ w.Write(rec)
+
+ return nil
+}
+
const (
updateStatementExpectedRows int64 = 10000
updateStatementWithTransactionExpectedRows int64 = 15000
diff --git
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
index e57b311c2e..888c7293ea 100644
---
a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
+++
b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
@@ -48,6 +48,7 @@ public class FlightInfo {
private final List<FlightEndpoint> endpoints;
private final long bytes;
private final long records;
+ private final boolean ordered;
private final IpcOption option;
/**
@@ -61,7 +62,7 @@ public class FlightInfo {
*/
public FlightInfo(Schema schema, FlightDescriptor descriptor,
List<FlightEndpoint> endpoints, long bytes,
long records) {
- this(schema, descriptor, endpoints, bytes, records, IpcOption.DEFAULT);
+ this(schema, descriptor, endpoints, bytes, records, /*ordered*/ false,
IpcOption.DEFAULT);
}
/**
@@ -76,6 +77,22 @@ public class FlightInfo {
*/
public FlightInfo(Schema schema, FlightDescriptor descriptor,
List<FlightEndpoint> endpoints, long bytes,
long records, IpcOption option) {
+ this(schema, descriptor, endpoints, bytes, records, /*ordered*/ false,
option);
+ }
+
+ /**
+ * Constructs a new instance.
+ *
+ * @param schema The schema of the Flight
+ * @param descriptor An identifier for the Flight.
+ * @param endpoints A list of endpoints that have the flight available.
+ * @param bytes The number of bytes in the flight
+ * @param records The number of records in the flight.
+ * @param ordered Whether the endpoints in this flight are ordered.
+ * @param option IPC write options.
+ */
+ public FlightInfo(Schema schema, FlightDescriptor descriptor,
List<FlightEndpoint> endpoints, long bytes,
+ long records, boolean ordered, IpcOption option) {
Objects.requireNonNull(schema);
Objects.requireNonNull(descriptor);
Objects.requireNonNull(endpoints);
@@ -85,6 +102,7 @@ public class FlightInfo {
this.endpoints = endpoints;
this.bytes = bytes;
this.records = records;
+ this.ordered = ordered;
this.option = option;
}
@@ -108,6 +126,7 @@ public class FlightInfo {
}
bytes = pbFlightInfo.getTotalBytes();
records = pbFlightInfo.getTotalRecords();
+ ordered = pbFlightInfo.getOrdered();
option = IpcOption.DEFAULT;
}
@@ -131,6 +150,10 @@ public class FlightInfo {
return endpoints;
}
+ public boolean getOrdered() {
+ return ordered;
+ }
+
/**
* Converts to the protocol buffer representation.
*/
@@ -148,6 +171,7 @@ public class FlightInfo {
.setFlightDescriptor(descriptor.toProtocol())
.setTotalBytes(FlightInfo.this.bytes)
.setTotalRecords(records)
+ .setOrdered(ordered)
.build();
}
@@ -187,12 +211,13 @@ public class FlightInfo {
records == that.records &&
schema.equals(that.schema) &&
descriptor.equals(that.descriptor) &&
- endpoints.equals(that.endpoints);
+ endpoints.equals(that.endpoints) &&
+ ordered == that.ordered;
}
@Override
public int hashCode() {
- return Objects.hash(schema, descriptor, endpoints, bytes, records);
+ return Objects.hash(schema, descriptor, endpoints, bytes, records,
ordered);
}
@Override
@@ -203,6 +228,7 @@ public class FlightInfo {
", endpoints=" + endpoints +
", bytes=" + bytes +
", records=" + records +
+ ", ordered=" + ordered +
'}';
}
}
diff --git
a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
index f9caeca22e..40337b2de5 100644
---
a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
+++
b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
@@ -119,10 +119,25 @@ public class TestBasicOperation {
new Ticket(new byte[10]),
Location.forGrpcDomainSocket("/tmp/test.sock"),
forGrpcInsecure("localhost", 50051))
), 200, 500);
+ final FlightInfo info4 = new FlightInfo(schema, FlightDescriptor.path("a",
"b"),
+ Arrays.asList(new FlightEndpoint(
+ new Ticket(new byte[10]),
Location.forGrpcDomainSocket("/tmp/test.sock")),
+ new FlightEndpoint(
+ new Ticket(new byte[10]),
Location.forGrpcDomainSocket("/tmp/test.sock"),
+ forGrpcInsecure("localhost", 50051))
+ ), 200, 500, /*ordered*/ true, IpcOption.DEFAULT);
Assertions.assertEquals(info1, FlightInfo.deserialize(info1.serialize()));
Assertions.assertEquals(info2, FlightInfo.deserialize(info2.serialize()));
Assertions.assertEquals(info3, FlightInfo.deserialize(info3.serialize()));
+ Assertions.assertEquals(info4, FlightInfo.deserialize(info4.serialize()));
+
+ Assertions.assertNotEquals(info3, info4);
+
+ Assertions.assertFalse(info1.getOrdered());
+ Assertions.assertFalse(info2.getOrdered());
+ Assertions.assertFalse(info3.getOrdered());
+ Assertions.assertTrue(info4.getOrdered());
}
@Test
diff --git
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/OrderedScenario.java
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/OrderedScenario.java
new file mode 100644
index 0000000000..b8aa46fb56
--- /dev/null
+++
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/OrderedScenario.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.integration.tests;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightProducer;
+import org.apache.arrow.flight.FlightServer;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.NoOpFlightProducer;
+import org.apache.arrow.flight.Ticket;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.message.IpcOption;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/** Test the 'ordered' flag in FlightInfo. */
+public class OrderedScenario implements Scenario {
+ private static final Schema SCHEMA =
+ new Schema(
+ Collections.singletonList(Field.notNullable("number",
Types.MinorType.INT.getType())));
+ private static final byte[] ORDERED_COMMAND =
"ordered".getBytes(StandardCharsets.UTF_8);
+
+ @Override
+ public FlightProducer producer(BufferAllocator allocator, Location location)
throws Exception {
+ return new OrderedProducer(allocator);
+ }
+
+ @Override
+ public void buildServer(FlightServer.Builder builder) throws Exception {}
+
+ @Override
+ public void client(BufferAllocator allocator, Location location,
FlightClient client)
+ throws Exception {
+ final FlightInfo info =
client.getInfo(FlightDescriptor.command(ORDERED_COMMAND));
+ IntegrationAssertions.assertTrue("ordered must be true",
info.getOrdered());
+ IntegrationAssertions.assertEquals(3, info.getEndpoints().size());
+
+ int offset = 0;
+ for (int multiplier : Arrays.asList(1, 10, 100)) {
+ FlightEndpoint endpoint = info.getEndpoints().get(offset);
+
+ IntegrationAssertions.assertTrue(
+ "locations must be empty", endpoint.getLocations().isEmpty());
+
+ try (final FlightStream stream = client.getStream(endpoint.getTicket()))
{
+ IntegrationAssertions.assertEquals(SCHEMA, stream.getSchema());
+ IntegrationAssertions.assertTrue("stream must have a batch",
stream.next());
+
+ IntVector number = (IntVector) stream.getRoot().getVector(0);
+ IntegrationAssertions.assertEquals(3, stream.getRoot().getRowCount());
+
+ IntegrationAssertions.assertFalse("value must be non-null",
number.isNull(0));
+ IntegrationAssertions.assertFalse("value must be non-null",
number.isNull(1));
+ IntegrationAssertions.assertFalse("value must be non-null",
number.isNull(2));
+ IntegrationAssertions.assertEquals(multiplier, number.get(0));
+ IntegrationAssertions.assertEquals(2 * multiplier, number.get(1));
+ IntegrationAssertions.assertEquals(3 * multiplier, number.get(2));
+
+ IntegrationAssertions.assertFalse("stream must have one batch",
stream.next());
+ }
+
+ offset++;
+ }
+ }
+
+ private static class OrderedProducer extends NoOpFlightProducer {
+ private static final byte[] TICKET_1 =
"1".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] TICKET_2 =
"2".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] TICKET_3 =
"3".getBytes(StandardCharsets.UTF_8);
+
+ private final BufferAllocator allocator;
+
+ OrderedProducer(BufferAllocator allocator) {
+ this.allocator = Objects.requireNonNull(allocator);
+ }
+
+ @Override
+ public void getStream(CallContext context, Ticket ticket,
ServerStreamListener listener) {
+ try (final VectorSchemaRoot root = VectorSchemaRoot.create(SCHEMA,
allocator)) {
+ IntVector number = (IntVector) root.getVector(0);
+
+ if (Arrays.equals(ticket.getBytes(), TICKET_1)) {
+ number.setSafe(0, 1);
+ number.setSafe(1, 2);
+ number.setSafe(2, 3);
+ } else if (Arrays.equals(ticket.getBytes(), TICKET_2)) {
+ number.setSafe(0, 10);
+ number.setSafe(1, 20);
+ number.setSafe(2, 30);
+ } else if (Arrays.equals(ticket.getBytes(), TICKET_3)) {
+ number.setSafe(0, 100);
+ number.setSafe(1, 200);
+ number.setSafe(2, 300);
+ } else {
+ listener.error(
+ CallStatus.INVALID_ARGUMENT
+ .withDescription(
+ "Could not find flight: " + new
String(ticket.getBytes(), StandardCharsets.UTF_8))
+ .toRuntimeException());
+ return;
+ }
+
+ root.setRowCount(3);
+
+ listener.start(root);
+ listener.putNext();
+ listener.completed();
+ }
+ }
+
+ @Override
+ public FlightInfo getFlightInfo(CallContext context, FlightDescriptor
descriptor) {
+ final boolean ordered =
+ descriptor.isCommand() && Arrays.equals(descriptor.getCommand(),
ORDERED_COMMAND);
+ List<FlightEndpoint> endpoints;
+ if (ordered) {
+ endpoints =
+ Arrays.asList(
+ new FlightEndpoint(new Ticket(TICKET_1)),
+ new FlightEndpoint(new Ticket(TICKET_2)),
+ new FlightEndpoint(new Ticket(TICKET_3)));
+ } else {
+ endpoints =
+ Arrays.asList(
+ new FlightEndpoint(new Ticket(TICKET_1)),
+ new FlightEndpoint(new Ticket(TICKET_3)),
+ new FlightEndpoint(new Ticket(TICKET_2)));
+ }
+ return new FlightInfo(
+ SCHEMA, descriptor, endpoints, /*bytes*/ -1, /*records*/ -1,
ordered, IpcOption.DEFAULT);
+ }
+ }
+}
diff --git
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
index 77f7ab0006..c2e10fcf47 100644
---
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
+++
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java
@@ -41,6 +41,7 @@ final class Scenarios {
scenarios = new TreeMap<>();
scenarios.put("auth:basic_proto", AuthBasicProtoScenario::new);
scenarios.put("middleware", MiddlewareScenario::new);
+ scenarios.put("ordered", OrderedScenario::new);
scenarios.put("flight_sql", FlightSqlScenario::new);
scenarios.put("flight_sql:extension", FlightSqlExtensionScenario::new);
}
diff --git
a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
index 0751e1d7a8..4507dfb129 100644
---
a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
+++
b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
@@ -38,6 +38,11 @@ class IntegrationTest {
testScenario("middleware");
}
+ @Test
+ void ordered() throws Exception {
+ testScenario("ordered");
+ }
+
@Test
void flightSql() throws Exception {
testScenario("flight_sql");