This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ce634e2521b52121f362ea7a93a105e2042173d4 Author: Masahiro Sakamoto <[email protected]> AuthorDate: Tue Oct 5 15:21:00 2021 +0900 [stats] Add Key_Shared metadata to topic stats (#11839) * Add Key_Shared metadata to topic stats * Add getters to SubscriptionStats * Resolve conflicts (cherry picked from commit 018073574aa0223963d0c996acf9a1c2a0edb6b8) --- .../nonpersistent/NonPersistentSubscription.java | 10 ++++- ...istentStickyKeyDispatcherMultipleConsumers.java | 6 ++- .../service/persistent/PersistentSubscription.java | 10 ++++- .../pulsar/broker/service/PersistentTopicTest.java | 43 ++++++++++++++++++++++ .../common/policies/data/SubscriptionStats.java | 6 +++ .../policies/data/stats/SubscriptionStatsImpl.java | 7 ++++ 6 files changed, 78 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index a392e41..59fab2d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -72,6 +72,8 @@ public class NonPersistentSubscription implements Subscription { // If isDurable is false(such as a Reader), remove subscription from topic when closing this subscription. private final boolean isDurable; + private KeySharedMode keySharedMode = null; + public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName, boolean isDurable) { this.topic = topic; this.topicName = topic.getName(); @@ -141,7 +143,7 @@ public class NonPersistentSubscription implements Subscription { break; case Key_Shared: KeySharedMeta ksm = consumer.getKeySharedMeta(); - KeySharedMode keySharedMode = ksm.getKeySharedMode(); + keySharedMode = ksm.getKeySharedMode(); if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared || ((NonPersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode() != keySharedMode) { @@ -460,6 +462,12 @@ public class NonPersistentSubscription implements Subscription { subStats.type = getTypeString(); subStats.msgDropRate = dispatcher.getMessageDropRate().getValueRate(); + + KeySharedMode keySharedMode = this.keySharedMode; + if (getType() == SubType.Key_Shared && keySharedMode != null) { + subStats.keySharedMode = keySharedMode.toString(); + } + return subStats; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 9a036f4..f3bcbf2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -95,7 +95,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi break; default: - throw new IllegalArgumentException("Invalid key-shared mode: " + ksm.getKeySharedMode()); + throw new IllegalArgumentException("Invalid key-shared mode: " + keySharedMode); } } @@ -423,6 +423,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi return selector.getConsumerKeyHashRanges(); } + public boolean isAllowOutOfOrderDelivery() { + return allowOutOfOrderDelivery; + } + private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index b127b1e..212f61a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1071,8 +1071,14 @@ public class PersistentSubscription implements Subscription { subStats.isReplicated = isReplicated(); subStats.isDurable = cursor.isDurable(); if (getType() == SubType.Key_Shared && dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers) { - LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers = - ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getRecentlyJoinedConsumers(); + PersistentStickyKeyDispatcherMultipleConsumers keySharedDispatcher = + (PersistentStickyKeyDispatcherMultipleConsumers) dispatcher; + + subStats.allowOutOfOrderDelivery = keySharedDispatcher.isAllowOutOfOrderDelivery(); + subStats.keySharedMode = keySharedDispatcher.getKeySharedMode().toString(); + + LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers = keySharedDispatcher + .getRecentlyJoinedConsumers(); if (recentlyJoinedConsumers != null && recentlyJoinedConsumers.size() > 0) { recentlyJoinedConsumers.forEach((k, v) -> { subStats.consumersAfterMarkDeletePosition.put(k.consumerName(), v.toString()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 2998345..05ad5bf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -119,6 +119,7 @@ import org.apache.pulsar.common.api.proto.ProducerAccessMode; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.util.Codec; @@ -2187,6 +2188,48 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { verify(serverCnx).execute(any()); }; + @Test + public void testKeySharedMetadataExposedToStats() throws Exception { + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + PersistentSubscription sub1 = new PersistentSubscription(topic, "key-shared-stats1", cursorMock, false); + PersistentSubscription sub2 = new PersistentSubscription(topic, "key-shared-stats2", cursorMock, false); + PersistentSubscription sub3 = new PersistentSubscription(topic, "key-shared-stats3", cursorMock, false); + + Consumer consumer1 = new Consumer(sub1, SubType.Key_Shared, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, + "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, + new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT).setAllowOutOfOrderDelivery(false), + MessageId.latest); + sub1.addConsumer(consumer1); + consumer1.close(); + + SubscriptionStatsImpl stats1 = sub1.getStats(false, false); + assertEquals(stats1.keySharedMode, "AUTO_SPLIT"); + assertFalse(stats1.allowOutOfOrderDelivery); + + Consumer consumer2 = new Consumer(sub2, SubType.Key_Shared, topic.getName(), 2, 0, "Cons2", 50000, serverCnx, + "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, + new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT).setAllowOutOfOrderDelivery(true), + MessageId.latest); + sub2.addConsumer(consumer2); + consumer2.close(); + + SubscriptionStatsImpl stats2 = sub2.getStats(false, false); + assertEquals(stats2.keySharedMode, "AUTO_SPLIT"); + assertTrue(stats2.allowOutOfOrderDelivery); + + KeySharedMeta ksm = new KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY) + .setAllowOutOfOrderDelivery(false); + ksm.addHashRange().setStart(0).setEnd(65535); + Consumer consumer3 = new Consumer(sub3, SubType.Key_Shared, topic.getName(), 3, 0, "Cons3", 50000, serverCnx, + "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, ksm, MessageId.latest); + sub3.addConsumer(consumer3); + consumer3.close(); + + SubscriptionStatsImpl stats3 = sub3.getStats(false, false); + assertEquals(stats3.keySharedMode, "STICKY"); + assertFalse(stats3.allowOutOfOrderDelivery); + } + private ByteBuf getMessageWithMetadata(byte[] data) { MessageMetadata messageData = new MessageMetadata() .setPublishTime(System.currentTimeMillis()) diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index c9427fb..2ce38aa 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -97,6 +97,12 @@ public interface SubscriptionStats { /** Mark that the subscription state is kept in sync across different regions. */ boolean isReplicated(); + /** Whether out of order delivery is allowed on the Key_Shared subscription. */ + boolean isAllowOutOfOrderDelivery(); + + /** Whether the Key_Shared subscription mode is AUTO_SPLIT or STICKY. */ + String getKeySharedMode(); + /** This is for Key_Shared subscription to get the recentJoinedConsumers in the Key_Shared subscription. */ Map<String, String> getConsumersAfterMarkDeletePosition(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java index 03e4681..78781ac 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java @@ -103,6 +103,12 @@ public class SubscriptionStatsImpl implements SubscriptionStats { /** Mark that the subscription state is kept in sync across different regions. */ public boolean isReplicated; + /** Whether out of order delivery is allowed on the Key_Shared subscription. */ + public boolean allowOutOfOrderDelivery; + + /** Whether the Key_Shared subscription mode is AUTO_SPLIT or STICKY. */ + public String keySharedMode; + /** This is for Key_Shared subscription to get the recentJoinedConsumers in the Key_Shared subscription. */ public Map<String, String> consumersAfterMarkDeletePosition; @@ -165,6 +171,7 @@ public class SubscriptionStatsImpl implements SubscriptionStats { this.consumers.get(i).add(stats.consumers.get(i)); } } + this.allowOutOfOrderDelivery |= stats.allowOutOfOrderDelivery; this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition); this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges; this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
