lidavidm commented on a change in pull request #7012:
URL: https://github.com/apache/arrow/pull/7012#discussion_r417290517
##########
File path:
java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightService.java
##########
@@ -155,79 +152,35 @@ public void setOnCancelHandler(Runnable handler) {
this.onCancelHandler = handler;
}
- @Override
- public boolean isReady() {
- return responseObserver.isReady();
- }
-
@Override
public boolean isCancelled() {
return responseObserver.isCancelled();
}
@Override
- public void start(VectorSchemaRoot root) {
- start(root, new MapDictionaryProvider());
- }
-
- @Override
- public void start(VectorSchemaRoot root, DictionaryProvider provider) {
- unloader = new VectorUnloader(root, true, true);
-
- try {
- DictionaryUtils.generateSchemaMessages(root.getSchema(), null,
provider, responseObserver::onNext);
- } catch (Exception e) {
- // Only happens if closing buffers somehow fails - indicates
application is an unknown state so propagate
- // the exception
- throw new RuntimeException("Could not generate and send all schema
messages", e);
- }
- }
-
- @Override
- public void putNext() {
- putNext(null);
- }
-
- @Override
- public void putNext(ArrowBuf metadata) {
- Preconditions.checkNotNull(unloader);
- // close is a no-op if the message has been written to gRPC, otherwise
frees the associated buffers
- // in some code paths (e.g. if the call is cancelled), gRPC does not
write the message, so we need to clean up
- // ourselves. Normally, writing the ArrowMessage will transfer ownership
of the data to gRPC/Netty.
- try (final ArrowMessage message = new
ArrowMessage(unloader.getRecordBatch(), metadata)) {
- responseObserver.onNext(message);
- } catch (Exception e) {
- // This exception comes from ArrowMessage#close, not
responseObserver#onNext.
- // Generally this should not happen - ArrowMessage's implementation
only closes non-throwing things.
- // The user can't reasonably do anything about this, but if something
does throw, we shouldn't let
- // execution continue since other state (e.g. allocators) may be in an
odd state.
- throw new RuntimeException("Could not free ArrowMessage", e);
- }
+ protected void waitUntilStreamReady() {
+ // Don't do anything - service implementations are expected to manage
backpressure themselves
Review comment:
There's no need for a hook here - services can poll `isReady` to know if
they can write without backpressure.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]