This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 66351e3bc8 GH-40039: [Java][FlightRPC] Improve performance by removing unnecessary memory copies (#40042)
66351e3bc8 is described below
commit 66351e3bc82995776304e8e150e3f7068d2254e4
Author: Lev Tolmachev <[email protected]>
AuthorDate: Mon Feb 12 14:27:44 2024 +0000
GH-40039: [Java][FlightRPC] Improve performance by removing unnecessary memory copies (#40042)
### Rationale for this change
Described in detail in the issue:
https://github.com/apache/arrow/issues/40039
Summary: the ArrowMessage class uses CompositeByteBuf to avoid memory copies, but its `maxNumComponents` is calculated incorrectly; as a result, memory copies are still performed, which significantly hurts server performance.
### What changes are included in this PR?
Change `maxNumComponents` to `Integer.MAX_VALUE`, because we never want to silently merge large buffers into one.
Users can still set useZeroCopy=false (the default), in which case the library copies data into a new buffer before handing it to Netty for the write.
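The consolidation behavior driving this change can be sketched with a stdlib-only model. This is an illustration of the threshold policy only; the `Composite` class below and its fields are hypothetical stand-ins, not Netty's actual `CompositeByteBuf` implementation:

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a composite buffer with a maxNumComponents limit. */
class Composite {
  final int maxNumComponents;
  final List<byte[]> components = new ArrayList<>();
  int copies = 0; // consolidating memory copies performed so far

  Composite(int maxNumComponents) {
    this.maxNumComponents = maxNumComponents;
  }

  void addComponent(byte[] buf) {
    components.add(buf);
    if (components.size() > maxNumComponents) {
      // Consolidate: merge all components into one freshly allocated buffer.
      // This models the hidden memory copy the commit eliminates.
      int total = 0;
      for (byte[] b : components) total += b.length;
      byte[] merged = new byte[total];
      int off = 0;
      for (byte[] b : components) {
        System.arraycopy(b, 0, merged, off, b.length);
        off += b.length;
      }
      components.clear();
      components.add(merged);
      copies++;
    }
  }

  public static void main(String[] args) {
    // Tight limit: appending one extra small buffer (as Netty does with the
    // header) pushes the composite past the limit and forces a full copy.
    Composite tight = new Composite(3);
    for (int i = 0; i < 4; i++) tight.addComponent(new byte[1024]);
    System.out.println("tight copies = " + tight.copies); // prints 1

    // Unreachable limit (the fix): components accumulate, no copy happens.
    Composite loose = new Composite(Integer.MAX_VALUE);
    for (int i = 0; i < 4; i++) loose.addComponent(new byte[1024]);
    System.out.println("loose copies = " + loose.copies); // prints 0
  }
}
```

With a tight limit, every consolidation copies all accumulated bytes, which is why the cost grows with batch size; with an unreachable limit, components are only ever referenced, never copied.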
### Are these changes tested?
**TestPerf: 30% throughput boost**
```
BEFORE
Transferred 100000000 records totaling 3200000000 bytes at 877.812629 MiB/s. 28764164.218015 record/s. 7024.784185 batch/s.
AFTER
Transferred 100000000 records totaling 3200000000 bytes at 1145.333893 MiB/s. 37530301.022096 record/s. 9165.650116 batch/s.
```
Also tested with a simple client-server application, where I saw an even larger performance boost when padding isn't needed.
Two tests with zero-copy set to true:
**50 batches, 30 columns (Int32), 199999 rows in each batch**
- before change: throughput ~25 Gbit/s (memory copy happens in `grpc-nio-worker-ELG-*`)
- after change: throughput ~32 Gbit/s (20% boost)
**50 batches, 30 columns (Int32), 200k rows in each batch**
- before change: throughput ~15 Gbit/s (much slower than with 199999 rows because the memory copy happens in a `flight-server-default-executor-*` thread and blocks the server from writing the next batch)
- after change: throughput ~32 Gbit/s (**115% boost**)
* Closes: #40039
Authored-by: Lev Tolmachev <[email protected]>
Signed-off-by: David Li <[email protected]>
---
.../main/java/org/apache/arrow/flight/ArrowMessage.java | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
index 46cb282e9f..5b946932f3 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
@@ -429,11 +429,26 @@ class ArrowMessage implements AutoCloseable {
       ByteBuf initialBuf = Unpooled.buffer(baos.size());
       initialBuf.writeBytes(baos.toByteArray());
       final CompositeByteBuf bb;
-      final int maxNumComponents = Math.max(2, bufs.size() + 1);
       final ImmutableList<ByteBuf> byteBufs = ImmutableList.<ByteBuf>builder()
           .add(initialBuf)
           .addAll(allBufs)
           .build();
+      // See: https://github.com/apache/arrow/issues/40039
+      // CompositeByteBuf requires us to pass maxNumComponents to the constructor.
+      // This number is used to decide when to stop adding new components as separate buffers
+      // and instead merge existing components into a new buffer by performing a memory copy.
+      // We want to avoid memory copies as much as possible, so we set a limit that won't be reached.
+      // At first glance it seems reasonable to set the limit to byteBufs.size() + 1,
+      // because that is enough to avoid merges of the byteBufs we pass to the constructor.
+      // But later this buffer is written to the socket by Netty,
+      // and DefaultHttp2ConnectionEncoder uses CoalescingBufferQueue to combine small buffers into one.
+      // CoalescingBufferQueue.compose checks whether the current buffer is already a CompositeByteBuf,
+      // and if so it simply adds a new component to that buffer.
+      // But in our case, if we set maxNumComponents = byteBufs.size() + 1, consolidation happens on
+      // the first attempt to write data to the socket, because the header message is small and Netty
+      // will always try to combine it with the large CompositeByteBuf we're creating here.
+      // We never want additional memory copies, so set the limit to Integer.MAX_VALUE.
+      final int maxNumComponents = Integer.MAX_VALUE;
       if (tryZeroCopyWrite) {
         bb = new ArrowBufRetainingCompositeByteBuf(maxNumComponents, byteBufs, bufs);
       } else {