This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new c1b3024ed0f [fix][broker] Return if 
AbstractDispatcherSingleActiveConsumer closed (#19934)
c1b3024ed0f is described below

commit c1b3024ed0fa4c5306463aef019162598eb0e0bd
Author: houxiaoyu <[email protected]>
AuthorDate: Thu Apr 6 21:50:45 2023 +0800

    [fix][broker] Return if AbstractDispatcherSingleActiveConsumer closed 
(#19934)
    
    (cherry picked from commit 42a6969efc8fdc16f2d883d3c4f2865bdb460091)
---
 .../service/AbstractDispatcherSingleActiveConsumer.java    |  1 +
 .../NonPersistentDispatcherMultipleConsumers.java          |  2 +-
 .../NonPersistentStickyKeyDispatcherMultipleConsumers.java |  9 +++++++++
 .../PersistentStickyKeyDispatcherMultipleConsumers.java    | 11 +++++++++++
 .../service/PersistentDispatcherFailoverConsumerTest.java  | 14 ++++++++++++++
 ...PersistentStickyKeyDispatcherMultipleConsumersTest.java | 13 +++++++++++++
 ...PersistentStickyKeyDispatcherMultipleConsumersTest.java | 11 +++++++++++
 7 files changed, 60 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 8cab06be116..17a6d1dbfb3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -140,6 +140,7 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer extends AbstractBas
         if (IS_CLOSED_UPDATER.get(this) == TRUE) {
             log.warn("[{}] Dispatcher is already closed. Closing consumer {}", 
this.topicName, consumer);
             consumer.disconnect();
+            return;
         }
 
         if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 96945840255..1d74d00776d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -47,7 +47,7 @@ public class NonPersistentDispatcherMultipleConsumers extends 
AbstractDispatcher
     protected final Subscription subscription;
 
     private CompletableFuture<Void> closeFuture = null;
-    private final String name;
+    protected final String name;
     protected final Rate msgDrop;
     protected static final 
AtomicIntegerFieldUpdater<NonPersistentDispatcherMultipleConsumers>
             TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index e5e53496519..da7fe56bde1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -39,6 +39,8 @@ import 
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.protocol.Commands;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class NonPersistentStickyKeyDispatcherMultipleConsumers extends 
NonPersistentDispatcherMultipleConsumers {
 
@@ -84,6 +86,11 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
 
     @Override
     public synchronized void addConsumer(Consumer consumer) throws 
BrokerServiceException {
+        if (IS_CLOSED_UPDATER.get(this) == TRUE) {
+            log.warn("[{}] Dispatcher is already closed. Closing consumer {}", 
name, consumer);
+            consumer.disconnect();
+            return;
+        }
         super.addConsumer(consumer);
         try {
             selector.addConsumer(consumer);
@@ -168,4 +175,6 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
     public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
         return (ksm.getKeySharedMode() == this.keySharedMode);
     }
+
+    private static final Logger log = 
LoggerFactory.getLogger(NonPersistentStickyKeyDispatcherMultipleConsumers.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index a6032a12c3a..2cd05fe0045 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -95,8 +96,18 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
         }
     }
 
+    @VisibleForTesting
+    public StickyKeyConsumerSelector getSelector() {
+        return selector;
+    }
+
     @Override
     public synchronized void addConsumer(Consumer consumer) throws 
BrokerServiceException {
+        if (IS_CLOSED_UPDATER.get(this) == TRUE) {
+            log.warn("[{}] Dispatcher is already closed. Closing consumer {}", 
name, consumer);
+            consumer.disconnect();
+            return;
+        }
         super.addConsumer(consumer);
         try {
             selector.addConsumer(consumer);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 7e35a1d0fc8..123425c5c47 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -295,6 +295,20 @@ public class PersistentDispatcherFailoverConsumerTest {
         assertEquals(isActive, change.isIsActive());
     }
 
+    @Test(timeOut = 10000)
+    public void testAddConsumerWhenClosed() throws Exception {
+        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, pulsarTestContext.getBrokerService());
+        PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock, false);
+        PersistentDispatcherSingleActiveConsumer pdfc = new 
PersistentDispatcherSingleActiveConsumer(cursorMock,
+                SubType.Failover, 0, topic, sub);
+        pdfc.close().get();
+
+        Consumer consumer = mock(Consumer.class);
+        pdfc.addConsumer(consumer);
+        verify(consumer, times(1)).disconnect();
+        assertEquals(0, pdfc.consumers.size());
+    }
+
     @Test
     public void testConsumerGroupChangesWithOldNewConsumers() throws Exception 
{
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
index f319b7ce4a7..c3a858201dd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -50,6 +51,7 @@ import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
 import 
org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.RedeliveryTracker;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
@@ -67,6 +69,7 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumersTest {
     private ServiceConfiguration configMock;
 
     private NonPersistentStickyKeyDispatcherMultipleConsumers 
nonpersistentDispatcher;
+    private StickyKeyConsumerSelector selector;
 
     final String topicName = "non-persistent://public/default/testTopic";
 
@@ -107,6 +110,16 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumersTest {
         }
     }
 
+    @Test(timeOut = 10000)
+    public void testAddConsumerWhenClosed() throws Exception {
+        nonpersistentDispatcher.close().get();
+        Consumer consumer = mock(Consumer.class);
+        nonpersistentDispatcher.addConsumer(consumer);
+        verify(consumer, times(1)).disconnect();
+        assertEquals(0, nonpersistentDispatcher.getConsumers().size());
+        assertTrue(selector.getConsumerKeyHashRanges().isEmpty());
+    }
+
     @Test(timeOut = 10000)
     public void testSendMessage() throws BrokerServiceException {
         Consumer consumerMock = mock(Consumer.class);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index fa9810a63c3..a9f155271a0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -158,6 +159,16 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         }
     }
 
+    @Test(timeOut = 10000)
+    public void testAddConsumerWhenClosed() throws Exception {
+        persistentDispatcher.close().get();
+        Consumer consumer = mock(Consumer.class);
+        persistentDispatcher.addConsumer(consumer);
+        verify(consumer, times(1)).disconnect();
+        assertEquals(0, persistentDispatcher.getConsumers().size());
+        
assertTrue(persistentDispatcher.getSelector().getConsumerKeyHashRanges().isEmpty());
+    }
+
     @Test
     public void testSendMarkerMessage() {
         try {

Reply via email to