This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 644c6e43813 [fix][broker] Return if 
AbstractDispatcherSingleActiveConsumer closed (#19934)
644c6e43813 is described below

commit 644c6e43813dae37b30c56f48d15705123fe6110
Author: houxiaoyu <[email protected]>
AuthorDate: Thu Apr 6 21:50:45 2023 +0800

    [fix][broker] Return if AbstractDispatcherSingleActiveConsumer closed 
(#19934)
---
 .../AbstractDispatcherSingleActiveConsumer.java        |  1 +
 .../NonPersistentDispatcherMultipleConsumers.java      |  2 +-
 ...PersistentStickyKeyDispatcherMultipleConsumers.java |  9 +++++++++
 ...PersistentStickyKeyDispatcherMultipleConsumers.java | 11 +++++++++++
 .../PersistentDispatcherFailoverConsumerTest.java      | 14 ++++++++++++++
 ...istentStickyKeyDispatcherMultipleConsumersTest.java | 18 +++++++++++++++---
 ...istentStickyKeyDispatcherMultipleConsumersTest.java | 11 +++++++++++
 7 files changed, 62 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 8cab06be116..17a6d1dbfb3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -140,6 +140,7 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer extends AbstractBas
         if (IS_CLOSED_UPDATER.get(this) == TRUE) {
             log.warn("[{}] Dispatcher is already closed. Closing consumer {}", 
this.topicName, consumer);
             consumer.disconnect();
+            return;
         }
 
         if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 96945840255..1d74d00776d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -47,7 +47,7 @@ public class NonPersistentDispatcherMultipleConsumers extends 
AbstractDispatcher
     protected final Subscription subscription;
 
     private CompletableFuture<Void> closeFuture = null;
-    private final String name;
+    protected final String name;
     protected final Rate msgDrop;
     protected static final 
AtomicIntegerFieldUpdater<NonPersistentDispatcherMultipleConsumers>
             TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index e5e53496519..da7fe56bde1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -39,6 +39,8 @@ import 
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.protocol.Commands;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class NonPersistentStickyKeyDispatcherMultipleConsumers extends 
NonPersistentDispatcherMultipleConsumers {
 
@@ -84,6 +86,11 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
 
     @Override
     public synchronized void addConsumer(Consumer consumer) throws 
BrokerServiceException {
+        if (IS_CLOSED_UPDATER.get(this) == TRUE) {
+            log.warn("[{}] Dispatcher is already closed. Closing consumer {}", 
name, consumer);
+            consumer.disconnect();
+            return;
+        }
         super.addConsumer(consumer);
         try {
             selector.addConsumer(consumer);
@@ -168,4 +175,6 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
     public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
         return (ksm.getKeySharedMode() == this.keySharedMode);
     }
+
+    private static final Logger log = 
LoggerFactory.getLogger(NonPersistentStickyKeyDispatcherMultipleConsumers.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 024ed8581ef..1df6366c389 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -100,8 +101,18 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         }
     }
 
+    @VisibleForTesting
+    public StickyKeyConsumerSelector getSelector() {
+        return selector;
+    }
+
     @Override
     public synchronized void addConsumer(Consumer consumer) throws 
BrokerServiceException {
+        if (IS_CLOSED_UPDATER.get(this) == TRUE) {
+            log.warn("[{}] Dispatcher is already closed. Closing consumer {}", 
name, consumer);
+            consumer.disconnect();
+            return;
+        }
         super.addConsumer(consumer);
         try {
             selector.addConsumer(consumer);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 0d2e5a9ee02..e7425f7e4e3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -301,6 +301,20 @@ public class PersistentDispatcherFailoverConsumerTest {
         assertEquals(isActive, change.isIsActive());
     }
 
+    @Test(timeOut = 10000)
+    public void testAddConsumerWhenClosed() throws Exception {
+        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, pulsarTestContext.getBrokerService());
+        PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock, false);
+        PersistentDispatcherSingleActiveConsumer pdfc = new 
PersistentDispatcherSingleActiveConsumer(cursorMock,
+                SubType.Failover, 0, topic, sub);
+        pdfc.close().get();
+
+        Consumer consumer = mock(Consumer.class);
+        pdfc.addConsumer(consumer);
+        verify(consumer, times(1)).disconnect();
+        assertEquals(0, pdfc.consumers.size());
+    }
+
     @Test
     public void testConsumerGroupChangesWithOldNewConsumers() throws Exception 
{
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
index 932c446669d..a57f9a8bafa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -47,6 +48,7 @@ import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
 import 
org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.RedeliveryTracker;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.protocol.Commands;
@@ -62,6 +64,7 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumersTest {
     private ServiceConfiguration configMock;
 
     private NonPersistentStickyKeyDispatcherMultipleConsumers 
nonpersistentDispatcher;
+    private StickyKeyConsumerSelector selector;
 
     final String topicName = "non-persistent://public/default/testTopic";
 
@@ -88,10 +91,19 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumersTest {
         doReturn(topicPolicies).when(topicMock).getHierarchyTopicPolicies();
 
         subscriptionMock = mock(NonPersistentSubscription.class);
-
+        selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
         nonpersistentDispatcher = new 
NonPersistentStickyKeyDispatcherMultipleConsumers(
-            topicMock, subscriptionMock,
-            new HashRangeAutoSplitStickyKeyConsumerSelector());
+            topicMock, subscriptionMock, selector);
+    }
+
+    @Test(timeOut = 10000)
+    public void testAddConsumerWhenClosed() throws Exception {
+        nonpersistentDispatcher.close().get();
+        Consumer consumer = mock(Consumer.class);
+        nonpersistentDispatcher.addConsumer(consumer);
+        verify(consumer, times(1)).disconnect();
+        assertEquals(0, nonpersistentDispatcher.getConsumers().size());
+        assertTrue(selector.getConsumerKeyHashRanges().isEmpty());
     }
 
     @Test(timeOut = 10000)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index aa87b2aaa25..5ff988fdc46 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -159,6 +160,16 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         }
     }
 
+    @Test(timeOut = 10000)
+    public void testAddConsumerWhenClosed() throws Exception {
+        persistentDispatcher.close().get();
+        Consumer consumer = mock(Consumer.class);
+        persistentDispatcher.addConsumer(consumer);
+        verify(consumer, times(1)).disconnect();
+        assertEquals(0, persistentDispatcher.getConsumers().size());
+        
assertTrue(persistentDispatcher.getSelector().getConsumerKeyHashRanges().isEmpty());
+    }
+
     @Test
     public void testSendMarkerMessage() {
         try {

Reply via email to