This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e864d8f168c KAFKA-17661 Fix flaky BufferPoolTest.testBlockTimeout (#17319)
e864d8f168c is described below
commit e864d8f168c884624c2d0a88ecbe280bd045d864
Author: 陳昱霖(Yu-Lin Chen) <[email protected]>
AuthorDate: Fri Oct 11 22:06:21 2024 +0800
KAFKA-17661 Fix flaky BufferPoolTest.testBlockTimeout (#17319)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../clients/producer/internals/BufferPoolTest.java | 29 +++-------------------
1 file changed, 4 insertions(+), 25 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 7b726434113..128e15ed6c6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -129,14 +129,6 @@ public class BufferPoolTest {
return latch;
}
- private void delayedDeallocate(final BufferPool pool, final ByteBuffer buffer, final long delayMs) {
- Thread thread = new Thread(() -> {
- Time.SYSTEM.sleep(delayMs);
- pool.deallocate(buffer);
- });
- thread.start();
- }
-
private CountDownLatch asyncAllocate(final BufferPool pool, final int size) {
final CountDownLatch completed = new CountDownLatch(1);
Thread thread = new Thread(() -> {
@@ -168,26 +160,13 @@ public class BufferPoolTest {
*/
@Test
public void testBlockTimeout() throws Exception {
- BufferPool pool = new BufferPool(10, 1, metrics, Time.SYSTEM, metricGroup);
- ByteBuffer buffer1 = pool.allocate(1, maxBlockTimeMs);
- ByteBuffer buffer2 = pool.allocate(1, maxBlockTimeMs);
- ByteBuffer buffer3 = pool.allocate(1, maxBlockTimeMs);
- // The first two buffers will be de-allocated within maxBlockTimeMs since the most recent allocation
- delayedDeallocate(pool, buffer1, maxBlockTimeMs / 2);
- delayedDeallocate(pool, buffer2, maxBlockTimeMs);
- // The third buffer will be de-allocated after maxBlockTimeMs since the most recent allocation
- delayedDeallocate(pool, buffer3, maxBlockTimeMs / 2 * 5);
+ BufferPool pool = new BufferPool(2, 1, metrics, Time.SYSTEM, metricGroup);
+ pool.allocate(1, maxBlockTimeMs);
long beginTimeMs = Time.SYSTEM.milliseconds();
- try {
- pool.allocate(10, maxBlockTimeMs);
- fail("The buffer allocated more memory than its maximum value 10");
- } catch (BufferExhaustedException e) {
- // this is good
- }
- // Thread scheduling sometimes means that deallocation varies by this point
- assertTrue(pool.availableMemory() >= 7 && pool.availableMemory() <= 10, "available memory " + pool.availableMemory());
+ assertThrows(BufferExhaustedException.class, () -> pool.allocate(2, maxBlockTimeMs));
long durationMs = Time.SYSTEM.milliseconds() - beginTimeMs;
+
assertTrue(durationMs >= maxBlockTimeMs, "BufferExhaustedException should not throw before maxBlockTimeMs");
assertTrue(durationMs < maxBlockTimeMs + 1000, "BufferExhaustedException should throw soon after maxBlockTimeMs");
}