lidavidm commented on a change in pull request #7012:
URL: https://github.com/apache/arrow/pull/7012#discussion_r419413303
##########
File path:
java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -293,6 +292,76 @@ public void onCompleted() {
return stream;
}
+ /**
+ * Initiate a bidirectional data exchange with the server.
+ *
+ * @param descriptor A descriptor for the data stream.
+ * @param options RPC call options.
+ * @return A pair of a readable stream and a writable stream.
+ */
+ public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption... options) {
+ Preconditions.checkNotNull(descriptor);
+ final io.grpc.CallOptions callOptions = CallOptions.wrapStub(asyncStub, options).getCallOptions();
+
+ try {
+ final ClientCall<ArrowMessage, ArrowMessage> call = interceptedChannel.newCall(doExchangeDescriptor, callOptions);
+ final FlightStream stream = new FlightStream(allocator, PENDING_REQUESTS, call::cancel, call::request);
+ final ClientCallStreamObserver<ArrowMessage> observer = (ClientCallStreamObserver<ArrowMessage>)
+ ClientCalls.asyncBidiStreamingCall(call, stream.asObserver());
+ final ClientStreamListener writer = new PutObserver(
+ descriptor, observer, stream.completed::isDone,
+ () -> {
+ try {
+ stream.completed.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw CallStatus.INTERNAL.withDescription("Client error: interrupted").withCause(e).toRuntimeException();
+ } catch (ExecutionException e) {
+ throw CallStatus.INTERNAL.withDescription("Client error: " + e).withCause(e).toRuntimeException();
+ }
+ });
+ // Send the descriptor to start.
+ try (final ArrowMessage message = new ArrowMessage(descriptor.toProtocol())) {
+ observer.onNext(message);
+ } catch (Exception e) {
+ throw CallStatus.INTERNAL
+ .withCause(e)
+ .withDescription("Could not write descriptor message: " + e)
Review comment:
I've gone and fixed the other occurrences.