This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 644c6e43813 [fix][broker] Return if
AbstractDispatcherSingleActiveConsumer closed (#19934)
644c6e43813 is described below
commit 644c6e43813dae37b30c56f48d15705123fe6110
Author: houxiaoyu <[email protected]>
AuthorDate: Thu Apr 6 21:50:45 2023 +0800
[fix][broker] Return if AbstractDispatcherSingleActiveConsumer closed
(#19934)
---
.../AbstractDispatcherSingleActiveConsumer.java | 1 +
.../NonPersistentDispatcherMultipleConsumers.java | 2 +-
...PersistentStickyKeyDispatcherMultipleConsumers.java | 9 +++++++++
...PersistentStickyKeyDispatcherMultipleConsumers.java | 11 +++++++++++
.../PersistentDispatcherFailoverConsumerTest.java | 14 ++++++++++++++
...istentStickyKeyDispatcherMultipleConsumersTest.java | 18 +++++++++++++++---
...istentStickyKeyDispatcherMultipleConsumersTest.java | 11 +++++++++++
7 files changed, 62 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 8cab06be116..17a6d1dbfb3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -140,6 +140,7 @@ public abstract class
AbstractDispatcherSingleActiveConsumer extends AbstractBas
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer {}",
this.topicName, consumer);
consumer.disconnect();
+ return;
}
if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 96945840255..1d74d00776d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -47,7 +47,7 @@ public class NonPersistentDispatcherMultipleConsumers extends
AbstractDispatcher
protected final Subscription subscription;
private CompletableFuture<Void> closeFuture = null;
- private final String name;
+ protected final String name;
protected final Rate msgDrop;
protected static final
AtomicIntegerFieldUpdater<NonPersistentDispatcherMultipleConsumers>
TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index e5e53496519..da7fe56bde1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -39,6 +39,8 @@ import
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.protocol.Commands;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class NonPersistentStickyKeyDispatcherMultipleConsumers extends
NonPersistentDispatcherMultipleConsumers {
@@ -84,6 +86,11 @@ public class
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
@Override
public synchronized void addConsumer(Consumer consumer) throws
BrokerServiceException {
+ if (IS_CLOSED_UPDATER.get(this) == TRUE) {
+ log.warn("[{}] Dispatcher is already closed. Closing consumer {}",
name, consumer);
+ consumer.disconnect();
+ return;
+ }
super.addConsumer(consumer);
try {
selector.addConsumer(consumer);
@@ -168,4 +175,6 @@ public class
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
return (ksm.getKeySharedMode() == this.keySharedMode);
}
+
+ private static final Logger log =
LoggerFactory.getLogger(NonPersistentStickyKeyDispatcherMultipleConsumers.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 024ed8581ef..1df6366c389 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
@@ -100,8 +101,18 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
}
+ @VisibleForTesting
+ public StickyKeyConsumerSelector getSelector() {
+ return selector;
+ }
+
@Override
public synchronized void addConsumer(Consumer consumer) throws
BrokerServiceException {
+ if (IS_CLOSED_UPDATER.get(this) == TRUE) {
+ log.warn("[{}] Dispatcher is already closed. Closing consumer {}",
name, consumer);
+ consumer.disconnect();
+ return;
+ }
super.addConsumer(consumer);
try {
selector.addConsumer(consumer);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 0d2e5a9ee02..e7425f7e4e3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -301,6 +301,20 @@ public class PersistentDispatcherFailoverConsumerTest {
assertEquals(isActive, change.isIsActive());
}
+ @Test(timeOut = 10000)
+ public void testAddConsumerWhenClosed() throws Exception {
+ PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, pulsarTestContext.getBrokerService());
+ PersistentSubscription sub = new PersistentSubscription(topic,
"sub-1", cursorMock, false);
+ PersistentDispatcherSingleActiveConsumer pdfc = new
PersistentDispatcherSingleActiveConsumer(cursorMock,
+ SubType.Failover, 0, topic, sub);
+ pdfc.close().get();
+
+ Consumer consumer = mock(Consumer.class);
+ pdfc.addConsumer(consumer);
+ verify(consumer, times(1)).disconnect();
+ assertEquals(0, pdfc.consumers.size());
+ }
+
@Test
public void testConsumerGroupChangesWithOldNewConsumers() throws Exception
{
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
index 932c446669d..a57f9a8bafa 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -47,6 +48,7 @@ import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import
org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.RedeliveryTracker;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
@@ -62,6 +64,7 @@ public class
NonPersistentStickyKeyDispatcherMultipleConsumersTest {
private ServiceConfiguration configMock;
private NonPersistentStickyKeyDispatcherMultipleConsumers
nonpersistentDispatcher;
+ private StickyKeyConsumerSelector selector;
final String topicName = "non-persistent://public/default/testTopic";
@@ -88,10 +91,19 @@ public class
NonPersistentStickyKeyDispatcherMultipleConsumersTest {
doReturn(topicPolicies).when(topicMock).getHierarchyTopicPolicies();
subscriptionMock = mock(NonPersistentSubscription.class);
-
+ selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
nonpersistentDispatcher = new
NonPersistentStickyKeyDispatcherMultipleConsumers(
- topicMock, subscriptionMock,
- new HashRangeAutoSplitStickyKeyConsumerSelector());
+ topicMock, subscriptionMock, selector);
+ }
+
+ @Test(timeOut = 10000)
+ public void testAddConsumerWhenClosed() throws Exception {
+ nonpersistentDispatcher.close().get();
+ Consumer consumer = mock(Consumer.class);
+ nonpersistentDispatcher.addConsumer(consumer);
+ verify(consumer, times(1)).disconnect();
+ assertEquals(0, nonpersistentDispatcher.getConsumers().size());
+ assertTrue(selector.getConsumerKeyHashRanges().isEmpty());
}
@Test(timeOut = 10000)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index aa87b2aaa25..5ff988fdc46 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -159,6 +160,16 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
}
}
+ @Test(timeOut = 10000)
+ public void testAddConsumerWhenClosed() throws Exception {
+ persistentDispatcher.close().get();
+ Consumer consumer = mock(Consumer.class);
+ persistentDispatcher.addConsumer(consumer);
+ verify(consumer, times(1)).disconnect();
+ assertEquals(0, persistentDispatcher.getConsumers().size());
+
assertTrue(persistentDispatcher.getSelector().getConsumerKeyHashRanges().isEmpty());
+ }
+
@Test
public void testSendMarkerMessage() {
try {