lidavidm commented on code in PR #38521:
URL: https://github.com/apache/arrow/pull/38521#discussion_r1381765500
##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java:
##########
@@ -95,11 +102,48 @@ private CallOption[] getOptions() {
* @param flightInfo The {@link FlightInfo} instance from which to fetch
results.
* @return a {@code FlightStream} of results.
*/
- public List<FlightStream> getStreams(final FlightInfo flightInfo) {
- return flightInfo.getEndpoints().stream()
- .map(FlightEndpoint::getTicket)
- .map(ticket -> sqlClient.getStream(ticket, getOptions()))
- .collect(Collectors.toList());
+ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo
flightInfo) throws SQLException {
+ final ArrayList<CloseableEndpointStreamPair> endpoints =
+ new ArrayList<>(flightInfo.getEndpoints().size());
+
+ try {
+ for (FlightEndpoint endpoint : flightInfo.getEndpoints()) {
+ if (endpoint.getLocations().isEmpty()) {
+ // Create a stream using the current client only and do not close
the client at the end.
+ endpoints.add(new CloseableEndpointStreamPair(
+ sqlClient.getStream(endpoint.getTicket(), getOptions()), null));
+ } else {
+ // Clone the builder and then set the new endpoint on it.
+ final URI endpointUri = endpoint.getLocations().get(0).getUri();
+ final Builder builderForEndpoint = new
Builder(ArrowFlightSqlClientHandler.this.builder)
+ .withHost(endpointUri.getHost())
+ .withPort(endpointUri.getPort())
+
.withEncryption(endpointUri.getScheme().equals(LocationSchemes.GRPC_TLS));
+
+ final ArrowFlightSqlClientHandler endpointHandler =
builderForEndpoint.build();
+ try {
+ endpoints.add(new CloseableEndpointStreamPair(
+ endpointHandler.sqlClient.getStream(endpoint.getTicket(),
+ endpointHandler.getOptions()), endpointHandler.sqlClient));
+ } catch (Exception ex) {
+ AutoCloseables.close(endpointHandler);
+ throw ex;
+ }
Review Comment:
Some future improvements for this would be to test each of the locations and
use the first one that works; ignore locations with unknown URI schemes for
forwards compatibility; fall back to the current client if none of the
locations work. Could we file issue(s) for this?
##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java:
##########
@@ -95,11 +102,48 @@ private CallOption[] getOptions() {
* @param flightInfo The {@link FlightInfo} instance from which to fetch
results.
* @return a {@code FlightStream} of results.
*/
- public List<FlightStream> getStreams(final FlightInfo flightInfo) {
- return flightInfo.getEndpoints().stream()
- .map(FlightEndpoint::getTicket)
- .map(ticket -> sqlClient.getStream(ticket, getOptions()))
- .collect(Collectors.toList());
+ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo
flightInfo) throws SQLException {
+ final ArrayList<CloseableEndpointStreamPair> endpoints =
+ new ArrayList<>(flightInfo.getEndpoints().size());
+
+ try {
+ for (FlightEndpoint endpoint : flightInfo.getEndpoints()) {
+ if (endpoint.getLocations().isEmpty()) {
+ // Create a stream using the current client only and do not close
the client at the end.
+ endpoints.add(new CloseableEndpointStreamPair(
+ sqlClient.getStream(endpoint.getTicket(), getOptions()), null));
+ } else {
+ // Clone the builder and then set the new endpoint on it.
+ final URI endpointUri = endpoint.getLocations().get(0).getUri();
+ final Builder builderForEndpoint = new
Builder(ArrowFlightSqlClientHandler.this.builder)
+ .withHost(endpointUri.getHost())
+ .withPort(endpointUri.getPort())
+
.withEncryption(endpointUri.getScheme().equals(LocationSchemes.GRPC_TLS));
+
+ final ArrowFlightSqlClientHandler endpointHandler =
builderForEndpoint.build();
+ try {
+ endpoints.add(new CloseableEndpointStreamPair(
+ endpointHandler.sqlClient.getStream(endpoint.getTicket(),
+ endpointHandler.getOptions()), endpointHandler.sqlClient));
+ } catch (Exception ex) {
+ AutoCloseables.close(endpointHandler);
+ throw ex;
+ }
Review Comment:
Also, having a cache of clients instead of opening new ones every time (e.g.
via https://github.com/ben-manes/caffeine)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]