alinaliBQ commented on code in PR #47991:
URL: https://github.com/apache/arrow/pull/47991#discussion_r2505883994


##########
cpp/src/arrow/flight/sql/odbc/odbc_impl/flight_sql_stream_chunk_buffer.cc:
##########
@@ -20,37 +20,67 @@
 
 namespace arrow::flight::sql::odbc {
 
-using arrow::Result;
-
 FlightStreamChunkBuffer::FlightStreamChunkBuffer(
-    FlightSqlClient& flight_sql_client, const FlightCallOptions& call_options,
-    const std::shared_ptr<FlightInfo>& flight_info, size_t queue_capacity)
+    FlightSqlClient& flight_sql_client, const FlightClientOptions& 
client_options,
+    const FlightCallOptions& call_options, const std::shared_ptr<FlightInfo>& 
flight_info,
+    size_t queue_capacity)
     : queue_(queue_capacity) {
-  // FIXME: Endpoint iteration should consider endpoints may be at different 
hosts
   for (const auto& endpoint : flight_info->endpoints()) {
     const Ticket& ticket = endpoint.ticket;
 
-    auto result = flight_sql_client.DoGet(call_options, ticket);
+    arrow::Result<std::unique_ptr<FlightStreamReader>> result;
+    std::shared_ptr<FlightSqlClient> temp_flight_sql_client;
+    auto endpoint_locations = endpoint.locations;
+    if (endpoint_locations.empty()) {
+      // list of Locations needs to be empty to proceed
+      result = flight_sql_client.DoGet(call_options, ticket);
+    } else {
+      // If it is non-empty, the driver should create a FlightSqlClient to 
connect to one
+      // of the specified Locations directly.
+
+      // GH-47117: Currently a new FlightClient will be made for each 
partition that
+      // returns a non-empty Location, which is then disposed of. It may be 
better to
+      // cache clients because a server may report the same Locations. It 
would also be
+      // good to identify when the reported Location is the same as the 
original
+      // connection's Location and skip creating a FlightClient in that 
scenario.
+
+      std::unique_ptr<FlightClient> temp_flight_client;
+      util::ThrowIfNotOK(FlightClient::Connect(endpoint_locations[0], 
client_options)
+                             .Value(&temp_flight_client));
+      temp_flight_sql_client.reset(new 
FlightSqlClient(std::move(temp_flight_client)));
+
+      result = temp_flight_sql_client->DoGet(call_options, ticket);
+    }
+
     util::ThrowIfNotOK(result.status());
     std::shared_ptr<FlightStreamReader> 
stream_reader_ptr(std::move(result.ValueOrDie()));
 
-    BlockingQueue<Result<FlightStreamChunk>>::Supplier supplier = [=] {
+    BlockingQueue<std::pair<Result<FlightStreamChunk>,
+                            std::shared_ptr<FlightSqlClient>>>::Supplier 
supplier = [=] {
       auto result = stream_reader_ptr->Next();
       bool is_not_ok = !result.ok();
       bool is_not_empty = result.ok() && (result.ValueOrDie().data != nullptr);
 
-      return boost::make_optional(is_not_ok || is_not_empty, 
std::move(result));
+      // If result is valid, save the temp Flight SQL Client for future stream 
reader
+      // call. temp_flight_sql_client is intentionally null if the list of 
endpoint
+      // Locations is empty.
+      // After all data is fetched from reader, the temp client is closed.
+      return boost::make_optional(
+          is_not_ok || is_not_empty,
+          std::make_pair(std::move(result), temp_flight_sql_client));

Review Comment:
   Since `boost-optional` is blocking the MSVC CI (for some reason it doesn't 
block my local MSVC build), I will remove `boost-optional` in 
https://github.com/apache/arrow/pull/48067



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to