This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7a36d4d736 [ISSUE #7757] Use `CompositeByteBuf` to prevent memory
copy. (#7694)
7a36d4d736 is described below
commit 7a36d4d736ae8d6d92658e3bdb18f1cd5c0afdb0
Author: 道君 <[email protected]>
AuthorDate: Wed Jan 17 09:49:23 2024 +0800
[ISSUE #7757] Use `CompositeByteBuf` to prevent memory copy. (#7694)
* Use CompositeByteBuf to prevent mem_copy.
* Fix code
* Add tests
* Remove useless UTs
* Remove unused imports.
---------
Co-authored-by: RongtongJin <[email protected]>
---
.../rocketmq/remoting/netty/FileRegionEncoder.java | 20 ++++++++++++++++----
.../remoting/netty/FileRegionEncoderTest.java | 5 +++--
2 files changed, 19 insertions(+), 6 deletions(-)
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
index 7373a56070..3522c7965c 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
@@ -18,6 +18,9 @@
package org.apache.rocketmq.remoting.netty;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import io.netty.handler.codec.MessageToByteEncoder;
@@ -51,9 +54,12 @@ public class FileRegionEncoder extends
MessageToByteEncoder<FileRegion> {
WritableByteChannel writableByteChannel = new WritableByteChannel() {
@Override
public int write(ByteBuffer src) {
- int prev = out.writerIndex();
- out.writeBytes(src);
- return out.writerIndex() - prev;
+ // To prevent mem_copy.
+ CompositeByteBuf b = (CompositeByteBuf) out;
+ // Have to increase writerIndex manually.
+ ByteBuf unpooled = Unpooled.wrappedBuffer(src);
+ b.addComponent(true, unpooled);
+ return unpooled.readableBytes();
}
@Override
@@ -76,4 +82,10 @@ public class FileRegionEncoder extends
MessageToByteEncoder<FileRegion> {
msg.transferTo(writableByteChannel, transferred);
}
}
-}
+
+ @Override
+ protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, FileRegion
msg, boolean preferDirect) throws Exception {
+ ByteBufAllocator allocator = ctx.alloc();
+ return preferDirect ? allocator.compositeDirectBuffer() :
allocator.compositeHeapBuffer();
+ }
+}
\ No newline at end of file
diff --git
a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java
index 6c7327f258..0cbe627d80 100644
---
a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java
+++
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java
@@ -21,14 +21,15 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.FileRegion;
import io.netty.channel.embedded.EmbeddedChannel;
+import org.junit.Assert;
+import org.junit.Test;
+
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Random;
import java.util.UUID;
-import org.junit.Assert;
-import org.junit.Test;
public class FileRegionEncoderTest {