This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new c1b3024ed0f [fix][broker] Return if
AbstractDispatcherSingleActiveConsumer closed (#19934)
c1b3024ed0f is described below
commit c1b3024ed0fa4c5306463aef019162598eb0e0bd
Author: houxiaoyu <[email protected]>
AuthorDate: Thu Apr 6 21:50:45 2023 +0800
[fix][broker] Return if AbstractDispatcherSingleActiveConsumer closed
(#19934)
(cherry picked from commit 42a6969efc8fdc16f2d883d3c4f2865bdb460091)
---
.../service/AbstractDispatcherSingleActiveConsumer.java | 1 +
.../NonPersistentDispatcherMultipleConsumers.java | 2 +-
.../NonPersistentStickyKeyDispatcherMultipleConsumers.java | 9 +++++++++
.../PersistentStickyKeyDispatcherMultipleConsumers.java | 11 +++++++++++
.../service/PersistentDispatcherFailoverConsumerTest.java | 14 ++++++++++++++
...PersistentStickyKeyDispatcherMultipleConsumersTest.java | 13 +++++++++++++
...PersistentStickyKeyDispatcherMultipleConsumersTest.java | 11 +++++++++++
7 files changed, 60 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 8cab06be116..17a6d1dbfb3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -140,6 +140,7 @@ public abstract class
AbstractDispatcherSingleActiveConsumer extends AbstractBas
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer {}",
this.topicName, consumer);
consumer.disconnect();
+ return;
}
if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 96945840255..1d74d00776d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -47,7 +47,7 @@ public class NonPersistentDispatcherMultipleConsumers extends
AbstractDispatcher
protected final Subscription subscription;
private CompletableFuture<Void> closeFuture = null;
- private final String name;
+ protected final String name;
protected final Rate msgDrop;
protected static final
AtomicIntegerFieldUpdater<NonPersistentDispatcherMultipleConsumers>
TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index e5e53496519..da7fe56bde1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -39,6 +39,8 @@ import
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.protocol.Commands;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class NonPersistentStickyKeyDispatcherMultipleConsumers extends
NonPersistentDispatcherMultipleConsumers {
@@ -84,6 +86,11 @@ public class
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
@Override
public synchronized void addConsumer(Consumer consumer) throws
BrokerServiceException {
+ if (IS_CLOSED_UPDATER.get(this) == TRUE) {
+ log.warn("[{}] Dispatcher is already closed. Closing consumer {}",
name, consumer);
+ consumer.disconnect();
+ return;
+ }
super.addConsumer(consumer);
try {
selector.addConsumer(consumer);
@@ -168,4 +175,6 @@ public class
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
return (ksm.getKeySharedMode() == this.keySharedMode);
}
+
+ private static final Logger log =
LoggerFactory.getLogger(NonPersistentStickyKeyDispatcherMultipleConsumers.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index a6032a12c3a..2cd05fe0045 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
@@ -95,8 +96,18 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
}
}
+ @VisibleForTesting
+ public StickyKeyConsumerSelector getSelector() {
+ return selector;
+ }
+
@Override
public synchronized void addConsumer(Consumer consumer) throws
BrokerServiceException {
+ if (IS_CLOSED_UPDATER.get(this) == TRUE) {
+ log.warn("[{}] Dispatcher is already closed. Closing consumer {}",
name, consumer);
+ consumer.disconnect();
+ return;
+ }
super.addConsumer(consumer);
try {
selector.addConsumer(consumer);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 7e35a1d0fc8..123425c5c47 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -295,6 +295,20 @@ public class PersistentDispatcherFailoverConsumerTest {
assertEquals(isActive, change.isIsActive());
}
+ @Test(timeOut = 10000)
+ public void testAddConsumerWhenClosed() throws Exception {
+ PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, pulsarTestContext.getBrokerService());
+ PersistentSubscription sub = new PersistentSubscription(topic,
"sub-1", cursorMock, false);
+ PersistentDispatcherSingleActiveConsumer pdfc = new
PersistentDispatcherSingleActiveConsumer(cursorMock,
+ SubType.Failover, 0, topic, sub);
+ pdfc.close().get();
+
+ Consumer consumer = mock(Consumer.class);
+ pdfc.addConsumer(consumer);
+ verify(consumer, times(1)).disconnect();
+ assertEquals(0, pdfc.consumers.size());
+ }
+
@Test
public void testConsumerGroupChangesWithOldNewConsumers() throws Exception
{
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
index f319b7ce4a7..c3a858201dd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -50,6 +51,7 @@ import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import
org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.RedeliveryTracker;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
@@ -67,6 +69,7 @@ public class
NonPersistentStickyKeyDispatcherMultipleConsumersTest {
private ServiceConfiguration configMock;
private NonPersistentStickyKeyDispatcherMultipleConsumers
nonpersistentDispatcher;
+ private StickyKeyConsumerSelector selector;
final String topicName = "non-persistent://public/default/testTopic";
@@ -107,6 +110,16 @@ public class
NonPersistentStickyKeyDispatcherMultipleConsumersTest {
}
}
+ @Test(timeOut = 10000)
+ public void testAddConsumerWhenClosed() throws Exception {
+ nonpersistentDispatcher.close().get();
+ Consumer consumer = mock(Consumer.class);
+ nonpersistentDispatcher.addConsumer(consumer);
+ verify(consumer, times(1)).disconnect();
+ assertEquals(0, nonpersistentDispatcher.getConsumers().size());
+ assertTrue(selector.getConsumerKeyHashRanges().isEmpty());
+ }
+
@Test(timeOut = 10000)
public void testSendMessage() throws BrokerServiceException {
Consumer consumerMock = mock(Consumer.class);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index fa9810a63c3..a9f155271a0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -158,6 +159,16 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
}
}
+ @Test(timeOut = 10000)
+ public void testAddConsumerWhenClosed() throws Exception {
+ persistentDispatcher.close().get();
+ Consumer consumer = mock(Consumer.class);
+ persistentDispatcher.addConsumer(consumer);
+ verify(consumer, times(1)).disconnect();
+ assertEquals(0, persistentDispatcher.getConsumers().size());
+
assertTrue(persistentDispatcher.getSelector().getConsumerKeyHashRanges().isEmpty());
+ }
+
@Test
public void testSendMarkerMessage() {
try {