emkornfield commented on a change in pull request #7012:
URL: https://github.com/apache/arrow/pull/7012#discussion_r419200063



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -293,6 +292,76 @@ public void onCompleted() {
     return stream;
   }
 
+  /**
+   * Initiate a bidirectional data exchange with the server.
+   *
+   * @param descriptor A descriptor for the data stream.
+   * @param options RPC call options.
+   * @return A pair of a readable stream and a writable stream.
+   */
+  public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption... options) {
+    Preconditions.checkNotNull(descriptor);
+    final io.grpc.CallOptions callOptions = CallOptions.wrapStub(asyncStub, options).getCallOptions();
+
+    try {
+      final ClientCall<ArrowMessage, ArrowMessage> call = interceptedChannel.newCall(doExchangeDescriptor, callOptions);
+      final FlightStream stream = new FlightStream(allocator, PENDING_REQUESTS, call::cancel, call::request);
+      final ClientCallStreamObserver<ArrowMessage> observer = (ClientCallStreamObserver<ArrowMessage>)
+              ClientCalls.asyncBidiStreamingCall(call, stream.asObserver());
+      final ClientStreamListener writer = new PutObserver(
+          descriptor, observer, stream.completed::isDone,
+          () -> {
+            try {
+              stream.completed.get();
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw CallStatus.INTERNAL.withDescription("Client error: interrupted").withCause(e).toRuntimeException();
+            } catch (ExecutionException e) {
+              throw CallStatus.INTERNAL.withDescription("Client error: " + e).withCause(e).toRuntimeException();
+            }
+          });
+      // Send the descriptor to start.
+      try (final ArrowMessage message = new ArrowMessage(descriptor.toProtocol())) {
+        observer.onNext(message);
+      } catch (Exception e) {
+        throw CallStatus.INTERNAL
+            .withCause(e)
+            .withDescription("Could not write descriptor message: " + e)

Review comment:
       I think there are other places where you end up concatenating the 
exception into the description. Could you take a look?
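
One possible reading of this request, assuming the concern is that the exception is already attached through `withCause(e)` and so does not also need to be appended to the description string. The sketch below uses only the JDK: `DescriptorErrors` and its methods are hypothetical stand-ins for illustration, not Arrow Flight APIs, and a plain `RuntimeException` stands in for the `CallStatus` builder chain.

```java
import java.io.IOException;

/** Hypothetical helper for illustration only; not part of Arrow Flight. */
final class DescriptorErrors {
  private DescriptorErrors() {}

  /** Pattern flagged in review: the exception is concatenated into the message and also attached as the cause. */
  static RuntimeException concatenated(Exception e) {
    return new RuntimeException("Could not write descriptor message: " + e, e);
  }

  /** Alternative: keep the message static and carry the exception only as the cause. */
  static RuntimeException causeOnly(Exception e) {
    return new RuntimeException("Could not write descriptor message", e);
  }

  public static void main(String[] args) {
    Exception io = new IOException("stream closed");
    // The concatenated form repeats the cause's text inside the message.
    System.out.println(concatenated(io).getMessage());
    // The cause-only form keeps the message short; the detail stays reachable via getCause().
    System.out.println(causeOnly(io).getMessage());
    System.out.println(causeOnly(io).getCause());
  }
}
```

In both forms the original exception remains available to callers through `getCause()`; whether the description should also carry its text is the judgment call being asked about here.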



