This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ce634e2521b52121f362ea7a93a105e2042173d4
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Tue Oct 5 15:21:00 2021 +0900

    [stats] Add Key_Shared metadata to topic stats (#11839)
    
    * Add Key_Shared metadata to topic stats
    
    * Add getters to SubscriptionStats
    
    * Resolve conflicts
    
    (cherry picked from commit 018073574aa0223963d0c996acf9a1c2a0edb6b8)
---
 .../nonpersistent/NonPersistentSubscription.java   | 10 ++++-
 ...istentStickyKeyDispatcherMultipleConsumers.java |  6 ++-
 .../service/persistent/PersistentSubscription.java | 10 ++++-
 .../pulsar/broker/service/PersistentTopicTest.java | 43 ++++++++++++++++++++++
 .../common/policies/data/SubscriptionStats.java    |  6 +++
 .../policies/data/stats/SubscriptionStatsImpl.java |  7 ++++
 6 files changed, 78 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index a392e41..59fab2d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -72,6 +72,8 @@ public class NonPersistentSubscription implements 
Subscription {
     // If isDurable is false(such as a Reader), remove subscription from topic 
when closing this subscription.
     private final boolean isDurable;
 
+    private KeySharedMode keySharedMode = null;
+
     public NonPersistentSubscription(NonPersistentTopic topic, String 
subscriptionName, boolean isDurable) {
         this.topic = topic;
         this.topicName = topic.getName();
@@ -141,7 +143,7 @@ public class NonPersistentSubscription implements 
Subscription {
                 break;
             case Key_Shared:
                 KeySharedMeta ksm = consumer.getKeySharedMeta();
-                KeySharedMode keySharedMode = ksm.getKeySharedMode();
+                keySharedMode = ksm.getKeySharedMode();
                 if (dispatcher == null || dispatcher.getType() != 
SubType.Key_Shared
                         || 
((NonPersistentStickyKeyDispatcherMultipleConsumers) 
dispatcher).getKeySharedMode()
                         != keySharedMode) {
@@ -460,6 +462,12 @@ public class NonPersistentSubscription implements 
Subscription {
 
         subStats.type = getTypeString();
         subStats.msgDropRate = dispatcher.getMessageDropRate().getValueRate();
+
+        KeySharedMode keySharedMode = this.keySharedMode;
+        if (getType() == SubType.Key_Shared && keySharedMode != null) {
+            subStats.keySharedMode = keySharedMode.toString();
+        }
+
         return subStats;
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 9a036f4..f3bcbf2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -95,7 +95,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
             break;
 
         default:
-            throw new IllegalArgumentException("Invalid key-shared mode: " + 
ksm.getKeySharedMode());
+            throw new IllegalArgumentException("Invalid key-shared mode: " + 
keySharedMode);
         }
     }
 
@@ -423,6 +423,10 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         return selector.getConsumerKeyHashRanges();
     }
 
+    public boolean isAllowOutOfOrderDelivery() {
+        return allowOutOfOrderDelivery;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index b127b1e..212f61a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1071,8 +1071,14 @@ public class PersistentSubscription implements 
Subscription {
         subStats.isReplicated = isReplicated();
         subStats.isDurable = cursor.isDurable();
         if (getType() == SubType.Key_Shared && dispatcher instanceof 
PersistentStickyKeyDispatcherMultipleConsumers) {
-            LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers =
-                    ((PersistentStickyKeyDispatcherMultipleConsumers) 
dispatcher).getRecentlyJoinedConsumers();
+            PersistentStickyKeyDispatcherMultipleConsumers keySharedDispatcher 
=
+                    (PersistentStickyKeyDispatcherMultipleConsumers) 
dispatcher;
+
+            subStats.allowOutOfOrderDelivery = 
keySharedDispatcher.isAllowOutOfOrderDelivery();
+            subStats.keySharedMode = 
keySharedDispatcher.getKeySharedMode().toString();
+
+            LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers = 
keySharedDispatcher
+                    .getRecentlyJoinedConsumers();
             if (recentlyJoinedConsumers != null && 
recentlyJoinedConsumers.size() > 0) {
                 recentlyJoinedConsumers.forEach((k, v) -> {
                     
subStats.consumersAfterMarkDeletePosition.put(k.consumerName(), v.toString());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 2998345..05ad5bf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -119,6 +119,7 @@ import 
org.apache.pulsar.common.api.proto.ProducerAccessMode;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.util.Codec;
@@ -2187,6 +2188,48 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         verify(serverCnx).execute(any());
     };
 
+    @Test
+    public void testKeySharedMetadataExposedToStats() throws Exception {
+        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        PersistentSubscription sub1 = new PersistentSubscription(topic, 
"key-shared-stats1", cursorMock, false);
+        PersistentSubscription sub2 = new PersistentSubscription(topic, 
"key-shared-stats2", cursorMock, false);
+        PersistentSubscription sub3 = new PersistentSubscription(topic, 
"key-shared-stats3", cursorMock, false);
+
+        Consumer consumer1 = new Consumer(sub1, SubType.Key_Shared, 
topic.getName(), 1, 0, "Cons1", 50000, serverCnx,
+                "myrole-1", Collections.emptyMap(), false, 
InitialPosition.Latest,
+                new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT).setAllowOutOfOrderDelivery(false),
+                MessageId.latest);
+        sub1.addConsumer(consumer1);
+        consumer1.close();
+
+        SubscriptionStatsImpl stats1 = sub1.getStats(false, false);
+        assertEquals(stats1.keySharedMode, "AUTO_SPLIT");
+        assertFalse(stats1.allowOutOfOrderDelivery);
+
+        Consumer consumer2 = new Consumer(sub2, SubType.Key_Shared, 
topic.getName(), 2, 0, "Cons2", 50000, serverCnx,
+                "myrole-1", Collections.emptyMap(), false, 
InitialPosition.Latest,
+                new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT).setAllowOutOfOrderDelivery(true),
+                MessageId.latest);
+        sub2.addConsumer(consumer2);
+        consumer2.close();
+
+        SubscriptionStatsImpl stats2 = sub2.getStats(false, false);
+        assertEquals(stats2.keySharedMode, "AUTO_SPLIT");
+        assertTrue(stats2.allowOutOfOrderDelivery);
+
+        KeySharedMeta ksm =  new 
KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY)
+                .setAllowOutOfOrderDelivery(false);
+        ksm.addHashRange().setStart(0).setEnd(65535);
+        Consumer consumer3 = new Consumer(sub3, SubType.Key_Shared, 
topic.getName(), 3, 0, "Cons3", 50000, serverCnx,
+                "myrole-1", Collections.emptyMap(), false, 
InitialPosition.Latest, ksm, MessageId.latest);
+        sub3.addConsumer(consumer3);
+        consumer3.close();
+
+        SubscriptionStatsImpl stats3 = sub3.getStats(false, false);
+        assertEquals(stats3.keySharedMode, "STICKY");
+        assertFalse(stats3.allowOutOfOrderDelivery);
+    }
+
     private ByteBuf getMessageWithMetadata(byte[] data) {
         MessageMetadata messageData = new MessageMetadata()
                 .setPublishTime(System.currentTimeMillis())
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index c9427fb..2ce38aa 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -97,6 +97,12 @@ public interface SubscriptionStats {
     /** Mark that the subscription state is kept in sync across different 
regions. */
     boolean isReplicated();
 
+    /** Whether out of order delivery is allowed on the Key_Shared 
subscription. */
+    boolean isAllowOutOfOrderDelivery();
+
+    /** Whether the Key_Shared subscription mode is AUTO_SPLIT or STICKY. */
+    String getKeySharedMode();
+
     /** This is for Key_Shared subscription to get the recentJoinedConsumers 
in the Key_Shared subscription. */
     Map<String, String> getConsumersAfterMarkDeletePosition();
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index 03e4681..78781ac 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -103,6 +103,12 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
     /** Mark that the subscription state is kept in sync across different 
regions. */
     public boolean isReplicated;
 
+    /** Whether out of order delivery is allowed on the Key_Shared 
subscription. */
+    public boolean allowOutOfOrderDelivery;
+
+    /** Whether the Key_Shared subscription mode is AUTO_SPLIT or STICKY. */
+    public String keySharedMode;
+
     /** This is for Key_Shared subscription to get the recentJoinedConsumers 
in the Key_Shared subscription. */
     public Map<String, String> consumersAfterMarkDeletePosition;
 
@@ -165,6 +171,7 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
                 this.consumers.get(i).add(stats.consumers.get(i));
             }
         }
+        this.allowOutOfOrderDelivery |= stats.allowOutOfOrderDelivery;
         
this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition);
         this.nonContiguousDeletedMessagesRanges += 
stats.nonContiguousDeletedMessagesRanges;
         this.nonContiguousDeletedMessagesRangesSerializedSize += 
stats.nonContiguousDeletedMessagesRangesSerializedSize;

Reply via email to