This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dfe07eb9b3 [fix][broker] Fix stack overflow caused by race condition when closing a connection (#24934)
1dfe07eb9b3 is described below

commit 1dfe07eb9b387c2b815b39096f370c639f6dcde5
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Nov 3 23:46:02 2025 +0800

    [fix][broker] Fix stack overflow caused by race condition when closing a connection (#24934)
---
 .../AbstractDispatcherSingleActiveConsumer.java    | 29 +++++++-
 ...rsistentDispatcherSingleActiveConsumerTest.java | 81 ++++++++++++++++++++++
 2 files changed, 108 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index baca6bf078c..792b75f2896 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -28,6 +28,7 @@ import java.util.Optional;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 public abstract class AbstractDispatcherSingleActiveConsumer extends 
AbstractBaseDispatcher {
 
+    private static final int MAX_RETRY_COUNT_FOR_ADD_CONSUMER_RACE = 5;
     protected final String topicName;
     private volatile Consumer activeConsumer = null;
     protected final CopyOnWriteArrayList<Consumer> consumers;
@@ -161,7 +163,11 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer extends AbstractBas
         return Collections.unmodifiableNavigableMap(hashRing);
     }
 
-    public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) 
{
+    public CompletableFuture<Void> addConsumer(Consumer consumer) {
+        return internalAddConsumer(consumer, 0);
+    }
+
+    private synchronized CompletableFuture<Void> internalAddConsumer(Consumer 
consumer, int retryCount) {
         if (IS_CLOSED_UPDATER.get(this) == TRUE) {
             log.warn("[{}] Dispatcher is already closed. Closing consumer {}", 
this.topicName, consumer);
             consumer.disconnect();
@@ -171,12 +177,31 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer extends AbstractBas
         if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
             Consumer actConsumer = getActiveConsumer();
             if (actConsumer != null) {
+                final var callerThread = Thread.currentThread();
                 return 
actConsumer.cnx().checkConnectionLiveness().thenCompose(actConsumerStillAlive 
-> {
                     if (actConsumerStillAlive.isEmpty() || 
actConsumerStillAlive.get()) {
                         return FutureUtil.failedFuture(new 
ConsumerBusyException("Exclusive consumer is already"
                                 + " connected"));
+                    } else if (retryCount >= 
MAX_RETRY_COUNT_FOR_ADD_CONSUMER_RACE) {
+                        log.warn("[{}] The active consumer's connection is 
still inactive after all retries {}, skip "
+                                        + "adding new consumer {}", getName(), 
actConsumer, consumer);
+                        return FutureUtil.failedFuture(new 
ConsumerBusyException("Exclusive consumer is already"
+                                + " connected after " + 
MAX_RETRY_COUNT_FOR_ADD_CONSUMER_RACE + " attempts"));
                     } else {
-                        return addConsumer(consumer);
+                        if (Thread.currentThread().equals(callerThread)) {
+                            // A race condition happened in 
`ServerCnx#channelInactive`
+                            // 1. `isActive` was set to false
+                            // 2. `consumer.close()` is called
+                            // We should wait until the consumer is closed, 
retry for some times
+                            log.warn("[{}] race condition happened that cnx of 
the active consumer ({}) is inactive "
+                                    + "but it's not removed, retrying", 
getName(), actConsumer);
+                            final var future = new CompletableFuture<Void>();
+                            CompletableFuture.delayedExecutor(100, 
TimeUnit.MILLISECONDS)
+                                    .execute(() -> future.complete(null));
+                            return future.thenCompose(__ -> 
internalAddConsumer(consumer, retryCount + 1));
+                        } else {
+                            return internalAddConsumer(consumer, retryCount + 
1);
+                        }
                     }
                 });
             } else {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
index dc6d451ed0f..466f55436ea 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
@@ -18,7 +18,13 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -26,22 +32,29 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.intercept.MockBrokerInterceptor;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.naming.TopicName;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
 @Test(groups = "broker-api")
 public class PersistentDispatcherSingleActiveConsumerTest extends 
ProducerConsumerBase {
+
     @BeforeClass(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
@@ -129,4 +142,72 @@ public class PersistentDispatcherSingleActiveConsumerTest 
extends ProducerConsum
         // Verify: the topic can be deleted successfully.
         admin.topics().delete(topicName, false);
     }
+
+    @DataProvider
+    public static Object[][] closeDelayMs() {
+        return new Object[][] { { 500 }, { 2000 } };
+    }
+
+    @Test(dataProvider = "closeDelayMs")
+    public void testOverrideInactiveConsumer(long closeDelayMs) throws 
Exception {
+        final var interceptor = new Interceptor();
+        pulsar.getBrokerService().setInterceptor(interceptor);
+        final var topic = "test-override-inactive-consumer-" + closeDelayMs;
+        @Cleanup final var client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        @Cleanup final var consumer = 
client.newConsumer().topic(topic).subscriptionName("sub").subscribe();
+        final var dispatcher = ((PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(TopicName.get(topic)
+                
.toString()).get().orElseThrow()).getSubscription("sub").dispatcher;
+        Assert.assertEquals(dispatcher.getConsumers().size(), 1);
+
+        // Generally `isActive` could only be false after `channelInactive` is 
called, setting it with false directly
+        // to avoid race condition.
+        final var latch = new CountDownLatch(1);
+        interceptor.latch.set(latch);
+        interceptor.injectCloseLatency.set(true);
+        interceptor.delayMs = closeDelayMs;
+        // Simulate the real case because `channelInactive` is always called 
in the event loop thread
+        final var cnx = (ServerCnx) dispatcher.getConsumers().get(0).cnx();
+        cnx.ctx().executor().execute(() -> {
+            try {
+                cnx.channelInactive(cnx.ctx());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        @Cleanup final var mockConsumer = Mockito.mock(Consumer.class);
+        Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
+        if (closeDelayMs < 1000) {
+            dispatcher.addConsumer(mockConsumer).get();
+            Assert.assertEquals(dispatcher.getConsumers().size(), 1);
+            Assert.assertSame(mockConsumer, dispatcher.getConsumers().get(0));
+        } else {
+            try {
+                dispatcher.addConsumer(mockConsumer).get();
+                Assert.fail();
+            } catch (ExecutionException e) {
+                Assert.assertTrue(e.getCause() instanceof 
BrokerServiceException.ConsumerBusyException);
+            }
+        }
+    }
+
+    private static class Interceptor extends MockBrokerInterceptor {
+
+        final AtomicBoolean injectCloseLatency = new AtomicBoolean(false);
+        final AtomicReference<CountDownLatch> latch = new AtomicReference<>();
+        long delayMs = 500;
+
+        @Override
+        public void onConnectionClosed(ServerCnx cnx) {
+            if (injectCloseLatency.compareAndSet(true, false)) {
+                
Optional.ofNullable(latch.get()).ifPresent(CountDownLatch::countDown);
+                latch.set(null);
+                try {
+                    Thread.sleep(delayMs);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
 }

Reply via email to