This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 97ee82e Consumer is registered on dispatcher even if hash range
conflicts on Key_Shared subscription (#7444)
97ee82e is described below
commit 97ee82ed2dc337f81d7059c5d8980191d16dbfe3
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Tue Jul 7 01:25:07 2020 +0900
Consumer is registered on dispatcher even if hash range conflicts on
Key_Shared subscription (#7444)
---
.../broker/service/AbstractDispatcherSingleActiveConsumer.java | 4 ++--
.../NonPersistentStickyKeyDispatcherMultipleConsumers.java | 10 ++++++++--
.../PersistentStickyKeyDispatcherMultipleConsumers.java | 10 ++++++++--
.../apache/pulsar/client/api/KeySharedSubscriptionTest.java | 10 +++++++++-
4 files changed, 27 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 6c5f8a7..9948dcc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -155,8 +155,6 @@ public abstract class
AbstractDispatcherSingleActiveConsumer extends AbstractBas
throw new ConsumerBusyException("Subscription reached max
consumers limit");
}
- consumers.add(consumer);
-
if (subscriptionType == SubType.Exclusive
&& consumer.getKeySharedMeta() != null
&& consumer.getKeySharedMeta().getHashRangesList() != null
@@ -168,6 +166,8 @@ public abstract class
AbstractDispatcherSingleActiveConsumer extends AbstractBas
isKeyHashRangeFiltered = false;
}
+ consumers.add(consumer);
+
if (!pickAndScheduleActiveConsumer()) {
// the active consumer is not changed
Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 32cce87..37b29da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -47,7 +47,13 @@ public class
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
@Override
public synchronized void addConsumer(Consumer consumer) throws
BrokerServiceException {
super.addConsumer(consumer);
- selector.addConsumer(consumer);
+ try {
+ selector.addConsumer(consumer);
+ } catch (BrokerServiceException e) {
+ consumerSet.removeAll(consumer);
+ consumerList.remove(consumer);
+ throw e;
+ }
}
@Override
@@ -99,4 +105,4 @@ public class
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
-sendMessageInfo.getTotalMessages());
}
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index a4a532f..420552c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -93,7 +93,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
@Override
public synchronized void addConsumer(Consumer consumer) throws
BrokerServiceException {
super.addConsumer(consumer);
- selector.addConsumer(consumer);
+ try {
+ selector.addConsumer(consumer);
+ } catch (BrokerServiceException e) {
+ consumerSet.removeAll(consumer);
+ consumerList.remove(consumer);
+ throw e;
+ }
// If this was the 1st consumer, or if all the messages are already
acked, then we
// don't need to do anything special
@@ -294,4 +300,4 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
private static final Logger log =
LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 511f8c2..966dfda 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -658,7 +658,7 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
@Test
public void testHashRangeConflict() throws PulsarClientException {
this.conf.setSubscriptionKeySharedEnable(true);
- final String topic = "testHashRangeConflict-" +
UUID.randomUUID().toString();
+ final String topic =
"persistent://public/default/testHashRangeConflict-" +
UUID.randomUUID().toString();
final String sub = "test";
Consumer<String> consumer1 = createFixedHashRangesConsumer(topic, sub,
Range.of(0,99), Range.of(400, 65535));
@@ -667,6 +667,10 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
Consumer<String> consumer2 = createFixedHashRangesConsumer(topic, sub,
Range.of(100,399));
Assert.assertTrue(consumer2.isConnected());
+ PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
(PersistentStickyKeyDispatcherMultipleConsumers) pulsar
+
.getBrokerService().getTopicReference(topic).get().getSubscription(sub).getDispatcher();
+ Assert.assertEquals(dispatcher.getConsumers().size(), 2);
+
try {
createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535));
Assert.fail("Should failed with conflict range.");
@@ -679,7 +683,9 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
} catch (PulsarClientException.ConsumerAssignException ignore) {
}
+ Assert.assertEquals(dispatcher.getConsumers().size(), 2);
consumer1.close();
+ Assert.assertEquals(dispatcher.getConsumers().size(), 1);
try {
createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535));
@@ -705,9 +711,11 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
Consumer<String> consumer4 = createFixedHashRangesConsumer(topic, sub,
Range.of(50,99));
Assert.assertTrue(consumer4.isConnected());
+ Assert.assertEquals(dispatcher.getConsumers().size(), 3);
consumer2.close();
consumer3.close();
consumer4.close();
+ Assert.assertFalse(dispatcher.isConsumerConnected());
}
private Consumer<String> createFixedHashRangesConsumer(String topic,
String subscription, Range... ranges) throws PulsarClientException {