This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d66e5eeab48 [fix][test] Stabilize testMsgDropStat by reliably
triggering non-persistent publisher drop (#24929)
d66e5eeab48 is described below
commit d66e5eeab48c9b8a534ec70502e3f23fdb9358c0
Author: Vinkal <[email protected]>
AuthorDate: Tue Nov 4 15:46:28 2025 +0530
[fix][test] Stabilize testMsgDropStat by reliably triggering non-persistent
publisher drop (#24929)
Signed-off-by: Vinkal Chudgar <[email protected]>
(cherry picked from commit 60acfba3aec83f7cb4b6aebb274d203893b4b65b)
---
.../pulsar/client/api/NonPersistentTopicTest.java | 97 ++++++++++++++++------
1 file changed, 73 insertions(+), 24 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 85274064964..fd18949a2e6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -28,17 +28,21 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.net.URL;
+import java.time.Duration;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadManager;
@@ -50,6 +54,7 @@ import
org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
@@ -66,6 +71,7 @@ import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -813,14 +819,18 @@ public class NonPersistentTopicTest extends
ProducerConsumerBase {
*
* @throws Exception
*/
- @Test
+ @Test(timeOut = 60000)
public void testMsgDropStat() throws Exception {
int defaultNonPersistentMessageRate =
conf.getMaxConcurrentNonPersistentMessagePerConnection();
try {
- final String topicName =
"non-persistent://my-property/my-ns/stats-topic";
- // restart broker with lower publish rate limit
- conf.setMaxConcurrentNonPersistentMessagePerConnection(1);
+ final String topicName =
BrokerTestUtil.newUniqueName("non-persistent://my-property/my-ns/stats-topic");
+
+ // For non-persistent topics, set the per-connection in-flight
limit to 0.
+ // Since ServerCnx drops when inFlight > max; with max=0, any
second overlapping send on the
+ // same connection is dropped (entryId == -1) and recorded. This
makes observing a publisher drop
+ // reliable in this test.
+ conf.setMaxConcurrentNonPersistentMessagePerConnection(0);
stopBroker();
startBroker();
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-1")
@@ -833,30 +843,69 @@ public class NonPersistentTopicTest extends
ProducerConsumerBase {
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
+
+ final int threads = 10;
@Cleanup("shutdownNow")
- ExecutorService executor = Executors.newFixedThreadPool(5);
+ ExecutorService executor = Executors.newFixedThreadPool(threads);
byte[] msgData = "testData".getBytes();
- final int totalProduceMessages = 200;
- CountDownLatch latch = new CountDownLatch(totalProduceMessages);
- for (int i = 0; i < totalProduceMessages; i++) {
- executor.submit(() -> {
- producer.sendAsync(msgData).handle((msg, e) -> {
- latch.countDown();
- return null;
+
+ /*
+ * Trigger at least one publisher drop through concurrent send()
calls.
+ *
+ * Uses CyclicBarrier to ensure all threads send simultaneously,
creating overlap.
+ * With maxConcurrentNonPersistentMessagePerConnection = 0,
ServerCnx#handleSend
+ * drops any send while another is in-flight, returning MessageId
with entryId = -1.
+ * Awaitility repeats whole bursts (bounded to 20s) until a drop
is observed.
+ */
+ AtomicBoolean publisherDropSeen = new AtomicBoolean(false);
+ Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+ CyclicBarrier barrier = new CyclicBarrier(threads);
+ CountDownLatch completionLatch = new CountDownLatch(threads);
+ AtomicReference<Throwable> error = new AtomicReference<>();
+ publisherDropSeen.set(false);
+
+ for (int i = 0; i < threads; i++) {
+ executor.submit(() -> {
+ try {
+ barrier.await();
+ MessageId msgId = producer.send(msgData);
+ // Publisher drop is signaled by
MessageIdImpl.entryId == -1
+ if (msgId instanceof MessageIdImpl &&
((MessageIdImpl) msgId).getEntryId() == -1) {
+ publisherDropSeen.set(true);
+ }
+ } catch (Throwable t) {
+ if (t instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ error.compareAndSet(null, t);
+ } finally {
+ completionLatch.countDown();
+ }
});
- });
- }
- latch.await();
+ }
+
+ // Wait for all sends to complete.
+ assertTrue(completionLatch.await(20, TimeUnit.SECONDS));
+
+ assertNull(error.get(), "Concurrent send encountered an
exception");
+ return publisherDropSeen.get();
+ });
+
+ assertTrue(publisherDropSeen.get(), "Expected at least one
publisher drop (entryId == -1)");
+
+ NonPersistentTopic topic =
+ (NonPersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
- NonPersistentTopic topic = (NonPersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
- pulsar.getBrokerService().updateRates();
- NonPersistentTopicStats stats = topic.getStats(false, false,
false);
- NonPersistentPublisherStats npStats = stats.getPublishers().get(0);
- NonPersistentSubscriptionStats sub1Stats =
stats.getSubscriptions().get("subscriber-1");
- NonPersistentSubscriptionStats sub2Stats =
stats.getSubscriptions().get("subscriber-2");
- assertTrue(npStats.getMsgDropRate() > 0);
- assertTrue(sub1Stats.getMsgDropRate() > 0);
- assertTrue(sub2Stats.getMsgDropRate() > 0);
+ Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+ pulsar.getBrokerService().updateRates();
+ NonPersistentTopicStats stats = topic.getStats(false, false,
false);
+ NonPersistentPublisherStats npStats =
stats.getPublishers().get(0);
+ NonPersistentSubscriptionStats sub1Stats =
stats.getSubscriptions().get("subscriber-1");
+ NonPersistentSubscriptionStats sub2Stats =
stats.getSubscriptions().get("subscriber-2");
+ assertTrue(npStats.getMsgDropRate() > 0);
+ assertTrue(sub1Stats.getMsgDropRate() > 0);
+ assertTrue(sub2Stats.getMsgDropRate() > 0);
+ });
producer.close();
consumer.close();