This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ec894557208f3be462993fad1e58268194eec8bc Author: Ruimin MA <[email protected]> AuthorDate: Tue Oct 28 09:17:19 2025 +0800 [fix][test]fix flaky SimpleProducerConsumerTest.testReceiveAsyncCompletedWhenClosing (#24858) (cherry picked from commit 1ca17972459095278e2b5f7ed7fd55c8921d8826) --- .../client/api/SimpleProducerConsumerTest.java | 25 ++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index d9d2c4cec81..8bfded83343 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -4082,14 +4082,18 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { // 1) Test receiveAsync is interrupted CountDownLatch countDownLatch = new CountDownLatch(1); new Thread(() -> { + CountDownLatch subCountDownLatch = new CountDownLatch(1); try { new Thread(() -> { try { + subCountDownLatch.await(); consumer.close(); - } catch (PulsarClientException ignore) { + } catch (PulsarClientException | InterruptedException ignore) { } }).start(); - consumer.receiveAsync().get(); + CompletableFuture<Message<String>> futhre = consumer.receiveAsync(); + subCountDownLatch.countDown(); + futhre.get(); Assert.fail("should be interrupted"); } catch (Exception e) { Assert.assertTrue(e.getMessage().contains(errorMsg)); @@ -4106,13 +4110,17 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { .batchReceivePolicy(batchReceivePolicy).subscribe(); new Thread(() -> { try { + CountDownLatch subCountDownLatch = new CountDownLatch(1); new Thread(() -> { try { + subCountDownLatch.await(); consumer2.close(); - } catch (PulsarClientException ignore) { + } catch (PulsarClientException | InterruptedException ignore) { } }).start(); - consumer2.batchReceiveAsync().get(); + CompletableFuture<Messages<String>> future = consumer2.batchReceiveAsync(); + subCountDownLatch.countDown(); + future.get(); Assert.fail("should be interrupted"); } catch (Exception e) { Assert.assertTrue(e.getMessage().contains(errorMsg)); @@ -4129,13 +4137,18 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { .batchReceivePolicy(batchReceivePolicy).subscribe(); new Thread(() -> { try { + CountDownLatch subCountDownLatch = new CountDownLatch(1); new Thread(() -> { try { + subCountDownLatch.await(); partitionedTopicConsumer.close(); - } catch (PulsarClientException ignore) { + } catch (PulsarClientException | InterruptedException ignore) { } }).start(); - partitionedTopicConsumer.batchReceiveAsync().get(); + CompletableFuture<Messages<String>> future = + partitionedTopicConsumer.batchReceiveAsync(); + subCountDownLatch.countDown(); + future.get(); Assert.fail("should be interrupted"); } catch (Exception e) { Assert.assertTrue(e.getMessage().contains(errorMsg));
