This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4cc082170cb7346acf808a4842a2634339c3853d Author: lipenghui <[email protected]> AuthorDate: Fri Sep 17 14:35:02 2021 +0800 Fix wrong key-hash selector used for new consumers after all the previous consumers disconnected (#12035) We will encounter the issue after all the previous consumers disconnected and the new consumers connect to the topic with different key_shared policy. The root cause is we are using the previous dispatcher after the key_shared policy changed, so the fix is to use a new dispatcher after a new consumer with a different key-shared policy (cherry picked from commit 3a4755f50ef46c3d94ce9629478941d5224cb800) --- ...istentStickyKeyDispatcherMultipleConsumers.java | 43 +++++++++++++ .../nonpersistent/NonPersistentSubscription.java | 35 +++-------- ...istentStickyKeyDispatcherMultipleConsumers.java | 10 ++- .../service/persistent/PersistentSubscription.java | 8 ++- .../client/api/KeySharedSubscriptionTest.java | 71 ++++++++++++++++++++++ 5 files changed, 136 insertions(+), 31 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java index 704fd93..878bac8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java @@ -18,28 +18,67 @@ */ package org.apache.pulsar.broker.service.nonpersistent; +import com.google.common.annotations.VisibleForTesting; import io.netty.util.concurrent.FastThreadLocal; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.bookkeeper.mledger.Entry; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.EntryBatchSizes; +import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector; +import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector; import org.apache.pulsar.broker.service.SendMessageInfo; import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; +import org.apache.pulsar.common.api.proto.KeySharedMeta; +import org.apache.pulsar.common.api.proto.KeySharedMode; import org.apache.pulsar.common.protocol.Commands; public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers { private final StickyKeyConsumerSelector selector; + private final KeySharedMode keySharedMode; public NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription, + KeySharedMeta ksm) { + super(topic, subscription); + this.keySharedMode = ksm.getKeySharedMode(); + switch (this.keySharedMode) { + case STICKY: + this.selector = new HashRangeExclusiveStickyKeyConsumerSelector(); + break; + + case AUTO_SPLIT: + default: + ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration(); + if (conf.isSubscriptionKeySharedUseConsistentHashing()) { + this.selector = new ConsistentHashingStickyKeyConsumerSelector( + conf.getSubscriptionKeySharedConsistentHashingReplicaPoints()); + } else { + this.selector = new HashRangeAutoSplitStickyKeyConsumerSelector(); + } + break; + } + } + + @VisibleForTesting + NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription, StickyKeyConsumerSelector selector) { super(topic, subscription); + if (selector instanceof HashRangeExclusiveStickyKeyConsumerSelector) { + keySharedMode = KeySharedMode.STICKY; + } else if (selector instanceof ConsistentHashingStickyKeyConsumerSelector + || selector instanceof HashRangeAutoSplitStickyKeyConsumerSelector) { + keySharedMode = KeySharedMode.AUTO_SPLIT; + } else { + keySharedMode = null; + } this.selector = selector; } @@ -121,4 +160,8 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis } } } + + public KeySharedMode getKeySharedMode() { + return keySharedMode; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index a400fae..a392e41 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -28,22 +28,19 @@ import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; -import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException; -import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; -import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector; -import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector; -import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; +import org.apache.pulsar.common.api.proto.KeySharedMeta; +import org.apache.pulsar.common.api.proto.KeySharedMode; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl; @@ -143,29 +140,13 @@ public class NonPersistentSubscription implements Subscription { } break; case Key_Shared: - if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) { + KeySharedMeta ksm = consumer.getKeySharedMeta(); + KeySharedMode keySharedMode = ksm.getKeySharedMode(); + if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared + || ((NonPersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode() + != keySharedMode) { previousDispatcher = dispatcher; - - switch (consumer.getKeySharedMeta().getKeySharedMode()) { - case STICKY: - dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, - new HashRangeExclusiveStickyKeyConsumerSelector()); - break; - - case AUTO_SPLIT: - default: - StickyKeyConsumerSelector selector; - ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration(); - if (conf.isSubscriptionKeySharedUseConsistentHashing()) { - selector = new ConsistentHashingStickyKeyConsumerSelector( - conf.getSubscriptionKeySharedConsistentHashingReplicaPoints()); - } else { - selector = new HashRangeAutoSplitStickyKeyConsumerSelector(); - } - - dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, selector); - break; - } + this.dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, ksm); } break; default: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index d4d64e2..e62f63e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -48,6 +48,7 @@ import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; +import org.apache.pulsar.common.api.proto.KeySharedMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +58,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi private final StickyKeyConsumerSelector selector; private boolean isDispatcherStuckOnReplays = false; + private final KeySharedMode keySharedMode; /** * When a consumer joins, it will be added to this map with the current read position. @@ -76,8 +78,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>(); this.stuckConsumers = new HashSet<>(); this.nextStuckConsumers = new HashSet<>(); - - switch (ksm.getKeySharedMode()) { + this.keySharedMode = ksm.getKeySharedMode(); + switch (this.keySharedMode) { case AUTO_SPLIT: if (conf.isSubscriptionKeySharedUseConsistentHashing()) { selector = new ConsistentHashingStickyKeyConsumerSelector( @@ -408,6 +410,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi return cursor.asyncReplayEntries(positions, this, ReadType.Replay, true); } + public KeySharedMode getKeySharedMode() { + return this.keySharedMode; + } + public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() { return recentlyJoinedConsumers; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index cf01ace..1eb99e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -66,6 +66,7 @@ import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; +import org.apache.pulsar.common.api.proto.KeySharedMode; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; import org.apache.pulsar.common.api.proto.TxnAction; @@ -256,9 +257,12 @@ public class PersistentSubscription implements Subscription { } break; case Key_Shared: - if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) { + KeySharedMeta ksm = consumer.getKeySharedMeta(); + KeySharedMode keySharedMode = ksm.getKeySharedMode(); + if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared + || ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode() + != keySharedMode) { previousDispatcher = dispatcher; - KeySharedMeta ksm = consumer.getKeySharedMeta(); dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this, topic.getBrokerService().getPulsar().getConfiguration(), ksm); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index e785ac1..bc19852 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.api; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -39,6 +40,7 @@ import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -47,8 +49,12 @@ import lombok.Cleanup; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.common.api.proto.KeySharedMode; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.util.Murmur3_32Hash; import org.awaitility.Awaitility; @@ -93,6 +99,14 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { }; } + @DataProvider(name = "topicDomain") + public Object[][] topicDomainProvider() { + return new Object[][] { + { "persistent" }, + { "non-persistent" } + }; + } + @BeforeMethod(alwaysRun = true) @Override protected void setup() throws Exception { @@ -1012,6 +1026,63 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { }); } + @Test(dataProvider = "topicDomain") + public void testSelectorChangedAfterAllConsumerDisconnected(String topicDomain) throws PulsarClientException, + ExecutionException, InterruptedException { + final String topicName = TopicName.get(topicDomain, "public", "default", + "testSelectorChangedAfterAllConsumerDisconnected" + UUID.randomUUID()).toString(); + + final String subName = "my-sub"; + + Consumer<byte[]> consumer1 = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subName) + .consumerName("first-consumer") + .subscriptionType(SubscriptionType.Key_Shared) + .keySharedPolicy(KeySharedPolicy.autoSplitHashRange()) + .cryptoKeyReader(new EncKeyReader()) + .subscribe(); + + CompletableFuture<Optional<Topic>> future = pulsar.getBrokerService().getTopicIfExists(topicName); + assertTrue(future.isDone()); + assertTrue(future.get().isPresent()); + Topic topic = future.get().get(); + KeySharedMode keySharedMode = getKeySharedModeOfSubscription(topic, subName); + assertNotNull(keySharedMode); + assertEquals(keySharedMode, KeySharedMode.AUTO_SPLIT); + + consumer1.close(); + + consumer1 = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subName) + .consumerName("second-consumer") + .subscriptionType(SubscriptionType.Key_Shared) + .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 65535))) + .cryptoKeyReader(new EncKeyReader()) + .subscribe(); + + future = pulsar.getBrokerService().getTopicIfExists(topicName); + assertTrue(future.isDone()); + assertTrue(future.get().isPresent()); + topic = future.get().get(); + keySharedMode = getKeySharedModeOfSubscription(topic, subName); + assertNotNull(keySharedMode); + assertEquals(keySharedMode, KeySharedMode.STICKY); + consumer1.close(); + } + + private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String subscription) { + if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) { + return ((PersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subscription) + .getDispatcher()).getKeySharedMode(); + } else if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.non_persistent)) { + return ((NonPersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subscription) + .getDispatcher()).getKeySharedMode(); + } + return null; + } + private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException { return pulsarClient.newConsumer(Schema.STRING) .topic(topic)
