emkornfield commented on a change in pull request #9387:
URL: https://github.com/apache/arrow/pull/9387#discussion_r568300987



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -422,6 +436,52 @@ private InputStream asInputStream(BufferAllocator allocator) {
 
   }
 
+  /**
+   * ARROW-11066: enable the zero-copy optimization and protect against use-after-free.
+   *
+   * When you send a message through gRPC, the following happens:
+   * 1. gRPC immediately serializes the message, eventually calling asInputStream above.
+   * 2. gRPC buffers the serialized message for sending.
+   * 3. Later, gRPC will actually write out the message.
+   *
+   * The problem with this is that when the zero-copy optimization is enabled, Flight
+   * "serializes" the message by handing gRPC references to Arrow data. That means we need
+   * a way to keep the Arrow buffers valid until gRPC actually writes them, else, we'll read
+   * invalid data or segfault. gRPC doesn't know anything about Arrow buffers, either.
+   *
+   * This class solves that issue by bridging Arrow and Netty/gRPC. We increment the refcnt
+   * on a set of Arrow backing buffers and decrement them once the Netty buffers are freed
+   * by gRPC.
+   */
+  private static final class ArrowBufRetainingCompositeByteBuf extends CompositeByteBuf {
+    // Arrow buffers that back the Netty ByteBufs here; ByteBufs held by this class are
+    // either slices of one of the ArrowBufs or independently allocated.
+    final List<ArrowBuf> backingBuffers;
+    boolean freed;
+
+    ArrowBufRetainingCompositeByteBuf(int maxNumComponents, Iterable<ByteBuf> buffers, List<ArrowBuf> backingBuffers) {
+      super(UnpooledByteBufAllocator.DEFAULT, /* direct */ true, maxNumComponents, buffers);
+      this.backingBuffers = backingBuffers;
+      this.freed = false;
+      // N.B. the Netty superclass avoids enhanced-for to reduce GC pressure, so follow that here
+      for (int i = 0; i < backingBuffers.size(); i++) {
+        backingBuffers.get(i).getReferenceManager().retain();
+      }
+    }
+
+    @Override
+    protected void deallocate() {
+      super.deallocate();
+      if (freed) {
+        return;
+      }
+      freed = true;
+      for (int i = 0; i < backingBuffers.size(); i++) {
+        backingBuffers.get(i).getReferenceManager().release();

Review comment:
       This seems OK to me. If reference counting isn't working correctly, there are probably deeper problems in the system.
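
For readers following the thread, the retain-on-construct / release-once-on-deallocate pattern under discussion can be sketched without Netty or Arrow. This is only an illustration under assumptions: `CountedBuffer` and `RetainingHolder` are hypothetical stand-ins (not the classes in the PR), and the sketch is single-threaded, whereas real reference counts are typically updated atomically.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a reference-counted buffer such as an ArrowBuf. */
final class CountedBuffer {
  private int refCnt = 1; // the creator holds the first reference

  void retain() {
    if (refCnt <= 0) {
      throw new IllegalStateException("buffer already freed");
    }
    refCnt++;
  }

  void release() {
    if (refCnt <= 0) {
      throw new IllegalStateException("buffer already freed");
    }
    refCnt--;
  }

  int refCnt() {
    return refCnt;
  }
}

/** Hypothetical holder that keeps its backing buffers alive until deallocate(). */
final class RetainingHolder {
  private final List<CountedBuffer> backing;
  private boolean freed;

  RetainingHolder(List<CountedBuffer> backing) {
    this.backing = new ArrayList<>(backing);
    // Take one extra reference per backing buffer for the holder's lifetime.
    for (int i = 0; i < this.backing.size(); i++) {
      this.backing.get(i).retain();
    }
  }

  void deallocate() {
    if (freed) {
      return; // a second call must not release the buffers again
    }
    freed = true;
    for (int i = 0; i < backing.size(); i++) {
      backing.get(i).release();
    }
  }
}

final class RetainReleaseDemo {
  public static void main(String[] args) {
    CountedBuffer buf = new CountedBuffer();
    RetainingHolder holder = new RetainingHolder(Arrays.asList(buf));
    buf.release(); // the producer drops its reference; the holder keeps the buffer alive
    System.out.println("after producer release: " + buf.refCnt());
    holder.deallocate(); // the transport is done; the last reference goes away
    System.out.println("after deallocate: " + buf.refCnt());
  }
}
```

The `freed` guard only protects against a repeated `deallocate()` call; it does not compensate for unbalanced retain/release calls elsewhere.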

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -67,7 +67,17 @@
  */
 class ArrowMessage implements AutoCloseable {
 
-  public static final boolean FAST_PATH = true;
+  // If true, serialize Arrow data by giving gRPC a reference to the underlying Arrow buffer
+  // instead of copying the data. Defaults to true.
+  public static final boolean FAST_PATH;

Review comment:
       nit: maybe ZERO_COPY or DIRECT_TO_GRPC?
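
Since the diff only shows the declaration without an initializer, here is a rough sketch of how a `static final` flag that "defaults to true" might be set from configuration in a static block. The property name `example.flight.fast_path` and the `parse` helper are hypothetical, chosen for illustration; the PR's actual mechanism is not visible in the quoted hunk.

```java
final class FastPathFlag {
  // Hypothetical property name, for illustration only.
  static final String PROPERTY = "example.flight.fast_path";

  // Assigned exactly once in the static initializer below.
  static final boolean FAST_PATH;

  static {
    FAST_PATH = parse(System.getProperty(PROPERTY));
  }

  /** Returns true unless the value explicitly disables the flag ("false" or "0"). */
  static boolean parse(String raw) {
    if (raw == null) {
      return true; // unset means enabled
    }
    String value = raw.trim();
    return !("false".equalsIgnoreCase(value) || "0".equals(value));
  }

  public static void main(String[] args) {
    System.out.println("FAST_PATH = " + FAST_PATH);
  }
}
```

With this shape the flag would be turned off by starting the JVM with `-Dexample.flight.fast_path=false`.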

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -67,7 +67,17 @@
  */
 class ArrowMessage implements AutoCloseable {
 
-  public static final boolean FAST_PATH = true;
+  // If true, serialize Arrow data by giving gRPC a reference to the underlying Arrow buffer
+  // instead of copying the data. Defaults to true.
+  public static final boolean FAST_PATH;

Review comment:
       BTW, thank you for the flag.

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -422,6 +436,52 @@ private InputStream asInputStream(BufferAllocator allocator) {
 
   }
 
+  /**
+   * ARROW-11066: enable the zero-copy optimization and protect against use-after-free.

Review comment:
       Thank you for the comments!

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/AddWritableBuffer.java
##########
@@ -88,11 +87,8 @@
    * @param buf The buffer to add.
    * @param stream The Candidate OutputStream to add to.
    * @return True if added. False if not possible.
-   * @throws IOException on error
    */
-  public static boolean add(ByteBuf buf, OutputStream stream) throws IOException {
-    buf.readBytes(stream, buf.readableBytes());

Review comment:
       Shouldn't this now be wrapped in a FAST_PATH check?
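
To make the question concrete, one possible shape for such a guard is sketched below: try the by-reference hand-off only when the flag is on, and otherwise fall back to copying the bytes into the stream. Everything here is an assumption for illustration: `GuardedWrite`, `ZeroCopySink`, and `tryAdd` are hypothetical names, and `java.nio.ByteBuffer` stands in for the Netty `ByteBuf` in the diff.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

final class GuardedWrite {
  /** Hypothetical zero-copy hook; returns false if the stream cannot take the buffer by reference. */
  interface ZeroCopySink {
    boolean tryAdd(ByteBuffer buf, OutputStream stream);
  }

  /** Returns true if the buffer was handed over by reference, false if it was copied. */
  static boolean write(boolean fastPath, ZeroCopySink sink, ByteBuffer buf, OutputStream stream) {
    if (fastPath && sink.tryAdd(buf, stream)) {
      return true;
    }
    // Fallback: copy the readable bytes without disturbing the caller's position.
    byte[] copy = new byte[buf.remaining()];
    buf.duplicate().get(copy);
    try {
      stream.write(copy);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return false;
  }

  public static void main(String[] args) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ZeroCopySink neverAccepts = (b, s) -> false;
    boolean zeroCopy = write(true, neverAccepts, ByteBuffer.wrap(new byte[] {1, 2, 3}), out);
    System.out.println("zero-copy used: " + zeroCopy + ", bytes copied: " + out.size());
  }
}
```

Whether the copy fallback belongs in this helper or in its caller is exactly the design point the comment raises.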





