codelipenghui commented on code in PR #1451:
URL: https://github.com/apache/pulsar-client-go/pull/1451#discussion_r2625348053
##########
pulsaradmin/pkg/admin/topic_test.go:
##########
@@ -1554,3 +1554,103 @@ func TestTopics_PublishRate(t *testing.T) {
100*time.Millisecond,
)
}
+
+func TestTopicStatsTimestamps(t *testing.T) {
+ topic :=
fmt.Sprintf("persistent://public/default/test-topic-stats-timestamps-%d",
time.Now().Nanosecond())
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+
+ // Create topic
+ err = admin.Topics().Create(*topicName, 0)
+ assert.NoError(t, err)
+ defer admin.Topics().Delete(*topicName, true, true)
+
+ // Get stats and verify CreationTimestamp
+ stats, err := admin.Topics().GetStats(*topicName)
+ assert.NoError(t, err)
+ assert.Greater(t, stats.TopicCreationTimeStamp, int64(0),
"CreationTimestamp should be greater than 0")
+ // LastPublishTimestamp should be 0 before any message is published
+ assert.Equal(t, int64(0), stats.LastPublishTimestamp,
"LastPublishTimestamp should be 0 initially")
+
+ // Publish a message
+ client, err := pulsar.NewClient(pulsar.ClientOptions{
+ URL: lookupURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ producer, err := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: topic,
+ })
+ assert.NoError(t, err)
+ defer producer.Close()
+
+ ctx := context.Background()
+ _, err = producer.Send(ctx, &pulsar.ProducerMessage{
+ Payload: []byte("test-message"),
+ })
+ assert.NoError(t, err)
+
+ // Wait for stats to update (stats are updated asynchronously in broker)
+ assert.Eventually(t, func() bool {
+ s, err := admin.Topics().GetStats(*topicName)
+ if err != nil {
+ return false
+ }
+ return s.LastPublishTimestamp > 0
+ }, 10*time.Second, 500*time.Millisecond, "LastPublishTimestamp should
update after publishing")
+}
+
+func TestPartitionedTopicStatsTimestamps(t *testing.T) {
+ topic :=
fmt.Sprintf("persistent://public/default/test-partitioned-topic-stats-timestamps-%d",
time.Now().Nanosecond())
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+
+ // Create partitioned topic
+ err = admin.Topics().Create(*topicName, 2)
+ assert.NoError(t, err)
+ defer admin.Topics().Delete(*topicName, true, true)
+
+ // Get partitioned stats and verify CreationTimestamp
+ stats, err := admin.Topics().GetPartitionedStats(*topicName, true)
+ assert.NoError(t, err)
+ assert.Greater(t, stats.TopicCreationTimeStamp, int64(0),
"CreationTimestamp should be greater than 0")
+ assert.Equal(t, int64(0), stats.LastPublishTimestamp,
"LastPublishTimestamp should be 0 initially")
+
+ // Publish a message
+ client, err := pulsar.NewClient(pulsar.ClientOptions{
+ URL: lookupURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ producer, err := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: topic,
+ })
+ assert.NoError(t, err)
+ defer producer.Close()
+
+ ctx := context.Background()
+ _, err = producer.Send(ctx, &pulsar.ProducerMessage{
+ Payload: []byte("test-message"),
+ })
+ assert.NoError(t, err)
+
+ // Wait for stats to update
+ assert.Eventually(t, func() bool {
+ s, err := admin.Topics().GetPartitionedStats(*topicName, true)
+ if err != nil {
+ return false
+ }
+ // Partitioned stats LastPublishTimestamp is usually an
aggregation or max of partitions
+ return s.LastPublishTimestamp > 0
+ }, 10*time.Second, 500*time.Millisecond, "LastPublishTimestamp should
update after publishing")
+}
Review Comment:
They are calling different method from Topic stats and Partitioned topic
stats. Will be confused to add too much if...else in the code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]