This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new b5e59cee [CELEBORN-222] Flink plugin RemoteShuffleOutputGate adds ut about nettybufferTransform (#1162)
b5e59cee is described below

commit b5e59ceefcc9a92921abb365e6ec68f28d599988
Author: zhongqiangczq <[email protected]>
AuthorDate: Fri Jan 13 16:48:11 2023 +0800

    [CELEBORN-222] Flink plugin RemoteShuffleOutputGate adds ut about nettybufferTransform (#1162)
---
 .../plugin/flink/RemoteShuffleOutputGate.java      |  1 -
 .../flink/RemoteShuffleOutputGateSuiteJ.java       | 41 ++++++++++++----------
 2 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
index b0f183fc..381bb05f 100644
--- a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
+++ b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
@@ -212,7 +212,6 @@ public class RemoteShuffleOutputGate {
   /** Writes a piece of data to a subpartition. */
   public void write(ByteBuf byteBuf, int subIdx) throws InterruptedException {
     try {
-      byteBuf.retain();
       shuffleWriteClient.pushDataToLocation(
           applicationId,
           shuffleId,
diff --git a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
index 13cceb34..79399683 100644
--- a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
+++ b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
@@ -26,14 +26,13 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
-import java.time.Duration;
 import java.util.Optional;
 
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -43,10 +42,15 @@ import org.apache.celeborn.common.protocol.PartitionLocation;
 public class RemoteShuffleOutputGateSuiteJ {
   private RemoteShuffleOutputGate remoteShuffleOutputGate = mock(RemoteShuffleOutputGate.class);
   private ShuffleClientImpl shuffleClient = mock(ShuffleClientImpl.class);
+  private static final int BUFFER_SIZE = 20;
+  private NetworkBufferPool networkBufferPool;
+  private BufferPool bufferPool;
 
   @Before
-  public void setup() {
+  public void setup() throws IOException {
     remoteShuffleOutputGate.shuffleWriteClient = shuffleClient;
+    networkBufferPool = new NetworkBufferPool(10, BUFFER_SIZE);
+    bufferPool = networkBufferPool.createBufferPool(10, 10);
   }
 
   @Test
@@ -67,7 +71,7 @@ public class RemoteShuffleOutputGateSuiteJ {
         .thenAnswer(t -> Optional.empty());
     remoteShuffleOutputGate.regionStart(false);
 
-    remoteShuffleOutputGate.write(createBuffer(), 0);
+    remoteShuffleOutputGate.write(bufferPool.requestBuffer(), 0);
 
     doNothing()
         .when(remoteShuffleOutputGate.shuffleWriteClient)
@@ -83,19 +87,18 @@ public class RemoteShuffleOutputGateSuiteJ {
     remoteShuffleOutputGate.close();
   }
 
-  private Buffer createBuffer() throws IOException, InterruptedException {
-    int segmentSize = 32 * 1024;
-    NetworkBufferPool networkBufferPool =
-        new NetworkBufferPool(256, 32 * 1024, Duration.ofMillis(30000L));
-    BufferPool bufferPool =
-        networkBufferPool.createBufferPool(
-            8 * 1024 * 1024 / segmentSize, 8 * 1024 * 1024 / segmentSize);
-
-    MemorySegment memorySegment = bufferPool.requestMemorySegmentBlocking();
-
-    memorySegment.put(0, new byte[] {1, 2, 3});
-
-    Buffer buffer = new NetworkBuffer(memorySegment, bufferPool);
-    return buffer;
+  @Test
+  public void testNettyPoolTransfrom() {
+    Buffer buffer = bufferPool.requestBuffer();
+    ByteBuf byteBuf = buffer.asByteBuf();
+    byteBuf.writeByte(1);
+    Assert.assertEquals(1, byteBuf.refCnt());
+    io.netty.buffer.ByteBuf celebornByteBuf =
+        io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer());
+    Assert.assertEquals(1, celebornByteBuf.refCnt());
+    celebornByteBuf.release();
+    byteBuf.release();
+    Assert.assertEquals(0, byteBuf.refCnt());
+    Assert.assertEquals(0, celebornByteBuf.refCnt());
   }
 }

Reply via email to