lidavidm commented on code in PR #47991:
URL: https://github.com/apache/arrow/pull/47991#discussion_r2479893517
##########
cpp/src/arrow/flight/sql/odbc/odbc_impl/flight_sql_stream_chunk_buffer.cc:
##########
@@ -20,37 +20,67 @@
namespace arrow::flight::sql::odbc {
-using arrow::Result;
-
FlightStreamChunkBuffer::FlightStreamChunkBuffer(
- FlightSqlClient& flight_sql_client, const FlightCallOptions& call_options,
- const std::shared_ptr<FlightInfo>& flight_info, size_t queue_capacity)
+ FlightSqlClient& flight_sql_client, const FlightClientOptions&
client_options,
+ const FlightCallOptions& call_options, const std::shared_ptr<FlightInfo>&
flight_info,
+ size_t queue_capacity)
: queue_(queue_capacity) {
- // FIXME: Endpoint iteration should consider endpoints may be at different
hosts
for (const auto& endpoint : flight_info->endpoints()) {
const Ticket& ticket = endpoint.ticket;
- auto result = flight_sql_client.DoGet(call_options, ticket);
+ arrow::Result<std::unique_ptr<FlightStreamReader>> result;
+ std::shared_ptr<FlightSqlClient> temp_flight_sql_client;
+ auto endpoint_locations = endpoint.locations;
+ if (endpoint_locations.empty()) {
+ // list of Locations needs to be empty to proceed
+ result = flight_sql_client.DoGet(call_options, ticket);
+ } else {
+ // If it is non-empty, the driver should create a FlightSqlClient to
connect to one
+ // of the specified Locations directly.
+
+ // GH-47117: Currently a new FlightClient will be made for each
partition that
+ // returns a non-empty Location, which is then disposed of. It may be
better to
+ // cache clients because a server may report the same Locations. It
would also be
+ // good to identify when the reported Location is the same as the
original
+ // connection's Location and skip creating a FlightClient in that
scenario.
+
+ std::unique_ptr<FlightClient> temp_flight_client;
+ util::ThrowIfNotOK(FlightClient::Connect(endpoint_locations[0],
client_options)
+ .Value(&temp_flight_client));
+ temp_flight_sql_client.reset(new
FlightSqlClient(std::move(temp_flight_client)));
Review Comment:
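make_shared? i.e. `temp_flight_sql_client = std::make_shared<FlightSqlClient>(std::move(temp_flight_client));` instead of `reset(new ...)`.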
##########
cpp/src/arrow/flight/sql/odbc/odbc_impl/flight_sql_stream_chunk_buffer.h:
##########
@@ -23,11 +23,15 @@
namespace arrow::flight::sql::odbc {
+using arrow::Result;
Review Comment:
Can we avoid putting `using` in headers?
##########
cpp/src/arrow/flight/sql/odbc/odbc_impl/flight_sql_stream_chunk_buffer_test.cc:
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array.h"
+#include "arrow/type_traits.h"
+
+#include "arrow/testing/gtest_util.h"
+
+#include "arrow/flight/sql/odbc/odbc_impl/flight_sql_stream_chunk_buffer.h"
+#include "arrow/flight/sql/odbc/odbc_impl/json_converter.h"
+#include "arrow/flight/test_flight_server.h"
+#include "arrow/flight/test_util.h"
+
+#include <gtest/gtest.h>
+
+namespace arrow::flight::sql::odbc {
+
+using arrow::Array;
+using arrow::flight::FlightCallOptions;
+using arrow::flight::FlightClientOptions;
+using arrow::flight::FlightDescriptor;
+using arrow::flight::FlightEndpoint;
+using arrow::flight::Location;
+using arrow::flight::Ticket;
+using arrow::flight::sql::FlightSqlClient;
+
+class FlightStreamChunkBufferTest : public ::testing::Test {
+  // Test fixture that starts two in-process TestFlightServer instances for each
+  // test case, plus a FlightSqlClient connected to the first one.
+  // This is for testing endpoint iteration only.
+
+ protected:
+  void SetUp() override {
+    // Set up server 1. It is bound with port 0, so the port it actually listens
+    // on is read back via port() to build a Location clients can connect to.
+    server1 = std::make_shared<arrow::flight::TestFlightServer>();
+    ASSERT_OK_AND_ASSIGN(auto location1, Location::ForGrpcTcp("0.0.0.0", 0));
+    arrow::flight::FlightServerOptions options1(location1);
+    ASSERT_OK(server1->Init(options1));
+    ASSERT_OK_AND_ASSIGN(server_location1,
+                         Location::ForGrpcTcp("localhost", server1->port()));
+
+    // Set up server 2 the same way.
+    server2 = std::make_shared<arrow::flight::TestFlightServer>();
+    ASSERT_OK_AND_ASSIGN(auto location2, Location::ForGrpcTcp("0.0.0.0", 0));
+    arrow::flight::FlightServerOptions options2(location2);
+    ASSERT_OK(server2->Init(options2));
+    ASSERT_OK_AND_ASSIGN(server_location2,
+                         Location::ForGrpcTcp("localhost", server2->port()));
+
+    // Make a SQL client connected to server 1. Connect to server_location1 (the
+    // resolved port) rather than the bind Location, whose port is 0.
+    ASSERT_OK_AND_ASSIGN(auto client,
+                         arrow::flight::FlightClient::Connect(server_location1));
+    sql_client = std::make_shared<FlightSqlClient>(std::move(client));
+  }
+
+  void TearDown() override {
+    // Shut down and wait on each server so neither outlives the test case.
+    ASSERT_OK(server1->Shutdown());
+    ASSERT_OK(server1->Wait());
+    ASSERT_OK(server2->Shutdown());
+    ASSERT_OK(server2->Wait());
+  }
+
+ public:
+  arrow::flight::Location server_location1;
+  std::shared_ptr<arrow::flight::TestFlightServer> server1;
+  arrow::flight::Location server_location2;
+  std::shared_ptr<arrow::flight::TestFlightServer> server2;
+  std::shared_ptr<FlightSqlClient> sql_client;
+};
+
+// Builds a FlightInfo with two endpoints that carry the same ticket but list
+// different Locations (location1 and location2), for testing endpoint iteration
+// across servers.
+FlightInfo MultipleEndpointsFlightInfo(const Location& location1,
+                                       const Location& location2) {
+  // Server will generate random data for `ticket-ints-1`.
+  FlightEndpoint endpoint1({Ticket{"ticket-ints-1"}, {location1}, std::nullopt, {}});
+  FlightEndpoint endpoint2({Ticket{"ticket-ints-1"}, {location2}, std::nullopt, {}});
+
+  FlightDescriptor descr1{FlightDescriptor::PATH, "", {"examples", "ints"}};
+
+  auto schema1 = arrow::flight::ExampleIntSchema();
+
+  return arrow::flight::MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000,
+                                       100000, false, "");
+}
+
+// Asserts that `intArray` is non-null and has an integer Arrow type (signed or
+// unsigned). Checking the logical type validates every element, nulls included,
+// and avoids formatting each value as text and testing for digits, which would
+// reject negative integers because '-' is not a digit.
+void VerifyArraysContainIntsOnly(const std::shared_ptr<Array>& intArray) {
+  ASSERT_NE(intArray, nullptr);
+  ASSERT_TRUE(arrow::is_integer(intArray->type_id()))
+      << "expected an integer array, got " << intArray->type()->ToString();
+}
Review Comment:
Can't you just check the array type?
##########
cpp/src/arrow/flight/sql/odbc/odbc_impl/flight_sql_stream_chunk_buffer.cc:
##########
@@ -20,37 +20,67 @@
namespace arrow::flight::sql::odbc {
-using arrow::Result;
-
FlightStreamChunkBuffer::FlightStreamChunkBuffer(
- FlightSqlClient& flight_sql_client, const FlightCallOptions& call_options,
- const std::shared_ptr<FlightInfo>& flight_info, size_t queue_capacity)
+ FlightSqlClient& flight_sql_client, const FlightClientOptions&
client_options,
+ const FlightCallOptions& call_options, const std::shared_ptr<FlightInfo>&
flight_info,
+ size_t queue_capacity)
: queue_(queue_capacity) {
- // FIXME: Endpoint iteration should consider endpoints may be at different
hosts
for (const auto& endpoint : flight_info->endpoints()) {
const Ticket& ticket = endpoint.ticket;
- auto result = flight_sql_client.DoGet(call_options, ticket);
+ arrow::Result<std::unique_ptr<FlightStreamReader>> result;
+ std::shared_ptr<FlightSqlClient> temp_flight_sql_client;
+ auto endpoint_locations = endpoint.locations;
+ if (endpoint_locations.empty()) {
+ // list of Locations needs to be empty to proceed
+ result = flight_sql_client.DoGet(call_options, ticket);
+ } else {
+ // If it is non-empty, the driver should create a FlightSqlClient to
connect to one
+ // of the specified Locations directly.
+
+ // GH-47117: Currently a new FlightClient will be made for each
partition that
+ // returns a non-empty Location, which is then disposed of. It may be
better to
+ // cache clients because a server may report the same Locations. It
would also be
+ // good to identify when the reported Location is the same as the
original
+ // connection's Location and skip creating a FlightClient in that
scenario.
+
+ std::unique_ptr<FlightClient> temp_flight_client;
+ util::ThrowIfNotOK(FlightClient::Connect(endpoint_locations[0],
client_options)
+ .Value(&temp_flight_client));
+ temp_flight_sql_client.reset(new
FlightSqlClient(std::move(temp_flight_client)));
+
+ result = temp_flight_sql_client->DoGet(call_options, ticket);
+ }
+
util::ThrowIfNotOK(result.status());
std::shared_ptr<FlightStreamReader>
stream_reader_ptr(std::move(result.ValueOrDie()));
- BlockingQueue<Result<FlightStreamChunk>>::Supplier supplier = [=] {
+ BlockingQueue<std::pair<Result<FlightStreamChunk>,
+ std::shared_ptr<FlightSqlClient>>>::Supplier
supplier = [=] {
auto result = stream_reader_ptr->Next();
bool is_not_ok = !result.ok();
bool is_not_empty = result.ok() && (result.ValueOrDie().data != nullptr);
- return boost::make_optional(is_not_ok || is_not_empty,
std::move(result));
+ // If result is valid, save the temp Flight SQL Client for future stream
reader
+ // call. temp_flight_sql_client is intentionally null if the list of
endpoint
+ // Locations is empty.
+ // After all data is fetched from reader, the temp client is closed.
+ return boost::make_optional(
+ is_not_ok || is_not_empty,
+ std::make_pair(std::move(result), temp_flight_sql_client));
Review Comment:
Can we get rid of boost here, e.g. by using `std::optional` instead of `boost::make_optional`/`boost::optional`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]