This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a846dc  Add read position when joining in the consumer stats. (#8274)
8a846dc is described below

commit 8a846dc24624c2c7fee2ea91c2f8fafb9fddabf6
Author: lipenghui <[email protected]>
AuthorDate: Fri Oct 16 20:49:16 2020 +0800

    Add read position when joining in the consumer stats. (#8274)
    
    Motivation
    To troubleshooting the consumer stop to receive messages under the 
key_shared subscription, it's difficult to know if the broker stops to dispatch 
messages to this consumer to preserve order. So this PR exposes a metrics to 
show the read position when the consumer joining so that we can compare the 
read position and the mark delete position to determine.
    
    Verifying this change
    Unit test added
---
 .../org/apache/pulsar/broker/service/Consumer.java |  8 +++++
 ...istentStickyKeyDispatcherMultipleConsumers.java |  9 +++---
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 37 ++++++++++++++++++++++
 .../pulsar/common/policies/data/ConsumerStats.java |  8 +++++
 4 files changed, 58 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 0e93d35..3a5625b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -131,6 +131,7 @@ public class Consumer {
 
     private static final double avgPercent = 0.9;
     private boolean preciseDispatcherFlowControl;
+    private PositionImpl readPositionWhenJoining;
 
     public Consumer(Subscription subscription, SubType subType, String 
topicName, long consumerId,
                     int priorityLevel, String consumerName,
@@ -553,6 +554,9 @@ public class Consumer {
         stats.unackedMessages = unackedMessages;
         stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
         stats.avgMessagesPerEntry = getAvgMessagesPerEntry();
+        if (readPositionWhenJoining != null) {
+            stats.readPositionWhenJoining = readPositionWhenJoining.toString();
+        }
         return stats;
     }
 
@@ -739,5 +743,9 @@ public class Consumer {
         return preciseDispatcherFlowControl;
     }
 
+    public void setReadPositionWhenJoining(PositionImpl 
readPositionWhenJoining) {
+        this.readPositionWhenJoining = readPositionWhenJoining;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Consumer.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 3cde9a1..1585cae 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -107,12 +107,14 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             throw e;
         }
 
+        PositionImpl readPositionWhenJoining = (PositionImpl) 
cursor.getReadPosition();
+        consumer.setReadPositionWhenJoining(readPositionWhenJoining);
         // If this was the 1st consumer, or if all the messages are already 
acked, then we
         // don't need to do anything special
-        if (allowOutOfOrderDelivery == false
+        if (!allowOutOfOrderDelivery
                 && consumerList.size() > 1
                 && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
-            recentlyJoinedConsumers.put(consumer, (PositionImpl) 
cursor.getReadPosition());
+            recentlyJoinedConsumers.put(consumer, readPositionWhenJoining);
         }
     }
 
@@ -154,8 +156,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
         final Map<Consumer, List<Entry>> groupedEntries = 
localGroupedEntries.get();
         groupedEntries.clear();
 
-        for (int i = 0; i < entriesCount; i++) {
-            Entry entry = entries.get(i);
+        for (Entry entry : entries) {
             Consumer c = selector.select(peekStickyKey(entry.getDataBuffer()));
             groupedEntries.computeIfAbsent(c, k -> new 
ArrayList<>()).add(entry);
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index e8c92fd..e2bd1d9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -65,6 +65,7 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.ConfigHelper;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -94,6 +95,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -112,12 +114,14 @@ import 
org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
 import org.apache.pulsar.common.policies.data.BrokerAssignment;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
 import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -2532,4 +2536,37 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         int seconds = 
admin.namespaces().getPolicies("prop-xyz/ns1").message_ttl_in_seconds;
         assertEquals(seconds, 3600);
     }
+
+    @Test
+    public void testGetReadPositionWhenJoining() throws Exception {
+        final String topic = 
"persistent://prop-xyz/ns1/testGetReadPositionWhenJoining-" + 
UUID.randomUUID().toString();
+        final String subName = "my-sub";
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        final int messages = 10;
+        MessageIdImpl messageId = null;
+        for (int i = 0; i < messages; i++) {
+            messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + 
i).getBytes());
+        }
+
+        for (int i = 0; i < 2; i++) {
+            pulsarClient.newConsumer()
+                    .topic(topic)
+                    .subscriptionType(SubscriptionType.Key_Shared)
+                    .subscriptionName(subName)
+                    .subscribe();
+        }
+
+        TopicStats stats = admin.topics().getStats(topic);
+        Assert.assertEquals(stats.subscriptions.size(), 1);
+        SubscriptionStats subStats = stats.subscriptions.get(subName);
+        Assert.assertNotNull(subStats);
+        Assert.assertEquals(subStats.consumers.size(), 2);
+        ConsumerStats consumerStats = subStats.consumers.get(0);
+        Assert.assertEquals(consumerStats.getReadPositionWhenJoining(),
+                PositionImpl.get(messageId.getLedgerId(), 
messageId.getEntryId() + 1).toString());
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index 357407a..837390f 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -59,6 +59,9 @@ public class ConsumerStats {
     /** Flag to verify if consumer is blocked due to reaching threshold of 
unacked messages. */
     public boolean blockedConsumerOnUnackedMsgs;
 
+    /** The read position of the cursor when the consumer joining. */
+    public String readPositionWhenJoining;
+
     /** Address of this consumer. */
     private int addressOffset = -1;
     private int addressLength;
@@ -93,6 +96,7 @@ public class ConsumerStats {
         this.availablePermits += stats.availablePermits;
         this.unackedMessages += stats.unackedMessages;
         this.blockedConsumerOnUnackedMsgs = stats.blockedConsumerOnUnackedMsgs;
+        this.readPositionWhenJoining = stats.readPositionWhenJoining;
         return this;
     }
 
@@ -139,4 +143,8 @@ public class ConsumerStats {
         this.clientVersionLength = clientVersion.length();
         this.stringBuffer.append(clientVersion);
     }
+
+    public String getReadPositionWhenJoining() {
+        return readPositionWhenJoining;
+    }
 }

Reply via email to