sijie commented on a change in pull request #6051: Expose lastConsumedTimestamp 
and lastAckedTimestamp to consumer stats
URL: https://github.com/apache/pulsar/pull/6051#discussion_r368249464
 
 

 ##########
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
 ##########
 @@ -948,4 +952,98 @@ public void testCreateNamespaceWithNoClusters() throws 
PulsarAdminException {
         
assertEquals(admin.namespaces().getNamespaceReplicationClusters(namespace),
                 Collections.singletonList(localCluster));
     }
+
+    @Test(timeOut = 30000)
+    public void testConsumerStatsLastTimestamp() throws PulsarClientException, 
PulsarAdminException, InterruptedException {
+        long timestamp = System.currentTimeMillis();
+        final String topicName = "consumer-stats-" + timestamp;
+        final String subscribeName = topicName + "-test-stats-sub";
+        final String topic = "persistent://prop-xyz/ns1/" + topicName;
+        final String producerName = "producer-" + topicName;
+
+        @Cleanup
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+        Producer<byte[]> producer = client.newProducer().topic(topic)
+            .enableBatching(false)
+            .producerName(producerName)
+            .create();
+
+        // a. Send a message to the topic.
+        producer.send("message-1".getBytes(StandardCharsets.UTF_8));
+
+        // b. Create a consumer, because there was a message in the topic, the 
consumer will receive the message pushed
+        // by the broker, the lastConsumedTimestamp will as the consume 
subscribe time.
+        Consumer<byte[]> consumer = client.newConsumer().topic(topic)
+            .subscriptionName(subscribeName)
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .subscribe();
+
+        // Get the consumer stats.
+        TopicStats topicStats = admin.topics().getStats(topic);
+        SubscriptionStats subscriptionStats = 
topicStats.subscriptions.get(subscribeName);
+        long startConsumedFlowTimestamp = 
subscriptionStats.lastConsumedFlowTimestamp;
+        long startAckedTimestampInSubStats = 
subscriptionStats.lastAckedTimestamp;
+        ConsumerStats consumerStats = subscriptionStats.consumers.get(0);
+        long startConsumedTimestampInConsumerStats = 
consumerStats.lastConsumedTimestamp;
+        long startAckedTimestampInConsumerStats = 
consumerStats.lastAckedTimestamp;
+
+        // Because the message was pushed by the broker, the consumedTimestamp 
should not as 0.
+        assertNotEquals(0, startConsumedTimestampInConsumerStats);
+        // There is no consumer ack the message, so the lastAckedTimestamp 
still as 0.
+        assertEquals(0, startAckedTimestampInConsumerStats);
+        assertNotEquals(0, startConsumedFlowTimestamp);
+        assertEquals(0, startAckedTimestampInSubStats);
+
+        // c. The Consumer receives the message and acks the message.
+        Message<byte[]> message = consumer.receive();
+        consumer.acknowledge(message);
+        // Waiting for the ack command send to the broker.
+        TimeUnit.SECONDS.sleep(5);
+
+        // Get the consumer stats.
+        topicStats = admin.topics().getStats(topic);
+        subscriptionStats = topicStats.subscriptions.get(subscribeName);
+        long consumedFlowTimestamp = 
subscriptionStats.lastConsumedFlowTimestamp;
+        long ackedTimestampInSubStats = subscriptionStats.lastAckedTimestamp;
+        consumerStats = subscriptionStats.consumers.get(0);
+        long consumedTimestamp = consumerStats.lastConsumedTimestamp;
+        long ackedTimestamp = consumerStats.lastAckedTimestamp;
+
+        // The lastConsumedTimestamp should same as the last time because the 
broker does not push any messages and the
+        // consumer does not pull any messages.
+        assertEquals(startConsumedTimestampInConsumerStats, consumedTimestamp);
+        assertTrue(startAckedTimestampInConsumerStats < ackedTimestamp);
+        assertNotEquals(0, consumedFlowTimestamp);
+        assertTrue(startAckedTimestampInSubStats < ackedTimestampInSubStats);
+
+        // d. Send another messages. The lastConsumedTimestamp should be 
updated.
+        producer.send("message-2".getBytes(StandardCharsets.UTF_8));
+
+        // e. Receive the message and ack it.
+        message = consumer.receive();
 
 Review comment:
   same comments as above

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to