sijie commented on a change in pull request #6051: Expose lastConsumedTimestamp and lastAckedTimestamp to consumer stats URL: https://github.com/apache/pulsar/pull/6051#discussion_r368249464
########## File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java ########## @@ -948,4 +952,98 @@ public void testCreateNamespaceWithNoClusters() throws PulsarAdminException { assertEquals(admin.namespaces().getNamespaceReplicationClusters(namespace), Collections.singletonList(localCluster)); } + + @Test(timeOut = 30000) + public void testConsumerStatsLastTimestamp() throws PulsarClientException, PulsarAdminException, InterruptedException { + long timestamp = System.currentTimeMillis(); + final String topicName = "consumer-stats-" + timestamp; + final String subscribeName = topicName + "-test-stats-sub"; + final String topic = "persistent://prop-xyz/ns1/" + topicName; + final String producerName = "producer-" + topicName; + + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + Producer<byte[]> producer = client.newProducer().topic(topic) + .enableBatching(false) + .producerName(producerName) + .create(); + + // a. Send a message to the topic. + producer.send("message-1".getBytes(StandardCharsets.UTF_8)); + + // b. Create a consumer, because there was a message in the topic, the consumer will receive the message pushed + // by the broker, the lastConsumedTimestamp will as the consume subscribe time. + Consumer<byte[]> consumer = client.newConsumer().topic(topic) + .subscriptionName(subscribeName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + // Get the consumer stats. 
+ TopicStats topicStats = admin.topics().getStats(topic); + SubscriptionStats subscriptionStats = topicStats.subscriptions.get(subscribeName); + long startConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp; + long startAckedTimestampInSubStats = subscriptionStats.lastAckedTimestamp; + ConsumerStats consumerStats = subscriptionStats.consumers.get(0); + long startConsumedTimestampInConsumerStats = consumerStats.lastConsumedTimestamp; + long startAckedTimestampInConsumerStats = consumerStats.lastAckedTimestamp; + + // Because the message was pushed by the broker, the consumedTimestamp should not be 0. + assertNotEquals(0, startConsumedTimestampInConsumerStats); + // No consumer has acked the message yet, so the lastAckedTimestamp is still 0. + assertEquals(0, startAckedTimestampInConsumerStats); + assertNotEquals(0, startConsumedFlowTimestamp); + assertEquals(0, startAckedTimestampInSubStats); + + // c. The Consumer receives the message and acks the message. + Message<byte[]> message = consumer.receive(); + consumer.acknowledge(message); + // Wait for the ack command to be sent to the broker. + TimeUnit.SECONDS.sleep(5); + + // Get the consumer stats. + topicStats = admin.topics().getStats(topic); + subscriptionStats = topicStats.subscriptions.get(subscribeName); + long consumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp; + long ackedTimestampInSubStats = subscriptionStats.lastAckedTimestamp; + consumerStats = subscriptionStats.consumers.get(0); + long consumedTimestamp = consumerStats.lastConsumedTimestamp; + long ackedTimestamp = consumerStats.lastAckedTimestamp; + + // The lastConsumedTimestamp should be the same as last time because the broker does not push any messages and the + // consumer does not pull any messages.
+ assertEquals(startConsumedTimestampInConsumerStats, consumedTimestamp); + assertTrue(startAckedTimestampInConsumerStats < ackedTimestamp); + assertNotEquals(0, consumedFlowTimestamp); + assertTrue(startAckedTimestampInSubStats < ackedTimestampInSubStats); + + // d. Send another message. The lastConsumedTimestamp should be updated. + producer.send("message-2".getBytes(StandardCharsets.UTF_8)); + + // e. Receive the message and ack it. + message = consumer.receive(); Review comment: same comments as above ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services