rymurr commented on a change in pull request #8010:
URL: https://github.com/apache/arrow/pull/8010#discussion_r483935015
##########
File path:
java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
##########
@@ -41,7 +41,7 @@
abstract class BaseAllocator extends Accountant implements BufferAllocator {
public static final String DEBUG_ALLOCATOR = "arrow.memory.debug.allocator";
- public static final int DEBUG_LOG_LENGTH = 6;
+ public static final int DEBUG_LOG_LENGTH = 30;
Review comment:
was this a permanent change or accidentally left in?
##########
File path:
java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java
##########
@@ -158,29 +167,52 @@ public FlightDescriptor getDescriptor() {
/**
* Closes the stream (freeing any existing resources).
*
- * <p>If the stream isn't complete and is cancellable, this method will
cancel the stream first.</p>
+ * <p>If the stream isn't complete and is cancellable, this method will
cancel and drain the stream first.
*/
public void close() throws Exception {
final List<AutoCloseable> closeables = new ArrayList<>();
- // cancellation can throw, but we still want to clean up resources, so
make it an AutoCloseable too
- closeables.add(() -> {
- if (!completed.isDone() && cancellable != null) {
- cancel("Stream closed before end.", /* no exception to report */ null);
+ Throwable suppressor = null;
+ if (cancellable != null) {
+ // Client-side stream. Cancel the call, to help ensure gRPC doesn't
deliver a message after close() ends.
+ // On the server side, we can't rely on draining the stream , because
this gRPC bug means the completion callback
+ // may never run https://github.com/grpc/grpc-java/issues/5882
+ try {
+ synchronized (cancellable) {
+ if (!cancelled.isDone()) {
+ // Only cancel if the call is not done on the gRPC side
+ cancellable.cancel("Stream closed before end", /* no exception to
report */null);
+ }
+ }
+ // Drain the stream without the lock (as next() implicitly needs the
lock)
+ while (next()) { }
+ } catch (FlightRuntimeException e) {
+ suppressor = e;
}
- });
- if (fulfilledRoot != null) {
- closeables.add(fulfilledRoot);
}
- closeables.add(applicationMetadata);
- closeables.addAll(queue);
- if (dictionaries != null) {
- dictionaries.getDictionaryIds().forEach(id ->
closeables.add(dictionaries.lookup(id).getVector()));
+ // Perform these operations under a lock. This way the observer can't
enqueue new messages while we're in the
Review comment:
Maybe its just the wording of the comments but this seems like its
theoretically possible for an observer to put a message between 187 and the
lock gets aquired in 195. Is that true? The chance is prob pretty small and not
easy to code for. Just wanted a bit of clarification
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]