This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 810fa1f0c44270dd7d5b3427ff2aad59c9924621 Author: Vinkal <[email protected]> AuthorDate: Tue Nov 4 15:46:28 2025 +0530 [fix][test] Stabilize testMsgDropStat by reliably triggering non-persistent publisher drop (#24929) Signed-off-by: Vinkal Chudgar <[email protected]> (cherry picked from commit 60acfba3aec83f7cb4b6aebb274d203893b4b65b) --- .../pulsar/client/api/NonPersistentTopicTest.java | 93 ++++++++++++++-------- 1 file changed, 59 insertions(+), 34 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 5ebc28335f9..aa2a6efe117 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -28,17 +28,19 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Sets; import java.net.URL; +import java.time.Duration; import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; @@ -813,18 +815,22 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { } /** - * Verifies msg-drop stats + * Verifies msg-drop stats. * * @throws Exception */ - @Test + @Test(timeOut = 60000) public void testMsgDropStat() throws Exception { int defaultNonPersistentMessageRate = conf.getMaxConcurrentNonPersistentMessagePerConnection(); try { final String topicName = BrokerTestUtil.newUniqueName("non-persistent://my-property/my-ns/stats-topic"); - // restart broker with lower publish rate limit - conf.setMaxConcurrentNonPersistentMessagePerConnection(1); + + // For non-persistent topics, set the per-connection in-flight limit to 0. + // Since ServerCnx drops when inFlight > max; with max=0, any second overlapping send on the + // same connection is dropped (entryId == -1) and recorded. This makes observing a publisher drop + // reliable in this test. + conf.setMaxConcurrentNonPersistentMessagePerConnection(0); stopBroker(); startBroker(); @@ -840,39 +846,58 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { @Cleanup ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName) - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + final int threads = 10; @Cleanup("shutdownNow") - ExecutorService executor = Executors.newFixedThreadPool(10); + ExecutorService executor = Executors.newFixedThreadPool(threads); byte[] msgData = "testData".getBytes(); - final int totalProduceMessages = 1000; - CountDownLatch latch = new CountDownLatch(1); - AtomicInteger messagesSent = new AtomicInteger(0); - for (int i = 0; i < totalProduceMessages; i++) { - executor.submit(() -> { - try { - MessageId msgId = producer.send(msgData); - int count = messagesSent.incrementAndGet(); - // process at least 20% of messages before signalling the latch - // a non-persistent message will return entryId as -1 when it has been dropped - // due to setMaxConcurrentNonPersistentMessagePerConnection limit - // also ensure that it has happened before the latch is signalled - if (count > totalProduceMessages * 0.2 && msgId != null - && ((MessageIdImpl) msgId).getEntryId() == -1) { - latch.countDown(); + + /* + * Trigger at least one publisher drop through concurrent send() calls. + * + * Uses CyclicBarrier to ensure all threads send simultaneously, creating overlap. + * With maxConcurrentNonPersistentMessagePerConnection = 0, ServerCnx#handleSend + * drops any send while another is in-flight, returning MessageId with entryId = -1. + * Awaitility repeats whole bursts (bounded to 20s) until a drop is observed. + */ + AtomicBoolean publisherDropSeen = new AtomicBoolean(false); + Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> { + CyclicBarrier barrier = new CyclicBarrier(threads); + CountDownLatch completionLatch = new CountDownLatch(threads); + AtomicReference<Throwable> error = new AtomicReference<>(); + publisherDropSeen.set(false); + + for (int i = 0; i < threads; i++) { + executor.submit(() -> { + try { + barrier.await(); + MessageId msgId = producer.send(msgData); + // Publisher drop is signaled by MessageIdImpl.entryId == -1 + if (msgId instanceof MessageIdImpl && ((MessageIdImpl) msgId).getEntryId() == -1) { + publisherDropSeen.set(true); + } + } catch (Throwable t) { + if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + error.compareAndSet(null, t); + } finally { + completionLatch.countDown(); } + }); + } - Thread.sleep(10); - } catch (PulsarClientException e) { - throw new RuntimeException(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - }); - } - assertTrue(latch.await(5, TimeUnit.SECONDS)); + // Wait for all sends to complete. + assertTrue(completionLatch.await(20, TimeUnit.SECONDS)); + + assertNull(error.get(), "Concurrent send encountered an exception"); + return publisherDropSeen.get(); + }); + + assertTrue(publisherDropSeen.get(), "Expected at least one publisher drop (entryId == -1)"); NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
