This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e864d8f168c KAFKA-17661 Fix flaky BufferPoolTest.testBlockTimeout 
(#17319)
e864d8f168c is described below

commit e864d8f168c884624c2d0a88ecbe280bd045d864
Author: 陳昱霖(Yu-Lin Chen) <[email protected]>
AuthorDate: Fri Oct 11 22:06:21 2024 +0800

    KAFKA-17661 Fix flaky BufferPoolTest.testBlockTimeout (#17319)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../clients/producer/internals/BufferPoolTest.java | 29 +++-------------------
 1 file changed, 4 insertions(+), 25 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 7b726434113..128e15ed6c6 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -129,14 +129,6 @@ public class BufferPoolTest {
         return latch;
     }
 
-    private void delayedDeallocate(final BufferPool pool, final ByteBuffer 
buffer, final long delayMs) {
-        Thread thread = new Thread(() -> {
-            Time.SYSTEM.sleep(delayMs);
-            pool.deallocate(buffer);
-        });
-        thread.start();
-    }
-
     private CountDownLatch asyncAllocate(final BufferPool pool, final int 
size) {
         final CountDownLatch completed = new CountDownLatch(1);
         Thread thread = new Thread(() -> {
@@ -168,26 +160,13 @@ public class BufferPoolTest {
      */
     @Test
     public void testBlockTimeout() throws Exception {
-        BufferPool pool = new BufferPool(10, 1, metrics, Time.SYSTEM, 
metricGroup);
-        ByteBuffer buffer1 = pool.allocate(1, maxBlockTimeMs);
-        ByteBuffer buffer2 = pool.allocate(1, maxBlockTimeMs);
-        ByteBuffer buffer3 = pool.allocate(1, maxBlockTimeMs);
-        // The first two buffers will be de-allocated within maxBlockTimeMs 
since the most recent allocation
-        delayedDeallocate(pool, buffer1, maxBlockTimeMs / 2);
-        delayedDeallocate(pool, buffer2, maxBlockTimeMs);
-        // The third buffer will be de-allocated after maxBlockTimeMs since 
the most recent allocation
-        delayedDeallocate(pool, buffer3, maxBlockTimeMs / 2 * 5);
+        BufferPool pool = new BufferPool(2, 1, metrics, Time.SYSTEM, 
metricGroup);
+        pool.allocate(1, maxBlockTimeMs);
 
         long beginTimeMs = Time.SYSTEM.milliseconds();
-        try {
-            pool.allocate(10, maxBlockTimeMs);
-            fail("The buffer allocated more memory than its maximum value 10");
-        } catch (BufferExhaustedException e) {
-            // this is good
-        }
-        // Thread scheduling sometimes means that deallocation varies by this 
point
-        assertTrue(pool.availableMemory() >= 7 && pool.availableMemory() <= 
10, "available memory " + pool.availableMemory());
+        assertThrows(BufferExhaustedException.class, () -> pool.allocate(2, 
maxBlockTimeMs));
         long durationMs = Time.SYSTEM.milliseconds() - beginTimeMs;
+
         assertTrue(durationMs >= maxBlockTimeMs, "BufferExhaustedException 
should not throw before maxBlockTimeMs");
         assertTrue(durationMs < maxBlockTimeMs + 1000, 
"BufferExhaustedException should throw soon after maxBlockTimeMs");
     }

Reply via email to