This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 66351e3bc8 GH-40039: [Java][FlightRPC] Improve performance by removing unnecessary memory copies (#40042)
66351e3bc8 is described below

commit 66351e3bc82995776304e8e150e3f7068d2254e4
Author: Lev Tolmachev <[email protected]>
AuthorDate: Mon Feb 12 14:27:44 2024 +0000

    GH-40039: [Java][FlightRPC] Improve performance by removing unnecessary memory copies (#40042)
    
    ### Rationale for this change
    Described in detail in the issue: https://github.com/apache/arrow/issues/40039
    
    Summary: the ArrowMessage class uses CompositeByteBuf to avoid memory copies, but its `maxNumComponents` is calculated incorrectly; as a result, memory copies are still performed, which significantly hurts server performance.
    
    ### What changes are included in this PR?
    Change `maxNumComponents` to `Integer.MAX_VALUE`, because we never want to silently merge large buffers into one.
    
    Users can set useZeroCopy=false (the default), in which case the library copies data into a new buffer before handing it to Netty for the write.
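    
    To illustrate why the limit matters, here is a toy sketch of the consolidation policy (a simplified model of how a CompositeByteBuf-style container behaves, not Netty's actual implementation): once the number of components exceeds maxNumComponents, everything is merged into one buffer with a full memory copy.
    
    ```java
    import java.io.ByteArrayOutputStream;
    import java.util.ArrayList;
    import java.util.List;

    // Toy model of a composite buffer's consolidation policy (illustrative only).
    class ToyComposite {
        private final int maxNumComponents;
        private final List<byte[]> components = new ArrayList<>();
        int copies = 0; // number of consolidations, i.e. full memory copies

        ToyComposite(int maxNumComponents) {
            this.maxNumComponents = maxNumComponents;
        }

        void addComponent(byte[] buf) {
            components.add(buf);
            // Exceeding the limit merges all components into one buffer: a memory copy.
            if (components.size() > maxNumComponents) {
                ByteArrayOutputStream merged = new ByteArrayOutputStream();
                for (byte[] c : components) {
                    merged.write(c, 0, c.length);
                }
                components.clear();
                components.add(merged.toByteArray());
                copies++;
            }
        }

        int numComponents() {
            return components.size();
        }
    }

    public class ConsolidationDemo {
        public static void main(String[] args) {
            // Too-small limit: the third component triggers a consolidation copy.
            ToyComposite small = new ToyComposite(2);
            small.addComponent(new byte[8]);
            small.addComponent(new byte[8]);
            small.addComponent(new byte[8]);
            System.out.println(small.numComponents() + " " + small.copies); // 1 1

            // Effectively unbounded limit: components stay separate, no copies.
            ToyComposite big = new ToyComposite(Integer.MAX_VALUE);
            for (int i = 0; i < 1000; i++) {
                big.addComponent(new byte[8]);
            }
            System.out.println(big.numComponents() + " " + big.copies); // 1000 0
        }
    }
    ```
    
    With a limit of Integer.MAX_VALUE the components simply stay separate, which is the behavior this change relies on.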
    
    ### Are these changes tested?
    
    **TestPerf: 30% throughput boost**
    ```
    BEFORE
    Transferred 100000000 records totaling 3200000000 bytes at 877.812629 MiB/s. 28764164.218015 record/s. 7024.784185 batch/s.
    
    AFTER
    Transferred 100000000 records totaling 3200000000 bytes at 1145.333893 MiB/s. 37530301.022096 record/s. 9165.650116 batch/s.
    ```
    
    I also tested with a simple client-server application and saw an even more significant performance boost when padding isn't needed.
    
    Two tests with zero-copy set to true:
    **50 batches, 30 columns (Int32), 199999 rows in each batch**
    - before change: throughput ~25Gbit/s (memory copy happens in `grpc-nio-worker-ELG-*`)
    - after change: throughput ~32Gbit/s (20% boost)
    
    **50 batches, 30 columns (Int32), 200k rows in each batch**
    - before change: throughput ~15Gbit/s (much slower than with 199999 rows because the memory copy happens in the `flight-server-default-executor-*` thread and blocks the server from writing the next batch)
    - after change: throughput ~32Gbit/s (**115% boost**)
    * Closes: #40039
    
    Authored-by: Lev Tolmachev <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 .../main/java/org/apache/arrow/flight/ArrowMessage.java | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
index 46cb282e9f..5b946932f3 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
@@ -429,11 +429,26 @@ class ArrowMessage implements AutoCloseable {
       ByteBuf initialBuf = Unpooled.buffer(baos.size());
       initialBuf.writeBytes(baos.toByteArray());
       final CompositeByteBuf bb;
-      final int maxNumComponents = Math.max(2, bufs.size() + 1);
       final ImmutableList<ByteBuf> byteBufs = ImmutableList.<ByteBuf>builder()
           .add(initialBuf)
           .addAll(allBufs)
           .build();
+      // See: https://github.com/apache/arrow/issues/40039
+      // CompositeByteBuf requires us to pass maxNumComponents to constructor.
+      // This number will be used to decide when to stop adding new components as separate buffers
+      // and instead merge existing components into a new buffer by performing a memory copy.
+      // We want to avoid memory copies as much as possible so we want to set a limit that won't be reached.
+      // At first glance it seems reasonable to set the limit to byteBufs.size() + 1,
+      // because it will be enough to avoid merges of byteBufs that we pass to constructor.
+      // But later this buffer will be written to socket by Netty
+      // and DefaultHttp2ConnectionEncoder uses CoalescingBufferQueue to combine small buffers into one.
+      // Method CoalescingBufferQueue.compose will check if current buffer is already a CompositeByteBuf
+      // and if it's the case it will just add a new component to this buffer.
+      // But in our case if we set maxNumComponents=byteBufs.size() + 1 it will happen on the first attempt
+      // to write data to socket because the header message is small and Netty will always try to combine it with the
+      // large CompositeByteBuf we're creating here.
+      // We never want additional memory copies so we set the limit to Integer.MAX_VALUE.
+      final int maxNumComponents = Integer.MAX_VALUE;
       if (tryZeroCopyWrite) {
         bb = new ArrowBufRetainingCompositeByteBuf(maxNumComponents, byteBufs, bufs);
       } else {
