lidavidm commented on code in PR #38521:
URL: https://github.com/apache/arrow/pull/38521#discussion_r1381765500


##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java:
##########
@@ -95,11 +102,48 @@ private CallOption[] getOptions() {
    * @param flightInfo The {@link FlightInfo} instance from which to fetch 
results.
    * @return a {@code FlightStream} of results.
    */
-  public List<FlightStream> getStreams(final FlightInfo flightInfo) {
-    return flightInfo.getEndpoints().stream()
-        .map(FlightEndpoint::getTicket)
-        .map(ticket -> sqlClient.getStream(ticket, getOptions()))
-        .collect(Collectors.toList());
+  public List<CloseableEndpointStreamPair> getStreams(final FlightInfo 
flightInfo) throws SQLException {
+    final ArrayList<CloseableEndpointStreamPair> endpoints =
+        new ArrayList<>(flightInfo.getEndpoints().size());
+
+    try {
+      for (FlightEndpoint endpoint : flightInfo.getEndpoints()) {
+        if (endpoint.getLocations().isEmpty()) {
+          // Create a stream using the current client only and do not close 
the client at the end.
+          endpoints.add(new CloseableEndpointStreamPair(
+              sqlClient.getStream(endpoint.getTicket(), getOptions()), null));
+        } else {
+          // Clone the builder and then set the new endpoint on it.
+          final URI endpointUri = endpoint.getLocations().get(0).getUri();
+          final Builder builderForEndpoint = new 
Builder(ArrowFlightSqlClientHandler.this.builder)
+              .withHost(endpointUri.getHost())
+              .withPort(endpointUri.getPort())
+              
.withEncryption(endpointUri.getScheme().equals(LocationSchemes.GRPC_TLS));
+
+          final ArrowFlightSqlClientHandler endpointHandler = 
builderForEndpoint.build();
+          try {
+            endpoints.add(new CloseableEndpointStreamPair(
+                endpointHandler.sqlClient.getStream(endpoint.getTicket(),
+                    endpointHandler.getOptions()), endpointHandler.sqlClient));
+          } catch (Exception ex) {
+            AutoCloseables.close(endpointHandler);
+            throw ex;
+          }

Review Comment:
   Some future improvements for this would be to test each of the locations and 
use the first one that works; ignore locations with unknown URI schemes for 
forwards compatibility; fall back to the current client if none of the 
locations work. Could we file issue(s) for this?



##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java:
##########
@@ -95,11 +102,48 @@ private CallOption[] getOptions() {
    * @param flightInfo The {@link FlightInfo} instance from which to fetch 
results.
    * @return a {@code FlightStream} of results.
    */
-  public List<FlightStream> getStreams(final FlightInfo flightInfo) {
-    return flightInfo.getEndpoints().stream()
-        .map(FlightEndpoint::getTicket)
-        .map(ticket -> sqlClient.getStream(ticket, getOptions()))
-        .collect(Collectors.toList());
+  public List<CloseableEndpointStreamPair> getStreams(final FlightInfo 
flightInfo) throws SQLException {
+    final ArrayList<CloseableEndpointStreamPair> endpoints =
+        new ArrayList<>(flightInfo.getEndpoints().size());
+
+    try {
+      for (FlightEndpoint endpoint : flightInfo.getEndpoints()) {
+        if (endpoint.getLocations().isEmpty()) {
+          // Create a stream using the current client only and do not close 
the client at the end.
+          endpoints.add(new CloseableEndpointStreamPair(
+              sqlClient.getStream(endpoint.getTicket(), getOptions()), null));
+        } else {
+          // Clone the builder and then set the new endpoint on it.
+          final URI endpointUri = endpoint.getLocations().get(0).getUri();
+          final Builder builderForEndpoint = new 
Builder(ArrowFlightSqlClientHandler.this.builder)
+              .withHost(endpointUri.getHost())
+              .withPort(endpointUri.getPort())
+              
.withEncryption(endpointUri.getScheme().equals(LocationSchemes.GRPC_TLS));
+
+          final ArrowFlightSqlClientHandler endpointHandler = 
builderForEndpoint.build();
+          try {
+            endpoints.add(new CloseableEndpointStreamPair(
+                endpointHandler.sqlClient.getStream(endpoint.getTicket(),
+                    endpointHandler.getOptions()), endpointHandler.sqlClient));
+          } catch (Exception ex) {
+            AutoCloseables.close(endpointHandler);
+            throw ex;
+          }

Review Comment:
   Also, having a cache of clients instead of opening new ones every time (e.g. 
via https://github.com/ben-manes/caffeine)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to