lidavidm commented on a change in pull request #9387:
URL: https://github.com/apache/arrow/pull/9387#discussion_r568215760



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -422,6 +436,52 @@ private InputStream asInputStream(BufferAllocator allocator) {
 
   }
 
+  /**
+   * ARROW-11066: enable the zero-copy optimization and protect against use-after-free.
+   *
+   * When you send a message through gRPC, the following happens:
+   * 1. gRPC immediately serializes the message, eventually calling asInputStream above.
+   * 2. gRPC buffers the serialized message for sending.
+   * 3. Later, gRPC will actually write out the message.
+   *
+   * The problem with this is that when the zero-copy optimization is enabled, Flight
+   * "serializes" the message by handing gRPC references to Arrow data. That means we need
+   * a way to keep the Arrow buffers valid until gRPC actually writes them, else, we'll read
+   * invalid data or segfault. gRPC doesn't know anything about Arrow buffers, either.
+   *
+   * This class solves that issue by bridging Arrow and Netty/gRPC. We increment the refcnt
+   * on a set of Arrow backing buffers and decrement them once the Netty buffers are freed
+   * by gRPC.
+   */
+  private static final class ArrowBufRetainingCompositeByteBuf extends CompositeByteBuf {
+    // Arrow buffers that back the Netty ByteBufs here; ByteBufs held by this class are
+    // either slices of one of the ArrowBufs or independently allocated.
+    final List<ArrowBuf> backingBuffers;
+    boolean freed;
+
+    ArrowBufRetainingCompositeByteBuf(int maxNumComponents, Iterable<ByteBuf> buffers, List<ArrowBuf> backingBuffers) {
+      super(UnpooledByteBufAllocator.DEFAULT, /* direct */ true, maxNumComponents, buffers);
+      this.backingBuffers = backingBuffers;
+      this.freed = false;
+      // N.B. the Netty superclass avoids enhanced-for to reduce GC pressure, so follow that here
+      for (int i = 0; i < backingBuffers.size(); i++) {
+        backingBuffers.get(i).getReferenceManager().retain();

Review comment:
       Retain seems OK; it would only throw if the buffer was already invalid to begin with:
https://github.com/apache/arrow/blob/9a898dd0bbd08d5635d3e47697f8d2235024a504/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java#L172-L186

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -422,6 +436,52 @@ private InputStream asInputStream(BufferAllocator allocator) {
 
   }
 
+  /**
+   * ARROW-11066: enable the zero-copy optimization and protect against use-after-free.
+   *
+   * When you send a message through gRPC, the following happens:
+   * 1. gRPC immediately serializes the message, eventually calling asInputStream above.
+   * 2. gRPC buffers the serialized message for sending.
+   * 3. Later, gRPC will actually write out the message.
+   *
+   * The problem with this is that when the zero-copy optimization is enabled, Flight
+   * "serializes" the message by handing gRPC references to Arrow data. That means we need
+   * a way to keep the Arrow buffers valid until gRPC actually writes them, else, we'll read
+   * invalid data or segfault. gRPC doesn't know anything about Arrow buffers, either.
+   *
+   * This class solves that issue by bridging Arrow and Netty/gRPC. We increment the refcnt
+   * on a set of Arrow backing buffers and decrement them once the Netty buffers are freed
+   * by gRPC.
+   */
+  private static final class ArrowBufRetainingCompositeByteBuf extends CompositeByteBuf {
+    // Arrow buffers that back the Netty ByteBufs here; ByteBufs held by this class are
+    // either slices of one of the ArrowBufs or independently allocated.
+    final List<ArrowBuf> backingBuffers;
+    boolean freed;
+
+    ArrowBufRetainingCompositeByteBuf(int maxNumComponents, Iterable<ByteBuf> buffers, List<ArrowBuf> backingBuffers) {
+      super(UnpooledByteBufAllocator.DEFAULT, /* direct */ true, maxNumComponents, buffers);
+      this.backingBuffers = backingBuffers;
+      this.freed = false;
+      // N.B. the Netty superclass avoids enhanced-for to reduce GC pressure, so follow that here
+      for (int i = 0; i < backingBuffers.size(); i++) {
+        backingBuffers.get(i).getReferenceManager().retain();
+      }
+    }
+
+    @Override
+    protected void deallocate() {
+      super.deallocate();
+      if (freed) {
+        return;
+      }
+      freed = true;
+      for (int i = 0; i < backingBuffers.size(); i++) {
+        backingBuffers.get(i).getReferenceManager().release();

Review comment:
       Release can throw if the reference count goes negative. Is there a helper for 'release this list as safely as possible'? Otherwise, I'll catch and defer any thrown exceptions until we've released every buffer we can here.
   
https://github.com/apache/arrow/blob/9a898dd0bbd08d5635d3e47697f8d2235024a504/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java#L119-L132
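   
   For illustration, here is a minimal sketch of the "catch and defer" approach described above. It is not the code in this PR: `SafeRelease`, `Releasable`, and `releaseAll` are hypothetical names, and `Releasable` merely stands in for anything whose `release()` may throw (such as a reference manager). The idea is to attempt every release, remember the first failure, attach later failures as suppressed exceptions, and rethrow only after the loop finishes.

```java
import java.util.List;

public final class SafeRelease {
  /** Hypothetical stand-in for an object whose release() may throw. */
  public interface Releasable {
    void release();
  }

  private SafeRelease() {}

  /**
   * Attempts to release every element, even if some releases throw.
   * The first exception is rethrown after the loop; later ones are
   * attached to it as suppressed exceptions.
   */
  public static void releaseAll(List<? extends Releasable> buffers) {
    RuntimeException first = null;
    // Indexed loop, matching the style of the code under review.
    for (int i = 0; i < buffers.size(); i++) {
      try {
        buffers.get(i).release();
      } catch (RuntimeException e) {
        if (first == null) {
          first = e;
        } else if (e != first) {
          // addSuppressed rejects self-suppression, hence the identity check.
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }
}
```

   With this shape, one bad buffer does not prevent the remaining buffers from being released, and the caller still sees the failure.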

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/AddWritableBuffer.java
##########
@@ -88,11 +87,8 @@
    * @param buf The buffer to add.
    * @param stream The Candidate OutputStream to add to.
    * @return True if added. False if not possible.
-   * @throws IOException on error
    */
-  public static boolean add(ByteBuf buf, OutputStream stream) throws IOException {
-    buf.readBytes(stream, buf.readableBytes());

Review comment:
       Ah, FAST_PATH is an existing flag; it's actually used at the call site:
https://github.com/apache/arrow/blob/6b85f6eba6d8403dbd549212fbc86f0e8ee192d0/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java#L437-L439
   
   But I agree it'd be clearer if we moved FAST_PATH into here and also had it handle the fallback path.

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/AddWritableBuffer.java
##########
@@ -88,11 +87,8 @@
    * @param buf The buffer to add.
    * @param stream The Candidate OutputStream to add to.
    * @return True if added. False if not possible.
-   * @throws IOException on error
    */
-  public static boolean add(ByteBuf buf, OutputStream stream) throws IOException {
-    buf.readBytes(stream, buf.readableBytes());

Review comment:
       Moved the fallback path into this method (mimicking GetReadableBuffer).
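   
   As a rough sketch of what "fast path with a built-in fallback" can look like (not the actual change in this PR): `FallbackAdd`, `ZeroCopySink`, `adopt`, and the `tryZeroCopy` parameter are all hypothetical names, and plain byte arrays stand in for Netty buffers. The method tries the zero-copy route when it is enabled and the stream supports it; otherwise it copies the bytes into the stream and reports that the fast path was not taken.

```java
import java.io.IOException;
import java.io.OutputStream;

public final class FallbackAdd {
  /** Hypothetical marker for a stream that can take a buffer without copying. */
  public interface ZeroCopySink {
    void adopt(byte[] buf);
  }

  private FallbackAdd() {}

  /**
   * Adds buf to stream. Returns true if the (hypothetical) zero-copy path
   * was used, false if the bytes were copied instead.
   */
  public static boolean add(byte[] buf, OutputStream stream, boolean tryZeroCopy)
      throws IOException {
    if (tryZeroCopy && stream instanceof ZeroCopySink) {
      // Fast path: hand over the buffer itself.
      ((ZeroCopySink) stream).adopt(buf);
      return true;
    }
    // Fallback path: copy the bytes into the stream.
    stream.write(buf);
    return false;
  }
}
```

   Keeping both paths in one method means the call site no longer needs to check the flag and implement the copy itself.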



