This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ea7da09e530 KAFKA-17899 Add more unit tests for NetworkReceive (#17637)
ea7da09e530 is described below
commit ea7da09e53094b243c6b952b236f731d6cec3225
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Nov 1 02:38:43 2024 +0800
KAFKA-17899 Add more unit tests for NetworkReceive (#17637)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/common/network/NetworkReceiveTest.java | 52 ++++++++++++++++++++++
1 file changed, 52 insertions(+)
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/NetworkReceiveTest.java
b/clients/src/test/java/org/apache/kafka/common/network/NetworkReceiveTest.java
index 80e7e9ce101..c9980b78b55 100644
---
a/clients/src/test/java/org/apache/kafka/common/network/NetworkReceiveTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/network/NetworkReceiveTest.java
@@ -25,6 +25,7 @@ import org.mockito.Mockito;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ScatteringByteChannel;
+import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -70,4 +71,55 @@ public class NetworkReceiveTest {
assertTrue(receive.complete());
}
+ @Test
+ public void testRequiredMemoryAmountKnownWhenNotSet() {
+ NetworkReceive receive = new NetworkReceive("0");
+ assertFalse(receive.requiredMemoryAmountKnown(), "Memory amount should
not be known before read.");
+ }
+
+ @Test
+ public void testRequiredMemoryAmountKnownWhenSet() throws IOException {
+ NetworkReceive receive = new NetworkReceive(128, "0");
+
+ ScatteringByteChannel channel =
Mockito.mock(ScatteringByteChannel.class);
+
+ ArgumentCaptor<ByteBuffer> bufferCaptor =
ArgumentCaptor.forClass(ByteBuffer.class);
+
Mockito.when(channel.read(bufferCaptor.capture())).thenAnswer(invocation -> {
+ bufferCaptor.getValue().putInt(64);
+ return 4;
+ });
+
+ receive.readFrom(channel);
+ assertTrue(receive.requiredMemoryAmountKnown(), "Memory amount should
be known after read.");
+ }
+
+ @Test
+ public void testSizeWithPredefineBuffer() {
+ int payloadSize = 8;
+ int expectedTotalSize = 4 + payloadSize; // 4 bytes for size buffer +
payload size
+
+ ByteBuffer payloadBuffer = ByteBuffer.allocate(payloadSize);
+ IntStream.range(0, payloadSize).forEach(i -> payloadBuffer.put((byte)
i));
+
+ NetworkReceive networkReceive = new NetworkReceive("0", payloadBuffer);
+ assertEquals(expectedTotalSize, networkReceive.size(), "The total size
should be the sum of the size buffer and payload.");
+ }
+
+ @Test
+ public void testSizeAfterRead() throws IOException {
+ int payloadSize = 32;
+ int expectedTotalSize = 4 + payloadSize; // 4 bytes for size buffer +
payload size
+ NetworkReceive receive = new NetworkReceive(128, "0");
+
+ ScatteringByteChannel channel =
Mockito.mock(ScatteringByteChannel.class);
+
+ ArgumentCaptor<ByteBuffer> bufferCaptor =
ArgumentCaptor.forClass(ByteBuffer.class);
+
Mockito.when(channel.read(bufferCaptor.capture())).thenAnswer(invocation -> {
+ bufferCaptor.getValue().putInt(payloadSize);
+ return 4;
+ });
+
+ receive.readFrom(channel);
+ assertEquals(expectedTotalSize, receive.size(), "The total size should
be the sum of the size buffer and receive size.");
+ }
}