emkornfield commented on a change in pull request #9387:
URL: https://github.com/apache/arrow/pull/9387#discussion_r568169871



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -422,6 +436,52 @@ private InputStream asInputStream(BufferAllocator allocator) {
 
   }
 
+  /**
+   * ARROW-11066: enable the zero-copy optimization and protect against use-after-free.
+   *
+   * When you send a message through gRPC, the following happens:
+   * 1. gRPC immediately serializes the message, eventually calling asInputStream above.
+   * 2. gRPC buffers the serialized message for sending.
+   * 3. Later, gRPC will actually write out the message.
+   *
+   * The problem with this is that when the zero-copy optimization is enabled, Flight
+   * "serializes" the message by handing gRPC references to Arrow data. That means we need
+   * a way to keep the Arrow buffers valid until gRPC actually writes them, else, we'll read
+   * invalid data or segfault. gRPC doesn't know anything about Arrow buffers, either.
+   *
+   * This class solves that issue by bridging Arrow and Netty/gRPC. We increment the refcnt
+   * on a set of Arrow backing buffers and decrement them once the Netty buffers are freed
+   * by gRPC.
+   */
+  private static final class ArrowBufRetainingCompositeByteBuf extends CompositeByteBuf {
+    // Arrow buffers that back the Netty ByteBufs here; ByteBufs held by this class are
+    // either slices of one of the ArrowBufs or independently allocated.
+    final List<ArrowBuf> backingBuffers;
+    boolean freed;
+
+    ArrowBufRetainingCompositeByteBuf(int maxNumComponents, Iterable<ByteBuf> buffers, List<ArrowBuf> backingBuffers) {
+      super(UnpooledByteBufAllocator.DEFAULT, /* direct */ true, maxNumComponents, buffers);
+      this.backingBuffers = backingBuffers;
+      this.freed = false;
+      // N.B. the Netty superclass avoids enhanced-for to reduce GC pressure, so follow that here
+      for (int i = 0; i < backingBuffers.size(); i++) {
+        backingBuffers.get(i).getReferenceManager().retain();

Review comment:
       Same question as below: I assume this can't throw?
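
       To make the concern concrete: if a retain() call in that loop could throw partway through, the buffers retained on earlier iterations would stay retained and leak. Whether that can actually happen here is the open question. Below is a minimal sketch of an all-or-nothing retain, assuming a hypothetical `CountedBuf` stand-in rather than the real ArrowBuf/ReferenceManager API; `RetainAllSketch` and `retainAll` are illustrative names, not anything in this PR.

```java
import java.util.ArrayList;
import java.util.List;

public class RetainAllSketch {
  /** Hypothetical stand-in for a reference-counted buffer; NOT the Arrow API. */
  static final class CountedBuf {
    private int refCnt = 1;
    private final boolean failOnRetain;

    CountedBuf(boolean failOnRetain) {
      this.failOnRetain = failOnRetain;
    }

    void retain() {
      // Simulated failure so the rollback path can be exercised.
      if (failOnRetain) {
        throw new IllegalStateException("retain failed");
      }
      refCnt++;
    }

    void release() {
      refCnt--;
    }

    int refCnt() {
      return refCnt;
    }
  }

  /** Retains every buffer or none: on failure, releases the retains already taken. */
  static void retainAll(List<CountedBuf> buffers) {
    int retained = 0;
    try {
      // Indexed loop, mirroring the style used in the diff.
      for (int i = 0; i < buffers.size(); i++) {
        buffers.get(i).retain();
        retained++;
      }
    } catch (RuntimeException e) {
      // Undo only the retains that succeeded, then rethrow.
      for (int i = 0; i < retained; i++) {
        buffers.get(i).release();
      }
      throw e;
    }
  }

  public static void main(String[] args) {
    List<CountedBuf> buffers = new ArrayList<>();
    buffers.add(new CountedBuf(false));
    buffers.add(new CountedBuf(true)); // second retain fails
    try {
      retainAll(buffers);
    } catch (IllegalStateException e) {
      System.out.println("rolled back, refCnt=" + buffers.get(0).refCnt());
    }
  }
}
```

       If retain() is guaranteed not to throw, none of this is needed and the loop is fine as written.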



