This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 35b4581fb0a [fix][test] Fix flaky testMsgDropStat in NonPersistentTopicTest (#25426)
35b4581fb0a is described below
commit 35b4581fb0a735eeb64b652c381742470251c90d
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Mar 30 15:53:41 2026 -0700
[fix][test] Fix flaky testMsgDropStat in NonPersistentTopicTest (#25426)
(cherry picked from commit e9630c60e8bff865e1bfeb2ed26a6c1d65f040b4)
---
.../pulsar/client/api/NonPersistentTopicTest.java | 54 ++++++++++------------
1 file changed, 25 insertions(+), 29 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 284e6a68928..10919bd37c6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -59,7 +59,6 @@ import org.apache.pulsar.broker.stats.OpenTelemetryProducerStats;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
@@ -886,30 +885,31 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
ExecutorService executor = Executors.newFixedThreadPool(threads);
byte[] msgData = "testData".getBytes();
+ NonPersistentTopic topic =
+ (NonPersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
/*
- * Trigger at least one publisher drop through concurrent send()
calls.
+ * Send concurrent bursts until publisher AND subscription drop
rates are all > 0.
+ *
+ * Each burst uses a CyclicBarrier so all threads send
simultaneously. With
+ * maxConcurrentNonPersistentMessagePerConnection = 0, ServerCnx
drops overlapping
+ * sends (publisher drops). Once subscriber queues (size 1) are
full, the dispatcher
+ * also drops delivered messages (subscription drops).
*
- * Uses CyclicBarrier to ensure all threads send simultaneously,
creating overlap.
- * With maxConcurrentNonPersistentMessagePerConnection = 0,
ServerCnx#handleSend
- * drops any send while another is in-flight, returning MessageId
with entryId = -1.
- * Awaitility repeats whole bursts (bounded to 20s) until a drop
is observed.
+ * IMPORTANT: updateRates() calls Rate.calculateRate() which
resets counters via
+ * sumThenReset(). We must keep sending fresh bursts so each
updateRates() call
+ * sees new drops, rather than retrying with stale (reset)
counters.
*/
- AtomicBoolean publisherDropSeen = new AtomicBoolean(false);
- Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+ Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofMillis(100)).until(() -> {
CyclicBarrier barrier = new CyclicBarrier(threads);
CountDownLatch completionLatch = new CountDownLatch(threads);
AtomicReference<Throwable> error = new AtomicReference<>();
- publisherDropSeen.set(false);
for (int i = 0; i < threads; i++) {
executor.submit(() -> {
try {
barrier.await();
- MessageId msgId = producer.send(msgData);
- // Publisher drop is signaled by
MessageIdImpl.entryId == -1
- if (msgId instanceof MessageIdImpl &&
((MessageIdImpl) msgId).getEntryId() == -1) {
- publisherDropSeen.set(true);
- }
+ producer.send(msgData);
} catch (Throwable t) {
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
@@ -921,27 +921,23 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
});
}
- // Wait for all sends to complete.
- assertTrue(completionLatch.await(20, TimeUnit.SECONDS));
-
- assertNull(error.get(), "Concurrent send encountered an
exception");
- return publisherDropSeen.get();
- });
-
- assertTrue(publisherDropSeen.get(), "Expected at least one
publisher drop (entryId == -1)");
-
- NonPersistentTopic topic =
- (NonPersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ completionLatch.await(20, TimeUnit.SECONDS);
+ if (error.get() != null) {
+ return false;
+ }
- Awaitility.await().ignoreExceptions().untilAsserted(() -> {
pulsar.getBrokerService().updateRates();
NonPersistentTopicStats stats = topic.getStats(false, false,
false);
+ if (stats.getPublishers().isEmpty()) {
+ return false;
+ }
NonPersistentPublisherStats npStats =
stats.getPublishers().get(0);
NonPersistentSubscriptionStats sub1Stats =
stats.getSubscriptions().get("subscriber-1");
NonPersistentSubscriptionStats sub2Stats =
stats.getSubscriptions().get("subscriber-2");
- assertTrue(npStats.getMsgDropRate() > 0);
- assertTrue(sub1Stats.getMsgDropRate() > 0);
- assertTrue(sub2Stats.getMsgDropRate() > 0);
+ return sub1Stats != null && sub2Stats != null
+ && npStats.getMsgDropRate() > 0
+ && sub1Stats.getMsgDropRate() > 0
+ && sub2Stats.getMsgDropRate() > 0;
});
} finally {