sijie commented on a change in pull request #6051: Expose lastConsumedTimestamp
and lastAckedTimestamp to consumer stats
URL: https://github.com/apache/pulsar/pull/6051#discussion_r368249408
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
##########
@@ -948,4 +952,98 @@ public void testCreateNamespaceWithNoClusters() throws
PulsarAdminException {
assertEquals(admin.namespaces().getNamespaceReplicationClusters(namespace),
Collections.singletonList(localCluster));
}
+
+ @Test(timeOut = 30000)
+ public void testConsumerStatsLastTimestamp() throws PulsarClientException,
PulsarAdminException, InterruptedException {
+ long timestamp = System.currentTimeMillis();
+ final String topicName = "consumer-stats-" + timestamp;
+ final String subscribeName = topicName + "-test-stats-sub";
+ final String topic = "persistent://prop-xyz/ns1/" + topicName;
+ final String producerName = "producer-" + topicName;
+
+ @Cleanup
+ PulsarClient client =
PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+ Producer<byte[]> producer = client.newProducer().topic(topic)
+ .enableBatching(false)
+ .producerName(producerName)
+ .create();
+
+ // a. Send a message to the topic.
+ producer.send("message-1".getBytes(StandardCharsets.UTF_8));
+
+ // b. Create a consumer. Because there is already a message in the topic, the
consumer will receive the message pushed
+ // by the broker, so the lastConsumedTimestamp will be set at the consumer's
subscribe time.
+ Consumer<byte[]> consumer = client.newConsumer().topic(topic)
+ .subscriptionName(subscribeName)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ // Get the consumer stats.
+ TopicStats topicStats = admin.topics().getStats(topic);
+ SubscriptionStats subscriptionStats =
topicStats.subscriptions.get(subscribeName);
+ long startConsumedFlowTimestamp =
subscriptionStats.lastConsumedFlowTimestamp;
+ long startAckedTimestampInSubStats =
subscriptionStats.lastAckedTimestamp;
+ ConsumerStats consumerStats = subscriptionStats.consumers.get(0);
+ long startConsumedTimestampInConsumerStats =
consumerStats.lastConsumedTimestamp;
+ long startAckedTimestampInConsumerStats =
consumerStats.lastAckedTimestamp;
+
+ // Because the message was pushed by the broker, the consumedTimestamp
should not be 0.
+ assertNotEquals(0, startConsumedTimestampInConsumerStats);
+ // No consumer has acked the message yet, so the lastAckedTimestamp
is still 0.
+ assertEquals(0, startAckedTimestampInConsumerStats);
+ assertNotEquals(0, startConsumedFlowTimestamp);
+ assertEquals(0, startAckedTimestampInSubStats);
+
+ // c. The Consumer receives the message and acks the message.
+ Message<byte[]> message = consumer.receive();
Review comment:
Move this to line 979.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services