This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 36c1c00 Key_Shared dispatcher with no connected consumers should be
recreated if allowOutOfOrderDelivery changes (#13063)
36c1c00 is described below
commit 36c1c002547df647ec1584bad4783d6529481db0
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Thu Dec 2 01:15:04 2021 +0900
Key_Shared dispatcher with no connected consumers should be recreated if
allowOutOfOrderDelivery changes (#13063)
---
...istentStickyKeyDispatcherMultipleConsumers.java | 4 +++
.../nonpersistent/NonPersistentSubscription.java | 5 ++-
...istentStickyKeyDispatcherMultipleConsumers.java | 13 +++++---
.../service/persistent/PersistentSubscription.java | 6 ++--
.../client/api/KeySharedSubscriptionTest.java | 38 ++++++++++++++++++++++
5 files changed, 55 insertions(+), 11 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 878bac8..0dac0a2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -164,4 +164,8 @@ public class
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
public KeySharedMode getKeySharedMode() {
return keySharedMode;
}
+
+ public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
+ return (ksm.getKeySharedMode() == this.keySharedMode);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 59fab2d..056fd0f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -143,10 +143,9 @@ public class NonPersistentSubscription implements
Subscription {
break;
case Key_Shared:
KeySharedMeta ksm = consumer.getKeySharedMeta();
- keySharedMode = ksm.getKeySharedMode();
if (dispatcher == null || dispatcher.getType() !=
SubType.Key_Shared
- ||
((NonPersistentStickyKeyDispatcherMultipleConsumers)
dispatcher).getKeySharedMode()
- != keySharedMode) {
+ ||
!((NonPersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
+ .hasSameKeySharedPolicy(ksm)) {
previousDispatcher = dispatcher;
this.dispatcher = new
NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, ksm);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 5c8f33e..116a9eb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -448,6 +448,15 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
return this.keySharedMode;
}
+ public boolean isAllowOutOfOrderDelivery() {
+ return this.allowOutOfOrderDelivery;
+ }
+
+ public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
+ return (ksm.getKeySharedMode() == this.keySharedMode
+ && ksm.isAllowOutOfOrderDelivery() ==
this.allowOutOfOrderDelivery);
+ }
+
public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
return recentlyJoinedConsumers;
}
@@ -456,10 +465,6 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
return selector.getConsumerKeyHashRanges();
}
- public boolean isAllowOutOfOrderDelivery() {
- return allowOutOfOrderDelivery;
- }
-
private static final Logger log =
LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 061d038..1d878e5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -70,7 +70,6 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
-import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.api.proto.TxnAction;
@@ -270,10 +269,9 @@ public class PersistentSubscription implements
Subscription {
break;
case Key_Shared:
KeySharedMeta ksm = consumer.getKeySharedMeta();
- KeySharedMode keySharedMode =
ksm.getKeySharedMode();
if (dispatcher == null || dispatcher.getType() !=
SubType.Key_Shared
- ||
((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode()
- != keySharedMode) {
+ ||
!((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
+ .hasSameKeySharedPolicy(ksm)) {
previousDispatcher = dispatcher;
dispatcher = new
PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
topic.getBrokerService().getPulsar().getConfiguration(), ksm);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 1349e6c..a78e64d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -1068,6 +1069,43 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
consumer1.close();
}
+ @Test
+ public void
testAllowOutOfOrderDeliveryChangedAfterAllConsumerDisconnected() throws
Exception {
+ final String topicName =
"persistent://public/default/change-allow-ooo-delivery-" + UUID.randomUUID();
+ final String subName = "my-sub";
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Key_Shared)
+
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(true))
+ .subscribe();
+
+ CompletableFuture<Optional<Topic>> future =
pulsar.getBrokerService().getTopicIfExists(topicName);
+ assertTrue(future.isDone());
+ assertTrue(future.get().isPresent());
+ Topic topic = future.get().get();
+ PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
+ (PersistentStickyKeyDispatcherMultipleConsumers)
topic.getSubscription(subName).getDispatcher();
+ assertTrue(dispatcher.isAllowOutOfOrderDelivery());
+ consumer.close();
+
+ consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Key_Shared)
+
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(false))
+ .subscribe();
+
+ future = pulsar.getBrokerService().getTopicIfExists(topicName);
+ assertTrue(future.isDone());
+ assertTrue(future.get().isPresent());
+ topic = future.get().get();
+ dispatcher = (PersistentStickyKeyDispatcherMultipleConsumers)
topic.getSubscription(subName).getDispatcher();
+ assertFalse(dispatcher.isAllowOutOfOrderDelivery());
+ consumer.close();
+ }
+
private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String
subscription) {
if
(TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) {
return ((PersistentStickyKeyDispatcherMultipleConsumers)
topic.getSubscription(subscription)