emkornfield commented on a change in pull request #9387:
URL: https://github.com/apache/arrow/pull/9387#discussion_r568300987
##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -422,6 +436,52 @@ private InputStream asInputStream(BufferAllocator allocator) {
   }

+  /**
+   * ARROW-11066: enable the zero-copy optimization and protect against use-after-free.
+   *
+   * When you send a message through gRPC, the following happens:
+   * 1. gRPC immediately serializes the message, eventually calling asInputStream above.
+   * 2. gRPC buffers the serialized message for sending.
+   * 3. Later, gRPC will actually write out the message.
+   *
+   * The problem with this is that when the zero-copy optimization is enabled, Flight
+   * "serializes" the message by handing gRPC references to Arrow data. That means we need
+   * a way to keep the Arrow buffers valid until gRPC actually writes them, else, we'll read
+   * invalid data or segfault. gRPC doesn't know anything about Arrow buffers, either.
+   *
+   * This class solves that issue by bridging Arrow and Netty/gRPC. We increment the refcnt
+   * on a set of Arrow backing buffers and decrement them once the Netty buffers are freed
+   * by gRPC.
+   */
+  private static final class ArrowBufRetainingCompositeByteBuf extends CompositeByteBuf {
+    // Arrow buffers that back the Netty ByteBufs here; ByteBufs held by this class are
+    // either slices of one of the ArrowBufs or independently allocated.
+    final List<ArrowBuf> backingBuffers;
+    boolean freed;
+
+    ArrowBufRetainingCompositeByteBuf(int maxNumComponents, Iterable<ByteBuf> buffers, List<ArrowBuf> backingBuffers) {
+      super(UnpooledByteBufAllocator.DEFAULT, /* direct */ true, maxNumComponents, buffers);
+      this.backingBuffers = backingBuffers;
+      this.freed = false;
+      // N.B. the Netty superclass avoids enhanced-for to reduce GC pressure, so follow that here
+      for (int i = 0; i < backingBuffers.size(); i++) {
+        backingBuffers.get(i).getReferenceManager().retain();
+      }
+    }
+
+    @Override
+    protected void deallocate() {
+      super.deallocate();
+      if (freed) {
+        return;
+      }
+      freed = true;
+      for (int i = 0; i < backingBuffers.size(); i++) {
+        backingBuffers.get(i).getReferenceManager().release();

Review comment:
This seems OK to me. If refCnt isn't happening correctly there are probably deeper problems in the system.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org