This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8a846dc Add read position when joining in the consumer stats. (#8274)
8a846dc is described below
commit 8a846dc24624c2c7fee2ea91c2f8fafb9fddabf6
Author: lipenghui <[email protected]>
AuthorDate: Fri Oct 16 20:49:16 2020 +0800
Add read position when joining in the consumer stats. (#8274)
Motivation
When troubleshooting a consumer that has stopped receiving messages under a
key_shared subscription, it is difficult to know whether the broker has stopped
dispatching messages to this consumer in order to preserve ordering. So this PR
exposes a metric that shows the cursor's read position at the time the consumer
joined, so that we can compare that read position with the mark-delete position
to determine whether that is the case.
Verifying this change
Unit test added
---
.../org/apache/pulsar/broker/service/Consumer.java | 8 +++++
...istentStickyKeyDispatcherMultipleConsumers.java | 9 +++---
.../apache/pulsar/broker/admin/AdminApiTest.java | 37 ++++++++++++++++++++++
.../pulsar/common/policies/data/ConsumerStats.java | 8 +++++
4 files changed, 58 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 0e93d35..3a5625b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -131,6 +131,7 @@ public class Consumer {
private static final double avgPercent = 0.9;
private boolean preciseDispatcherFlowControl;
+ private PositionImpl readPositionWhenJoining;
public Consumer(Subscription subscription, SubType subType, String
topicName, long consumerId,
int priorityLevel, String consumerName,
@@ -553,6 +554,9 @@ public class Consumer {
stats.unackedMessages = unackedMessages;
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
stats.avgMessagesPerEntry = getAvgMessagesPerEntry();
+ if (readPositionWhenJoining != null) {
+ stats.readPositionWhenJoining = readPositionWhenJoining.toString();
+ }
return stats;
}
@@ -739,5 +743,9 @@ public class Consumer {
return preciseDispatcherFlowControl;
}
+ public void setReadPositionWhenJoining(PositionImpl
readPositionWhenJoining) {
+ this.readPositionWhenJoining = readPositionWhenJoining;
+ }
+
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 3cde9a1..1585cae 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -107,12 +107,14 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
throw e;
}
+ PositionImpl readPositionWhenJoining = (PositionImpl)
cursor.getReadPosition();
+ consumer.setReadPositionWhenJoining(readPositionWhenJoining);
// If this was the 1st consumer, or if all the messages are already
acked, then we
// don't need to do anything special
- if (allowOutOfOrderDelivery == false
+ if (!allowOutOfOrderDelivery
&& consumerList.size() > 1
&& cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
- recentlyJoinedConsumers.put(consumer, (PositionImpl)
cursor.getReadPosition());
+ recentlyJoinedConsumers.put(consumer, readPositionWhenJoining);
}
}
@@ -154,8 +156,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
final Map<Consumer, List<Entry>> groupedEntries =
localGroupedEntries.get();
groupedEntries.clear();
- for (int i = 0; i < entriesCount; i++) {
- Entry entry = entries.get(i);
+ for (Entry entry : entries) {
Consumer c = selector.select(peekStickyKey(entry.getDataBuffer()));
groupedEntries.computeIfAbsent(c, k -> new
ArrayList<>()).add(entry);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index e8c92fd..e2bd1d9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -65,6 +65,7 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -94,6 +95,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -112,12 +114,14 @@ import
org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -2532,4 +2536,37 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
int seconds =
admin.namespaces().getPolicies("prop-xyz/ns1").message_ttl_in_seconds;
assertEquals(seconds, 3600);
}
+
+ @Test
+ public void testGetReadPositionWhenJoining() throws Exception {
+ final String topic =
"persistent://prop-xyz/ns1/testGetReadPositionWhenJoining-" +
UUID.randomUUID().toString();
+ final String subName = "my-sub";
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ final int messages = 10;
+ MessageIdImpl messageId = null;
+ for (int i = 0; i < messages; i++) {
+ messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " +
i).getBytes());
+ }
+
+ for (int i = 0; i < 2; i++) {
+ pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscriptionName(subName)
+ .subscribe();
+ }
+
+ TopicStats stats = admin.topics().getStats(topic);
+ Assert.assertEquals(stats.subscriptions.size(), 1);
+ SubscriptionStats subStats = stats.subscriptions.get(subName);
+ Assert.assertNotNull(subStats);
+ Assert.assertEquals(subStats.consumers.size(), 2);
+ ConsumerStats consumerStats = subStats.consumers.get(0);
+ Assert.assertEquals(consumerStats.getReadPositionWhenJoining(),
+ PositionImpl.get(messageId.getLedgerId(),
messageId.getEntryId() + 1).toString());
+ }
}
\ No newline at end of file
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index 357407a..837390f 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -59,6 +59,9 @@ public class ConsumerStats {
/** Flag to verify if consumer is blocked due to reaching threshold of
unacked messages. */
public boolean blockedConsumerOnUnackedMsgs;
+ /** The read position of the cursor when the consumer joined. */
+ public String readPositionWhenJoining;
+
/** Address of this consumer. */
private int addressOffset = -1;
private int addressLength;
@@ -93,6 +96,7 @@ public class ConsumerStats {
this.availablePermits += stats.availablePermits;
this.unackedMessages += stats.unackedMessages;
this.blockedConsumerOnUnackedMsgs = stats.blockedConsumerOnUnackedMsgs;
+ this.readPositionWhenJoining = stats.readPositionWhenJoining;
return this;
}
@@ -139,4 +143,8 @@ public class ConsumerStats {
this.clientVersionLength = clientVersion.length();
this.stringBuffer.append(clientVersion);
}
+
+ public String getReadPositionWhenJoining() {
+ return readPositionWhenJoining;
+ }
}