This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch pip-431-support in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 8ac9955621084f74a69e87e92fca33c84fcb1b50 Author: Penghui Li <[email protected]> AuthorDate: Tue Dec 16 12:27:52 2025 -0800 feat: Add Creation and Last Publish Timestamps to Topic Stats (PIP-431) --- pulsaradmin/pkg/admin/topic_test.go | 100 ++++++++++++++++++++++++++++++++++++ pulsaradmin/pkg/utils/data.go | 54 ++++++++++--------- 2 files changed, 129 insertions(+), 25 deletions(-) diff --git a/pulsaradmin/pkg/admin/topic_test.go b/pulsaradmin/pkg/admin/topic_test.go index f40c0bbc..5655930d 100644 --- a/pulsaradmin/pkg/admin/topic_test.go +++ b/pulsaradmin/pkg/admin/topic_test.go @@ -1554,3 +1554,103 @@ func TestTopics_PublishRate(t *testing.T) { 100*time.Millisecond, ) } + +func TestTopicStatsTimestamps(t *testing.T) { + topic := fmt.Sprintf("persistent://public/default/test-topic-stats-timestamps-%d", time.Now().Nanosecond()) + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + + topicName, err := utils.GetTopicName(topic) + assert.NoError(t, err) + + // Create topic + err = admin.Topics().Create(*topicName, 0) + assert.NoError(t, err) + defer admin.Topics().Delete(*topicName, true, true) + + // Get stats and verify CreationTimestamp + stats, err := admin.Topics().GetStats(*topicName) + assert.NoError(t, err) + assert.Greater(t, stats.TopicCreationTimeStamp, int64(0), "CreationTimestamp should be greater than 0") + // LastPublishTimestamp should be 0 before any message is published + assert.Equal(t, int64(0), stats.LastPublishTimestamp, "LastPublishTimestamp should be 0 initially") + + // Publish a message + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: lookupURL, + }) + assert.NoError(t, err) + defer client.Close() + + producer, err := client.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + }) + assert.NoError(t, err) + defer producer.Close() + + ctx := context.Background() + _, err = producer.Send(ctx, &pulsar.ProducerMessage{ + Payload: []byte("test-message"), + }) + assert.NoError(t, err) + + // Wait for stats to update (stats are updated asynchronously in broker) + assert.Eventually(t, func() bool { + s, err := admin.Topics().GetStats(*topicName) + if err != nil { + return false + } + return s.LastPublishTimestamp > 0 + }, 10*time.Second, 500*time.Millisecond, "LastPublishTimestamp should update after publishing") +} + +func TestPartitionedTopicStatsTimestamps(t *testing.T) { + topic := fmt.Sprintf("persistent://public/default/test-partitioned-topic-stats-timestamps-%d", time.Now().Nanosecond()) + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + + topicName, err := utils.GetTopicName(topic) + assert.NoError(t, err) + + // Create partitioned topic + err = admin.Topics().Create(*topicName, 2) + assert.NoError(t, err) + defer admin.Topics().Delete(*topicName, true, true) + + // Get partitioned stats and verify CreationTimestamp + stats, err := admin.Topics().GetPartitionedStats(*topicName, true) + assert.NoError(t, err) + assert.Greater(t, stats.TopicCreationTimeStamp, int64(0), "CreationTimestamp should be greater than 0") + assert.Equal(t, int64(0), stats.LastPublishTimestamp, "LastPublishTimestamp should be 0 initially") + + // Publish a message + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: lookupURL, + }) + assert.NoError(t, err) + defer client.Close() + + producer, err := client.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + }) + assert.NoError(t, err) + defer producer.Close() + + ctx := context.Background() + _, err = producer.Send(ctx, &pulsar.ProducerMessage{ + Payload: []byte("test-message"), + }) + assert.NoError(t, err) + + // Wait for stats to update + assert.Eventually(t, func() bool { + s, err := admin.Topics().GetPartitionedStats(*topicName, true) + if err != nil { + return false + } + // Partitioned stats LastPublishTimestamp is usually an aggregation or max of partitions + return s.LastPublishTimestamp > 0 + }, 10*time.Second, 500*time.Millisecond, "LastPublishTimestamp should update after publishing") +} diff --git a/pulsaradmin/pkg/utils/data.go b/pulsaradmin/pkg/utils/data.go index 441aa33b..43f0ec62 100644 --- a/pulsaradmin/pkg/utils/data.go +++ b/pulsaradmin/pkg/utils/data.go @@ -229,19 +229,21 @@ type NamespacesData struct { } type TopicStats struct { - BacklogSize int64 `json:"backlogSize"` - MsgCounterIn int64 `json:"msgInCounter"` - MsgCounterOut int64 `json:"msgOutCounter"` - MsgRateIn float64 `json:"msgRateIn"` - MsgRateOut float64 `json:"msgRateOut"` - MsgThroughputIn float64 `json:"msgThroughputIn"` - MsgThroughputOut float64 `json:"msgThroughputOut"` - AverageMsgSize float64 `json:"averageMsgSize"` - StorageSize int64 `json:"storageSize"` - Publishers []PublisherStats `json:"publishers"` - Subscriptions map[string]SubscriptionStats `json:"subscriptions"` - Replication map[string]ReplicatorStats `json:"replication"` - DeDuplicationStatus string `json:"deduplicationStatus"` + BacklogSize int64 `json:"backlogSize"` + MsgCounterIn int64 `json:"msgInCounter"` + MsgCounterOut int64 `json:"msgOutCounter"` + MsgRateIn float64 `json:"msgRateIn"` + MsgRateOut float64 `json:"msgRateOut"` + MsgThroughputIn float64 `json:"msgThroughputIn"` + MsgThroughputOut float64 `json:"msgThroughputOut"` + AverageMsgSize float64 `json:"averageMsgSize"` + StorageSize int64 `json:"storageSize"` + Publishers []PublisherStats `json:"publishers"` + Subscriptions map[string]SubscriptionStats `json:"subscriptions"` + Replication map[string]ReplicatorStats `json:"replication"` + DeDuplicationStatus string `json:"deduplicationStatus"` + TopicCreationTimeStamp int64 `json:"topicCreationTimeStamp,omitempty"` + LastPublishTimestamp int64 `json:"lastPublishTimestamp,omitempty"` } type ProducerAccessMode string @@ -410,18 +412,20 @@ type CursorStats struct { } type PartitionedTopicStats struct { - MsgRateIn float64 `json:"msgRateIn"` - MsgRateOut float64 `json:"msgRateOut"` - MsgThroughputIn float64 `json:"msgThroughputIn"` - MsgThroughputOut float64 `json:"msgThroughputOut"` - AverageMsgSize float64 `json:"averageMsgSize"` - StorageSize int64 `json:"storageSize"` - Publishers []PublisherStats `json:"publishers"` - Subscriptions map[string]SubscriptionStats `json:"subscriptions"` - Replication map[string]ReplicatorStats `json:"replication"` - DeDuplicationStatus string `json:"deduplicationStatus"` - Metadata PartitionedTopicMetadata `json:"metadata"` - Partitions map[string]TopicStats `json:"partitions"` + MsgRateIn float64 `json:"msgRateIn"` + MsgRateOut float64 `json:"msgRateOut"` + MsgThroughputIn float64 `json:"msgThroughputIn"` + MsgThroughputOut float64 `json:"msgThroughputOut"` + AverageMsgSize float64 `json:"averageMsgSize"` + StorageSize int64 `json:"storageSize"` + Publishers []PublisherStats `json:"publishers"` + Subscriptions map[string]SubscriptionStats `json:"subscriptions"` + Replication map[string]ReplicatorStats `json:"replication"` + DeDuplicationStatus string `json:"deduplicationStatus"` + Metadata PartitionedTopicMetadata `json:"metadata"` + Partitions map[string]TopicStats `json:"partitions"` + TopicCreationTimeStamp int64 `json:"topicCreationTimeStamp,omitempty"` + LastPublishTimestamp int64 `json:"lastPublishTimestamp,omitempty"` } type SchemaData struct {
