This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1b151feee6e6cfbbe9ddefe8aeac6b2d38391bdf
Author: Yong Zhang <[email protected]>
AuthorDate: Tue Feb 4 08:16:08 2020 +0800

    Expose lastConsumedTimestamp and lastAckedTimestamp to consumer stats 
(#6051)
    
    ---
    
    Master Issue: #6046
    
    *Motivation*
    
    Make people can use the timestamp to tell if acknowledge and consumption
    are happening.
    
    *Modifications*
    
    - Add lastConsumedTimestamp and lastAckedTimestamp to consume stats
    
    *Verify this change*
    
    - Pass the test `testConsumerStatsLastTimestamp`
---
 .../org/apache/pulsar/broker/service/Consumer.java |   7 ++
 .../service/persistent/PersistentSubscription.java |   5 +
 .../apache/pulsar/broker/admin/AdminApiTest2.java  | 110 +++++++++++++++++++++
 .../pulsar/common/policies/data/ConsumerStats.java |   3 +
 .../common/policies/data/SubscriptionStats.java    |   9 ++
 site2/docs/admin-api-persistent-topics.md          |  12 +++
 6 files changed, 146 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 731ae09..06393f2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -79,6 +79,9 @@ public class Consumer {
     private final Rate msgOut;
     private final Rate msgRedeliver;
 
+    private long lastConsumedTimestamp;
+    private long lastAckedTimestamp;
+
     // Represents how many messages we can safely send to the consumer without
     // overflowing its receiving queue. The consumer will use Flow commands to
     // increase its availability
@@ -188,6 +191,7 @@ public class Consumer {
      */
     public ChannelPromise sendMessages(final List<Entry> entries, 
EntryBatchSizes batchSizes, int totalMessages,
             long totalBytes, RedeliveryTracker redeliveryTracker) {
+        this.lastConsumedTimestamp = System.currentTimeMillis();
         final ChannelHandlerContext ctx = cnx.ctx();
         final ChannelPromise writePromise = ctx.newPromise();
 
@@ -335,6 +339,7 @@ public class Consumer {
     }
 
     void messageAcked(CommandAck ack) {
+        this.lastAckedTimestamp = System.currentTimeMillis();
         Map<String,Long> properties = Collections.emptyMap();
         if (ack.getPropertiesCount() > 0) {
             properties = ack.getPropertiesList().stream()
@@ -450,6 +455,8 @@ public class Consumer {
     }
 
     public ConsumerStats getStats() {
+        stats.lastAckedTimestamp = lastAckedTimestamp;
+        stats.lastConsumedTimestamp = lastConsumedTimestamp;
         stats.availablePermits = getAvailablePermits();
         stats.unackedMessages = unackedMessages;
         stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 200d06e..8b5fe42 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -89,6 +89,7 @@ public class PersistentSubscription implements Subscription {
     private PersistentMessageExpiryMonitor expiryMonitor;
 
     private long lastExpireTimestamp = 0L;
+    private long lastConsumedFlowTimestamp = 0L;
 
     // for connected subscriptions, message expiry will be checked if the 
backlog is greater than this threshold
     private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
@@ -315,6 +316,7 @@ public class PersistentSubscription implements Subscription 
{
 
     @Override
     public void consumerFlow(Consumer consumer, int 
additionalNumberOfMessages) {
+        this.lastConsumedFlowTimestamp = System.currentTimeMillis();
         dispatcher.consumerFlow(consumer, additionalNumberOfMessages);
     }
 
@@ -935,6 +937,7 @@ public class PersistentSubscription implements Subscription 
{
     public SubscriptionStats getStats() {
         SubscriptionStats subStats = new SubscriptionStats();
         subStats.lastExpireTimestamp = lastExpireTimestamp;
+        subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
         Dispatcher dispatcher = this.dispatcher;
         if (dispatcher != null) {
             dispatcher.getConsumers().forEach(consumer -> {
@@ -944,6 +947,8 @@ public class PersistentSubscription implements Subscription 
{
                 subStats.msgThroughputOut += consumerStats.msgThroughputOut;
                 subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
                 subStats.unackedMessages += consumerStats.unackedMessages;
+                subStats.lastConsumedTimestamp = 
Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
+                subStats.lastAckedTimestamp = 
Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
             });
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 4162ec9..8f90c35 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.admin;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -31,6 +32,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -60,6 +62,8 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -948,4 +952,110 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         
assertEquals(admin.namespaces().getNamespaceReplicationClusters(namespace),
                 Collections.singletonList(localCluster));
     }
+
+    @Test(timeOut = 30000)
+    public void testConsumerStatsLastTimestamp() throws PulsarClientException, 
PulsarAdminException, InterruptedException {
+        long timestamp = System.currentTimeMillis();
+        final String topicName = "consumer-stats-" + timestamp;
+        final String subscribeName = topicName + "-test-stats-sub";
+        final String topic = "persistent://prop-xyz/ns1/" + topicName;
+        final String producerName = "producer-" + topicName;
+
+        @Cleanup
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+        Producer<byte[]> producer = client.newProducer().topic(topic)
+            .enableBatching(false)
+            .producerName(producerName)
+            .create();
+
+        // a. Send a message to the topic.
+        producer.send("message-1".getBytes(StandardCharsets.UTF_8));
+
+        // b. Create a consumer, because there was a message in the topic, the 
consumer will receive the message pushed
+        // by the broker, the lastConsumedTimestamp will as the consume 
subscribe time.
+        Consumer<byte[]> consumer = client.newConsumer().topic(topic)
+            .subscriptionName(subscribeName)
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .subscribe();
+        Message<byte[]> message = consumer.receive();
+
+        // Get the consumer stats.
+        TopicStats topicStats = admin.topics().getStats(topic);
+        SubscriptionStats subscriptionStats = 
topicStats.subscriptions.get(subscribeName);
+        long startConsumedFlowTimestamp = 
subscriptionStats.lastConsumedFlowTimestamp;
+        long startAckedTimestampInSubStats = 
subscriptionStats.lastAckedTimestamp;
+        ConsumerStats consumerStats = subscriptionStats.consumers.get(0);
+        long startConsumedTimestampInConsumerStats = 
consumerStats.lastConsumedTimestamp;
+        long startAckedTimestampInConsumerStats = 
consumerStats.lastAckedTimestamp;
+
+        // Because the message was pushed by the broker, the consumedTimestamp 
should not as 0.
+        assertNotEquals(0, startConsumedTimestampInConsumerStats);
+        // There is no consumer ack the message, so the lastAckedTimestamp 
still as 0.
+        assertEquals(0, startAckedTimestampInConsumerStats);
+        assertNotEquals(0, startConsumedFlowTimestamp);
+        assertEquals(0, startAckedTimestampInSubStats);
+
+        // c. The Consumer receives the message and acks the message.
+        consumer.acknowledge(message);
+        // Waiting for the ack command send to the broker.
+        while (true) {
+            topicStats = admin.topics().getStats(topic);
+            if (topicStats.subscriptions.get(subscribeName).lastAckedTimestamp 
!= 0) {
+                break;
+            }
+            TimeUnit.MILLISECONDS.sleep(100);
+        }
+
+        // Get the consumer stats.
+        topicStats = admin.topics().getStats(topic);
+        subscriptionStats = topicStats.subscriptions.get(subscribeName);
+        long consumedFlowTimestamp = 
subscriptionStats.lastConsumedFlowTimestamp;
+        long ackedTimestampInSubStats = subscriptionStats.lastAckedTimestamp;
+        consumerStats = subscriptionStats.consumers.get(0);
+        long consumedTimestamp = consumerStats.lastConsumedTimestamp;
+        long ackedTimestamp = consumerStats.lastAckedTimestamp;
+
+        // The lastConsumedTimestamp should same as the last time because the 
broker does not push any messages and the
+        // consumer does not pull any messages.
+        assertEquals(startConsumedTimestampInConsumerStats, consumedTimestamp);
+        assertTrue(startAckedTimestampInConsumerStats < ackedTimestamp);
+        assertNotEquals(0, consumedFlowTimestamp);
+        assertTrue(startAckedTimestampInSubStats < ackedTimestampInSubStats);
+
+        // d. Send another messages. The lastConsumedTimestamp should be 
updated.
+        producer.send("message-2".getBytes(StandardCharsets.UTF_8));
+
+        // e. Receive the message and ack it.
+        message = consumer.receive();
+        consumer.acknowledge(message);
+        // Waiting for the ack command send to the broker.
+        while (true) {
+            topicStats = admin.topics().getStats(topic);
+            if (topicStats.subscriptions.get(subscribeName).lastAckedTimestamp 
!= ackedTimestampInSubStats) {
+                break;
+            }
+            TimeUnit.MILLISECONDS.sleep(100);
+        }
+
+        // Get the consumer stats again.
+        topicStats = admin.topics().getStats(topic);
+        subscriptionStats = topicStats.subscriptions.get(subscribeName);
+        long lastConsumedFlowTimestamp = 
subscriptionStats.lastConsumedFlowTimestamp;
+        long lastConsumedTimestampInSubStats = 
subscriptionStats.lastConsumedTimestamp;
+        long lastAckedTimestampInSubStats = 
subscriptionStats.lastAckedTimestamp;
+        consumerStats = subscriptionStats.consumers.get(0);
+        long lastConsumedTimestamp = consumerStats.lastConsumedTimestamp;
+        long lastAckedTimestamp = consumerStats.lastAckedTimestamp;
+
+        assertTrue(consumedTimestamp < lastConsumedTimestamp);
+        assertTrue(ackedTimestamp < lastAckedTimestamp);
+        assertTrue(startConsumedTimestampInConsumerStats < 
lastConsumedTimestamp);
+        assertTrue(startAckedTimestampInConsumerStats < lastAckedTimestamp);
+        assertTrue(consumedFlowTimestamp == lastConsumedFlowTimestamp);
+        assertTrue(ackedTimestampInSubStats < lastAckedTimestampInSubStats);
+        assertEquals(lastConsumedTimestamp, lastConsumedTimestampInSubStats);
+
+        consumer.close();
+        producer.close();
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index f929e22..7411f03 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -59,6 +59,9 @@ public class ConsumerStats {
     private int clientVersionOffset = -1;
     private int clientVersionLength;
 
+    public long lastAckedTimestamp;
+    public long lastConsumedTimestamp;
+
     /** Metadata (key/value strings) associated with this consumer. */
     public Map<String, String> metadata;
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 30b04c9..a4c2994 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -61,6 +61,15 @@ public class SubscriptionStats {
     /** Last message expire execution timestamp. */
     public long lastExpireTimestamp;
 
+    /** Last received consume flow command timestamp. */
+    public long lastConsumedFlowTimestamp;
+
+    /** Last consume message timestamp. */
+    public long lastConsumedTimestamp;
+
+    /** Last acked message timestamp. */
+    public long lastAckedTimestamp;
+
     /** List of connected consumers on this subscription w/ their stats. */
     public List<ConsumerStats> consumers;
 
diff --git a/site2/docs/admin-api-persistent-topics.md 
b/site2/docs/admin-api-persistent-topics.md
index 1f688c5..b09040d 100644
--- a/site2/docs/admin-api-persistent-topics.md
+++ b/site2/docs/admin-api-persistent-topics.md
@@ -222,6 +222,14 @@ It shows current statistics of a given non-partitioned 
topic.
           -   **type**: This subscription type
 
           -   **msgRateExpired**: The rate at which messages were discarded 
instead of dispatched from this subscription due to TTL
+          
+          -   **lastExpireTimestamp**: The last message expire execution 
timestamp
+          
+          -   **lastConsumedFlowTimestamp**: The last flow command received 
timestamp 
+          
+          -   **lastConsumedTimestamp**: The latest timestamp of all the 
consumed timestamp of the consumers
+          
+          -   **lastAckedTimestamp**: The latest timestamp of all the acked 
timestamp of the consumers
 
           -   **consumers**: The list of connected consumers for this 
subscription
 
@@ -236,6 +244,10 @@ It shows current statistics of a given non-partitioned 
topic.
                 -   **unackedMessages**: Number of unacknowledged messages for 
the consumer
 
                 -   **blockedConsumerOnUnackedMsgs**: Flag to verify if the 
consumer is blocked due to reaching threshold of unacked messages
+                
+                -   **lastConsumedTimestamp**: The timestamp of the consumer 
last consume a message
+          
+                -   **lastAckedTimestamp**: The timestamp of the consumer last 
ack a message
 
   -   **replication**: This section gives the stats for cross-colo replication 
of this topic
 

Reply via email to