This is an automated email from the ASF dual-hosted git repository. guangning pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1b151feee6e6cfbbe9ddefe8aeac6b2d38391bdf Author: Yong Zhang <[email protected]> AuthorDate: Tue Feb 4 08:16:08 2020 +0800 Expose lastConsumedTimestamp and lastAckedTimestamp to consumer stats (#6051) --- Master Issue: #6046 *Motivation* Let users tell from these timestamps whether acknowledgment and consumption are happening. *Modifications* - Add lastConsumedTimestamp and lastAckedTimestamp to consumer stats *Verify this change* - Pass the test `testConsumerStatsLastTimestamp` --- .../org/apache/pulsar/broker/service/Consumer.java | 7 ++ .../service/persistent/PersistentSubscription.java | 5 + .../apache/pulsar/broker/admin/AdminApiTest2.java | 110 +++++++++++++++++++++ .../pulsar/common/policies/data/ConsumerStats.java | 3 + .../common/policies/data/SubscriptionStats.java | 9 ++ site2/docs/admin-api-persistent-topics.md | 12 +++ 6 files changed, 146 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 731ae09..06393f2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -79,6 +79,9 @@ public class Consumer { private final Rate msgOut; private final Rate msgRedeliver; + private long lastConsumedTimestamp; + private long lastAckedTimestamp; + // Represents how many messages we can safely send to the consumer without // overflowing its receiving queue.
The consumer will use Flow commands to // increase its availability @@ -188,6 +191,7 @@ public class Consumer { */ public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes, int totalMessages, long totalBytes, RedeliveryTracker redeliveryTracker) { + this.lastConsumedTimestamp = System.currentTimeMillis(); final ChannelHandlerContext ctx = cnx.ctx(); final ChannelPromise writePromise = ctx.newPromise(); @@ -335,6 +339,7 @@ public class Consumer { } void messageAcked(CommandAck ack) { + this.lastAckedTimestamp = System.currentTimeMillis(); Map<String,Long> properties = Collections.emptyMap(); if (ack.getPropertiesCount() > 0) { properties = ack.getPropertiesList().stream() @@ -450,6 +455,8 @@ public class Consumer { } public ConsumerStats getStats() { + stats.lastAckedTimestamp = lastAckedTimestamp; + stats.lastConsumedTimestamp = lastConsumedTimestamp; stats.availablePermits = getAvailablePermits(); stats.unackedMessages = unackedMessages; stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 200d06e..8b5fe42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -89,6 +89,7 @@ public class PersistentSubscription implements Subscription { private PersistentMessageExpiryMonitor expiryMonitor; private long lastExpireTimestamp = 0L; + private long lastConsumedFlowTimestamp = 0L; // for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000; @@ -315,6 +316,7 @@ public class PersistentSubscription implements Subscription { @Override 
public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { + this.lastConsumedFlowTimestamp = System.currentTimeMillis(); dispatcher.consumerFlow(consumer, additionalNumberOfMessages); } @@ -935,6 +937,7 @@ public class PersistentSubscription implements Subscription { public SubscriptionStats getStats() { SubscriptionStats subStats = new SubscriptionStats(); subStats.lastExpireTimestamp = lastExpireTimestamp; + subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp; Dispatcher dispatcher = this.dispatcher; if (dispatcher != null) { dispatcher.getConsumers().forEach(consumer -> { @@ -944,6 +947,8 @@ public class PersistentSubscription implements Subscription { subStats.msgThroughputOut += consumerStats.msgThroughputOut; subStats.msgRateRedeliver += consumerStats.msgRateRedeliver; subStats.unackedMessages += consumerStats.unackedMessages; + subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp); + subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp); }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index 4162ec9..8f90c35 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.admin; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -31,6 +32,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.net.URL; +import 
java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -60,6 +62,8 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicDomain; @@ -948,4 +952,110 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { assertEquals(admin.namespaces().getNamespaceReplicationClusters(namespace), Collections.singletonList(localCluster)); } + + @Test(timeOut = 30000) + public void testConsumerStatsLastTimestamp() throws PulsarClientException, PulsarAdminException, InterruptedException { + long timestamp = System.currentTimeMillis(); + final String topicName = "consumer-stats-" + timestamp; + final String subscribeName = topicName + "-test-stats-sub"; + final String topic = "persistent://prop-xyz/ns1/" + topicName; + final String producerName = "producer-" + topicName; + + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + Producer<byte[]> producer = client.newProducer().topic(topic) + .enableBatching(false) + .producerName(producerName) + .create(); + + // a. Send a message to the topic. + producer.send("message-1".getBytes(StandardCharsets.UTF_8)); + + // b. Create a consumer, because there was a message in the topic, the consumer will receive the message pushed + // by the broker, the lastConsumedTimestamp will as the consume subscribe time. 
+ Consumer<byte[]> consumer = client.newConsumer().topic(topic) + .subscriptionName(subscribeName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + Message<byte[]> message = consumer.receive(); + + // Get the consumer stats. + TopicStats topicStats = admin.topics().getStats(topic); + SubscriptionStats subscriptionStats = topicStats.subscriptions.get(subscribeName); + long startConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp; + long startAckedTimestampInSubStats = subscriptionStats.lastAckedTimestamp; + ConsumerStats consumerStats = subscriptionStats.consumers.get(0); + long startConsumedTimestampInConsumerStats = consumerStats.lastConsumedTimestamp; + long startAckedTimestampInConsumerStats = consumerStats.lastAckedTimestamp; + + // Because the message was pushed by the broker, the consumedTimestamp should not as 0. + assertNotEquals(0, startConsumedTimestampInConsumerStats); + // There is no consumer ack the message, so the lastAckedTimestamp still as 0. + assertEquals(0, startAckedTimestampInConsumerStats); + assertNotEquals(0, startConsumedFlowTimestamp); + assertEquals(0, startAckedTimestampInSubStats); + + // c. The Consumer receives the message and acks the message. + consumer.acknowledge(message); + // Waiting for the ack command send to the broker. + while (true) { + topicStats = admin.topics().getStats(topic); + if (topicStats.subscriptions.get(subscribeName).lastAckedTimestamp != 0) { + break; + } + TimeUnit.MILLISECONDS.sleep(100); + } + + // Get the consumer stats. 
+ topicStats = admin.topics().getStats(topic); + subscriptionStats = topicStats.subscriptions.get(subscribeName); + long consumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp; + long ackedTimestampInSubStats = subscriptionStats.lastAckedTimestamp; + consumerStats = subscriptionStats.consumers.get(0); + long consumedTimestamp = consumerStats.lastConsumedTimestamp; + long ackedTimestamp = consumerStats.lastAckedTimestamp; + + // The lastConsumedTimestamp should same as the last time because the broker does not push any messages and the + // consumer does not pull any messages. + assertEquals(startConsumedTimestampInConsumerStats, consumedTimestamp); + assertTrue(startAckedTimestampInConsumerStats < ackedTimestamp); + assertNotEquals(0, consumedFlowTimestamp); + assertTrue(startAckedTimestampInSubStats < ackedTimestampInSubStats); + + // d. Send another messages. The lastConsumedTimestamp should be updated. + producer.send("message-2".getBytes(StandardCharsets.UTF_8)); + + // e. Receive the message and ack it. + message = consumer.receive(); + consumer.acknowledge(message); + // Waiting for the ack command send to the broker. + while (true) { + topicStats = admin.topics().getStats(topic); + if (topicStats.subscriptions.get(subscribeName).lastAckedTimestamp != ackedTimestampInSubStats) { + break; + } + TimeUnit.MILLISECONDS.sleep(100); + } + + // Get the consumer stats again. 
+ topicStats = admin.topics().getStats(topic); + subscriptionStats = topicStats.subscriptions.get(subscribeName); + long lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp; + long lastConsumedTimestampInSubStats = subscriptionStats.lastConsumedTimestamp; + long lastAckedTimestampInSubStats = subscriptionStats.lastAckedTimestamp; + consumerStats = subscriptionStats.consumers.get(0); + long lastConsumedTimestamp = consumerStats.lastConsumedTimestamp; + long lastAckedTimestamp = consumerStats.lastAckedTimestamp; + + assertTrue(consumedTimestamp < lastConsumedTimestamp); + assertTrue(ackedTimestamp < lastAckedTimestamp); + assertTrue(startConsumedTimestampInConsumerStats < lastConsumedTimestamp); + assertTrue(startAckedTimestampInConsumerStats < lastAckedTimestamp); + assertTrue(consumedFlowTimestamp == lastConsumedFlowTimestamp); + assertTrue(ackedTimestampInSubStats < lastAckedTimestampInSubStats); + assertEquals(lastConsumedTimestamp, lastConsumedTimestampInSubStats); + + consumer.close(); + producer.close(); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java index f929e22..7411f03 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java @@ -59,6 +59,9 @@ public class ConsumerStats { private int clientVersionOffset = -1; private int clientVersionLength; + public long lastAckedTimestamp; + public long lastConsumedTimestamp; + /** Metadata (key/value strings) associated with this consumer. 
*/ public Map<String, String> metadata; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index 30b04c9..a4c2994 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -61,6 +61,15 @@ public class SubscriptionStats { /** Last message expire execution timestamp. */ public long lastExpireTimestamp; + /** Last received consume flow command timestamp. */ + public long lastConsumedFlowTimestamp; + + /** Last consume message timestamp. */ + public long lastConsumedTimestamp; + + /** Last acked message timestamp. */ + public long lastAckedTimestamp; + /** List of connected consumers on this subscription w/ their stats. */ public List<ConsumerStats> consumers; diff --git a/site2/docs/admin-api-persistent-topics.md b/site2/docs/admin-api-persistent-topics.md index 1f688c5..b09040d 100644 --- a/site2/docs/admin-api-persistent-topics.md +++ b/site2/docs/admin-api-persistent-topics.md @@ -222,6 +222,14 @@ It shows current statistics of a given non-partitioned topic. - **type**: This subscription type - **msgRateExpired**: The rate at which messages were discarded instead of dispatched from this subscription due to TTL + + - **lastExpireTimestamp**: The timestamp of the last message-expiry execution + + - **lastConsumedFlowTimestamp**: The timestamp at which the last flow command was received + + - **lastConsumedTimestamp**: The latest of the last-consumed timestamps across all consumers of this subscription + + - **lastAckedTimestamp**: The latest of the last-acked timestamps across all consumers of this subscription - **consumers**: The list of connected consumers for this subscription @@ -236,6 +244,10 @@ It shows current statistics of a given non-partitioned topic.
- **unackedMessages**: Number of unacknowledged messages for the consumer - **blockedConsumerOnUnackedMsgs**: Flag to verify if the consumer is blocked due to reaching threshold of unacked messages + + - **lastConsumedTimestamp**: The timestamp at which the consumer last consumed a message + + - **lastAckedTimestamp**: The timestamp at which the consumer last acknowledged a message - **replication**: This section gives the stats for cross-colo replication of this topic
