This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1dfe07eb9b3 [fix][broker] Fix stack overflow caused by race condition
when closing a connection (#24934)
1dfe07eb9b3 is described below
commit 1dfe07eb9b387c2b815b39096f370c639f6dcde5
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Nov 3 23:46:02 2025 +0800
[fix][broker] Fix stack overflow caused by race condition when closing a
connection (#24934)
---
.../AbstractDispatcherSingleActiveConsumer.java | 29 +++++++-
...rsistentDispatcherSingleActiveConsumerTest.java | 81 ++++++++++++++++++++++
2 files changed, 108 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index baca6bf078c..792b75f2896 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -28,6 +28,7 @@ import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractDispatcherSingleActiveConsumer extends
AbstractBaseDispatcher {
+ // Upper bound on how many times addConsumer re-runs its exclusive-subscription check when the active
+ // consumer's connection reports it is no longer alive but that consumer is still registered. Once the
+ // retry count reaches this value, the new consumer is rejected with ConsumerBusyException.
+ private static final int MAX_RETRY_COUNT_FOR_ADD_CONSUMER_RACE = 5;
protected final String topicName;
private volatile Consumer activeConsumer = null;
protected final CopyOnWriteArrayList<Consumer> consumers;
@@ -161,7 +163,11 @@ public abstract class
AbstractDispatcherSingleActiveConsumer extends AbstractBas
return Collections.unmodifiableNavigableMap(hashRing);
}
- public synchronized CompletableFuture<Void> addConsumer(Consumer consumer)
{
+    /**
+     * Adds {@code consumer} to this dispatcher.
+     *
+     * <p>This public entry point starts the attempt with a retry count of zero; retries of the
+     * exclusive-consumer liveness check are tracked by {@code internalAddConsumer}.
+     *
+     * @param consumer the consumer to attach
+     * @return a future completed once the consumer has been added, or completed exceptionally otherwise
+     */
+    public CompletableFuture<Void> addConsumer(Consumer consumer) {
+        final int noRetriesYet = 0;
+        return internalAddConsumer(consumer, noRetriesYet);
+    }
+
+ private synchronized CompletableFuture<Void> internalAddConsumer(Consumer
consumer, int retryCount) {
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer {}",
this.topicName, consumer);
consumer.disconnect();
@@ -171,12 +177,31 @@ public abstract class
AbstractDispatcherSingleActiveConsumer extends AbstractBas
if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
Consumer actConsumer = getActiveConsumer();
if (actConsumer != null) {
+ final var callerThread = Thread.currentThread();
return
actConsumer.cnx().checkConnectionLiveness().thenCompose(actConsumerStillAlive
-> {
if (actConsumerStillAlive.isEmpty() ||
actConsumerStillAlive.get()) {
return FutureUtil.failedFuture(new
ConsumerBusyException("Exclusive consumer is already"
+ " connected"));
+ } else if (retryCount >=
MAX_RETRY_COUNT_FOR_ADD_CONSUMER_RACE) {
+ log.warn("[{}] The active consumer's connection is
still inactive after all retries {}, skip "
+ + "adding new consumer {}", getName(),
actConsumer, consumer);
+ return FutureUtil.failedFuture(new
ConsumerBusyException("Exclusive consumer is already"
+ + " connected after " +
MAX_RETRY_COUNT_FOR_ADD_CONSUMER_RACE + " attempts"));
} else {
- return addConsumer(consumer);
+ if (Thread.currentThread().equals(callerThread)) {
+ // A race condition happened in
`ServerCnx#channelInactive`
+ // 1. `isActive` was set to false
+ // 2. `consumer.close()` is called
+ // We should wait until the consumer is closed,
retry for some times
+ log.warn("[{}] race condition happened that cnx of
the active consumer ({}) is inactive "
+ + "but it's not removed, retrying",
getName(), actConsumer);
+ final var future = new CompletableFuture<Void>();
+ CompletableFuture.delayedExecutor(100,
TimeUnit.MILLISECONDS)
+ .execute(() -> future.complete(null));
+ return future.thenCompose(__ ->
internalAddConsumer(consumer, retryCount + 1));
+ } else {
+ return internalAddConsumer(consumer, retryCount +
1);
+ }
}
});
} else {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
index dc6d451ed0f..466f55436ea 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
@@ -18,7 +18,13 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -26,22 +32,29 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.intercept.MockBrokerInterceptor;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker-api")
public class PersistentDispatcherSingleActiveConsumerTest extends
ProducerConsumerBase {
+
@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
@@ -129,4 +142,72 @@ public class PersistentDispatcherSingleActiveConsumerTest
extends ProducerConsum
// Verify: the topic can be deleted successfully.
admin.topics().delete(topicName, false);
}
+
+    /**
+     * Close latencies, in milliseconds, injected into the connection close for
+     * {@code testOverrideInactiveConsumer}: one short delay and one long delay.
+     */
+    @DataProvider
+    public static Object[][] closeDelayMs() {
+        final Object[] shortDelay = {500};
+        final Object[] longDelay = {2000};
+        return new Object[][] {shortDelay, longDelay};
+    }
+
+    @Test(dataProvider = "closeDelayMs")
+    public void testOverrideInactiveConsumer(long closeDelayMs) throws Exception {
+        final var interceptor = new Interceptor();
+        pulsar.getBrokerService().setInterceptor(interceptor);
+        final var topic = "test-override-inactive-consumer-" + closeDelayMs;
+        @Cleanup final var client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        @Cleanup final var consumer = client.newConsumer().topic(topic).subscriptionName("sub").subscribe();
+        final var dispatcher = ((PersistentTopic) pulsar.getBrokerService()
+                .getTopicIfExists(TopicName.get(topic).toString()).get().orElseThrow())
+                .getSubscription("sub").dispatcher;
+        Assert.assertEquals(dispatcher.getConsumers().size(), 1);
+
+        // Delay only the close of the connection that owns the current consumer, so an unrelated connection
+        // closing concurrently cannot consume the injected latency. Everything the interceptor reads is
+        // configured before it is armed, so the close path never observes a stale delay.
+        final var cnx = (ServerCnx) dispatcher.getConsumers().get(0).cnx();
+        final var latch = new CountDownLatch(1);
+        interceptor.delayMs = closeDelayMs;
+        interceptor.targetCnx = cnx;
+        interceptor.latch.set(latch);
+        interceptor.injectCloseLatency.set(true);
+
+        // `channelInactive` always runs on the connection's event loop thread in production, so invoke it
+        // there. The interceptor then blocks that thread inside `channelInactive`; the test assumes this
+        // happens before the old consumer is removed from the dispatcher (verify against ServerCnx ordering).
+        cnx.ctx().executor().execute(() -> {
+            try {
+                cnx.channelInactive(cnx.ctx());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        @Cleanup final var mockConsumer = Mockito.mock(Consumer.class);
+        Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
+        if (closeDelayMs < 1000) {
+            // The close is expected to finish while addConsumer is still retrying, so the new consumer
+            // replaces the old one.
+            dispatcher.addConsumer(mockConsumer).get();
+            Assert.assertEquals(dispatcher.getConsumers().size(), 1);
+            Assert.assertSame(mockConsumer, dispatcher.getConsumers().get(0));
+        } else {
+            // The close is expected to outlast all retries, so addConsumer gives up with ConsumerBusyException.
+            final ExecutionException e = Assert.expectThrows(ExecutionException.class,
+                    () -> dispatcher.addConsumer(mockConsumer).get());
+            Assert.assertTrue(e.getCause() instanceof BrokerServiceException.ConsumerBusyException,
+                    "Unexpected cause: " + e.getCause());
+        }
+    }
+
+    /**
+     * Broker interceptor that, at most once per arming, blocks the thread closing {@link #targetCnx} for
+     * {@link #delayMs} milliseconds. It counts down {@link #latch} right before it starts blocking.
+     */
+    private static class Interceptor extends MockBrokerInterceptor {
+
+        // Set to true to arm a single injection; cleared when the injection fires.
+        final AtomicBoolean injectCloseLatency = new AtomicBoolean(false);
+        // Signalled just before the delay begins; consumed (reset to null) when the injection fires.
+        final AtomicReference<CountDownLatch> latch = new AtomicReference<>();
+        // The only connection whose close is delayed. Written by the test thread and read on the event loop.
+        volatile ServerCnx targetCnx;
+        // Written by the test thread and read on the event loop thread, hence volatile.
+        volatile long delayMs = 500;
+
+        @Override
+        public void onConnectionClosed(ServerCnx cnx) {
+            if (cnx == null || cnx != targetCnx || !injectCloseLatency.compareAndSet(true, false)) {
+                return;
+            }
+            Optional.ofNullable(latch.getAndSet(null)).ifPresent(CountDownLatch::countDown);
+            try {
+                Thread.sleep(delayMs);
+            } catch (InterruptedException e) {
+                // Restore the interrupt status of the closing thread before propagating.
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+    }
}