This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new b5e59cee [CELEBORN-222] Flink plugin RemoteShuffleOutputGate adds ut
about nettybufferTransform (#1162)
b5e59cee is described below
commit b5e59ceefcc9a92921abb365e6ec68f28d599988
Author: zhongqiangczq <[email protected]>
AuthorDate: Fri Jan 13 16:48:11 2023 +0800
[CELEBORN-222] Flink plugin RemoteShuffleOutputGate adds ut about
nettybufferTransform (#1162)
---
.../plugin/flink/RemoteShuffleOutputGate.java | 1 -
.../flink/RemoteShuffleOutputGateSuiteJ.java | 41 ++++++++++++----------
2 files changed, 22 insertions(+), 20 deletions(-)
diff --git a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
index b0f183fc..381bb05f 100644
--- a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
+++ b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
@@ -212,7 +212,6 @@ public class RemoteShuffleOutputGate {
/** Writes a piece of data to a subpartition. */
public void write(ByteBuf byteBuf, int subIdx) throws InterruptedException {
try {
- byteBuf.retain();
shuffleWriteClient.pushDataToLocation(
applicationId,
shuffleId,
diff --git a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
index 13cceb34..79399683 100644
--- a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
+++ b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
@@ -26,14 +26,13 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
-import java.time.Duration;
import java.util.Optional;
-import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -43,10 +42,15 @@ import org.apache.celeborn.common.protocol.PartitionLocation;
public class RemoteShuffleOutputGateSuiteJ {
private RemoteShuffleOutputGate remoteShuffleOutputGate =
mock(RemoteShuffleOutputGate.class);
private ShuffleClientImpl shuffleClient = mock(ShuffleClientImpl.class);
+ private static final int BUFFER_SIZE = 20;
+ private NetworkBufferPool networkBufferPool;
+ private BufferPool bufferPool;
@Before
- public void setup() {
+ public void setup() throws IOException {
remoteShuffleOutputGate.shuffleWriteClient = shuffleClient;
+ networkBufferPool = new NetworkBufferPool(10, BUFFER_SIZE);
+ bufferPool = networkBufferPool.createBufferPool(10, 10);
}
@Test
@@ -67,7 +71,7 @@ public class RemoteShuffleOutputGateSuiteJ {
.thenAnswer(t -> Optional.empty());
remoteShuffleOutputGate.regionStart(false);
- remoteShuffleOutputGate.write(createBuffer(), 0);
+ remoteShuffleOutputGate.write(bufferPool.requestBuffer(), 0);
doNothing()
.when(remoteShuffleOutputGate.shuffleWriteClient)
@@ -83,19 +87,18 @@ public class RemoteShuffleOutputGateSuiteJ {
remoteShuffleOutputGate.close();
}
- private Buffer createBuffer() throws IOException, InterruptedException {
- int segmentSize = 32 * 1024;
- NetworkBufferPool networkBufferPool =
- new NetworkBufferPool(256, 32 * 1024, Duration.ofMillis(30000L));
- BufferPool bufferPool =
- networkBufferPool.createBufferPool(
- 8 * 1024 * 1024 / segmentSize, 8 * 1024 * 1024 / segmentSize);
-
- MemorySegment memorySegment = bufferPool.requestMemorySegmentBlocking();
-
- memorySegment.put(0, new byte[] {1, 2, 3});
-
- Buffer buffer = new NetworkBuffer(memorySegment, bufferPool);
- return buffer;
+ @Test
+ public void testNettyPoolTransfrom() {
+ Buffer buffer = bufferPool.requestBuffer();
+ ByteBuf byteBuf = buffer.asByteBuf();
+ byteBuf.writeByte(1);
+ Assert.assertEquals(1, byteBuf.refCnt());
+ io.netty.buffer.ByteBuf celebornByteBuf =
+ io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer());
+ Assert.assertEquals(1, celebornByteBuf.refCnt());
+ celebornByteBuf.release();
+ byteBuf.release();
+ Assert.assertEquals(0, byteBuf.refCnt());
+ Assert.assertEquals(0, celebornByteBuf.refCnt());
}
}