This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 190df07ace9 KAFKA-17265 Fix flaky
MemoryRecordsBuilderTest#testBuffersDereferencedOnClose (#17092)
190df07ace9 is described below
commit 190df07ace9ab266f8d27cc353481e6aa08efd77
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Thu Sep 5 19:47:16 2024 +0800
KAFKA-17265 Fix flaky
MemoryRecordsBuilderTest#testBuffersDereferencedOnClose (#17092)
Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../common/record/MemoryRecordsBuilderTest.java | 42 ++++++----------------
1 file changed, 11 insertions(+), 31 deletions(-)
diff --git
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index a87aa3878fc..e6549d07227 100644
---
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -41,7 +41,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
-import java.util.Random;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -734,36 +733,17 @@ public class MemoryRecordsBuilderTest {
@ParameterizedTest
@ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
- public void testBuffersDereferencedOnClose(Args args) {
- Runtime runtime = Runtime.getRuntime();
- int payloadLen = 1024 * 1024;
- ByteBuffer buffer = ByteBuffer.allocate(payloadLen * 2);
- byte[] key = new byte[0];
- byte[] value = new byte[payloadLen];
- new Random().nextBytes(value); // Use random payload so that
compressed buffer is large
- List<MemoryRecordsBuilder> builders = new ArrayList<>(100);
- long startMem = 0;
- long memUsed = 0;
- int iterations = 0;
- while (iterations++ < 100) {
- buffer.rewind();
- MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer,
args.magic, args.compression,
- TimestampType.CREATE_TIME, 0L, 0L,
RecordBatch.NO_PRODUCER_ID,
- RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
false, false,
- RecordBatch.NO_PARTITION_LEADER_EPOCH, 0);
- builder.append(1L, key, value);
- builder.build();
- builders.add(builder);
-
- System.gc();
- memUsed = runtime.totalMemory() - runtime.freeMemory() - startMem;
- // Ignore memory usage during initialization
- if (iterations == 2)
- startMem = memUsed;
- else if (iterations > 2 && memUsed < (iterations - 2) * 1024L)
- break;
- }
- assertTrue(iterations < 100, "Memory usage too high: " + memUsed);
+ public void shouldThrowIllegalStateExceptionOnAppendWhenClosed(Args args) {
+ ByteBuffer buffer = allocateBuffer(128, args);
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer,
args.magic, args.compression,
+ TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false,
false,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ builder.append(0L, "a".getBytes(), "1".getBytes());
+ builder.build();
+
+ assertEquals("Tried to append a record, but MemoryRecordsBuilder is
closed for record appends",
+ assertThrows(IllegalStateException.class, () ->
builder.append(0L, "a".getBytes(), "1".getBytes())).getMessage());
}
@ParameterizedTest