This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 60acfba3aec [fix][test] Stabilize testMsgDropStat by reliably
triggering non-persistent publisher drop (#24929)
60acfba3aec is described below
commit 60acfba3aec83f7cb4b6aebb274d203893b4b65b
Author: Vinkal <[email protected]>
AuthorDate: Tue Nov 4 15:46:28 2025 +0530
[fix][test] Stabilize testMsgDropStat by reliably triggering non-persistent
publisher drop (#24929)
Signed-off-by: Vinkal Chudgar <[email protected]>
---
.../pulsar/client/api/NonPersistentTopicTest.java | 85 ++++++++++++++--------
1 file changed, 55 insertions(+), 30 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index c90aaa23498..aa34b619d11 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -31,17 +31,19 @@ import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import io.opentelemetry.api.common.Attributes;
import java.net.URL;
+import java.time.Duration;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
@@ -847,14 +849,18 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
*
* @throws Exception
*/
- @Test
+ @Test(timeOut = 60000)
public void testMsgDropStat() throws Exception {
int defaultNonPersistentMessageRate =
conf.getMaxConcurrentNonPersistentMessagePerConnection();
try {
final String topicName =
BrokerTestUtil.newUniqueName("non-persistent://my-property/my-ns/stats-topic");
- // restart broker with lower publish rate limit
- conf.setMaxConcurrentNonPersistentMessagePerConnection(1);
+
+ // For non-persistent topics, set the per-connection in-flight
limit to 0.
+ // Since ServerCnx drops when inFlight > max; with max=0, any
second overlapping send on the
+ // same connection is dropped (entryId == -1) and recorded. This
makes observing a publisher drop
+ // reliable in this test.
+ conf.setMaxConcurrentNonPersistentMessagePerConnection(0);
stopBroker();
startBroker();
@@ -873,36 +879,55 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
+
+ final int threads = 10;
@Cleanup("shutdownNow")
- ExecutorService executor = Executors.newFixedThreadPool(10);
+ ExecutorService executor = Executors.newFixedThreadPool(threads);
byte[] msgData = "testData".getBytes();
- final int totalProduceMessages = 1000;
- CountDownLatch latch = new CountDownLatch(1);
- AtomicInteger messagesSent = new AtomicInteger(0);
- for (int i = 0; i < totalProduceMessages; i++) {
- executor.submit(() -> {
- try {
- MessageId msgId = producer.send(msgData);
- int count = messagesSent.incrementAndGet();
- // process at least 20% of messages before signalling
the latch
- // a non-persistent message will return entryId as -1
when it has been dropped
- // due to
setMaxConcurrentNonPersistentMessagePerConnection limit
- // also ensure that it has happened before the latch
is signalled
- if (count > totalProduceMessages * 0.2 && msgId != null
- && ((MessageIdImpl) msgId).getEntryId() == -1)
{
- latch.countDown();
+
+ /*
+ * Trigger at least one publisher drop through concurrent send()
calls.
+ *
+ * Uses CyclicBarrier to ensure all threads send simultaneously,
creating overlap.
+ * With maxConcurrentNonPersistentMessagePerConnection = 0,
ServerCnx#handleSend
+ * drops any send while another is in-flight, returning MessageId
with entryId = -1.
+ * Awaitility repeats whole bursts (bounded to 20s) until a drop
is observed.
+ */
+ AtomicBoolean publisherDropSeen = new AtomicBoolean(false);
+ Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+ CyclicBarrier barrier = new CyclicBarrier(threads);
+ CountDownLatch completionLatch = new CountDownLatch(threads);
+ AtomicReference<Throwable> error = new AtomicReference<>();
+ publisherDropSeen.set(false);
+
+ for (int i = 0; i < threads; i++) {
+ executor.submit(() -> {
+ try {
+ barrier.await();
+ MessageId msgId = producer.send(msgData);
+ // Publisher drop is signaled by
MessageIdImpl.entryId == -1
+ if (msgId instanceof MessageIdImpl &&
((MessageIdImpl) msgId).getEntryId() == -1) {
+ publisherDropSeen.set(true);
+ }
+ } catch (Throwable t) {
+ if (t instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ error.compareAndSet(null, t);
+ } finally {
+ completionLatch.countDown();
}
+ });
+ }
- Thread.sleep(10);
- } catch (PulsarClientException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- });
- }
- assertTrue(latch.await(5, TimeUnit.SECONDS));
+ // Wait for all sends to complete.
+ assertTrue(completionLatch.await(20, TimeUnit.SECONDS));
+
+ assertNull(error.get(), "Concurrent send encountered an
exception");
+ return publisherDropSeen.get();
+ });
+
+ assertTrue(publisherDropSeen.get(), "Expected at least one
publisher drop (entryId == -1)");
NonPersistentTopic topic =
(NonPersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();