This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 9d65d0ec feat: Add Creation and Last Publish Timestamps to Topic Stats (PIP-431) (#1451)
9d65d0ec is described below
commit 9d65d0ecf43f357a7a318c361553b258d88c56ea
Author: Penghui Li <[email protected]>
AuthorDate: Tue Dec 16 18:15:58 2025 -0800
feat: Add Creation and Last Publish Timestamps to Topic Stats (PIP-431) (#1451)
---
pulsaradmin/pkg/admin/topic_test.go | 100 ++++++++++++++++++++++++++++++++++++
pulsaradmin/pkg/utils/data.go | 54 ++++++++++---------
2 files changed, 129 insertions(+), 25 deletions(-)
diff --git a/pulsaradmin/pkg/admin/topic_test.go b/pulsaradmin/pkg/admin/topic_test.go
index f40c0bbc..5655930d 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -1554,3 +1554,103 @@ func TestTopics_PublishRate(t *testing.T) {
100*time.Millisecond,
)
}
+
+func TestTopicStatsTimestamps(t *testing.T) {
+ topic := fmt.Sprintf("persistent://public/default/test-topic-stats-timestamps-%d", time.Now().Nanosecond())
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+
+ // Create topic
+ err = admin.Topics().Create(*topicName, 0)
+ assert.NoError(t, err)
+ defer admin.Topics().Delete(*topicName, true, true)
+
+ // Get stats and verify CreationTimestamp
+ stats, err := admin.Topics().GetStats(*topicName)
+ assert.NoError(t, err)
+ assert.Greater(t, stats.TopicCreationTimeStamp, int64(0), "CreationTimestamp should be greater than 0")
+ // LastPublishTimestamp should be 0 before any message is published
+ assert.Equal(t, int64(0), stats.LastPublishTimestamp, "LastPublishTimestamp should be 0 initially")
+
+ // Publish a message
+ client, err := pulsar.NewClient(pulsar.ClientOptions{
+ URL: lookupURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ producer, err := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: topic,
+ })
+ assert.NoError(t, err)
+ defer producer.Close()
+
+ ctx := context.Background()
+ _, err = producer.Send(ctx, &pulsar.ProducerMessage{
+ Payload: []byte("test-message"),
+ })
+ assert.NoError(t, err)
+
+ // Wait for stats to update (stats are updated asynchronously in broker)
+ assert.Eventually(t, func() bool {
+ s, err := admin.Topics().GetStats(*topicName)
+ if err != nil {
+ return false
+ }
+ return s.LastPublishTimestamp > 0
+ }, 10*time.Second, 500*time.Millisecond, "LastPublishTimestamp should update after publishing")
+}
+
+func TestPartitionedTopicStatsTimestamps(t *testing.T) {
+ topic := fmt.Sprintf("persistent://public/default/test-partitioned-topic-stats-timestamps-%d", time.Now().Nanosecond())
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+
+ // Create partitioned topic
+ err = admin.Topics().Create(*topicName, 2)
+ assert.NoError(t, err)
+ defer admin.Topics().Delete(*topicName, true, true)
+
+ // Get partitioned stats and verify CreationTimestamp
+ stats, err := admin.Topics().GetPartitionedStats(*topicName, true)
+ assert.NoError(t, err)
+ assert.Greater(t, stats.TopicCreationTimeStamp, int64(0), "CreationTimestamp should be greater than 0")
+ assert.Equal(t, int64(0), stats.LastPublishTimestamp, "LastPublishTimestamp should be 0 initially")
+
+ // Publish a message
+ client, err := pulsar.NewClient(pulsar.ClientOptions{
+ URL: lookupURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ producer, err := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: topic,
+ })
+ assert.NoError(t, err)
+ defer producer.Close()
+
+ ctx := context.Background()
+ _, err = producer.Send(ctx, &pulsar.ProducerMessage{
+ Payload: []byte("test-message"),
+ })
+ assert.NoError(t, err)
+
+ // Wait for stats to update
+ assert.Eventually(t, func() bool {
+ s, err := admin.Topics().GetPartitionedStats(*topicName, true)
+ if err != nil {
+ return false
+ }
+ // Partitioned stats LastPublishTimestamp is usually an aggregation or max of partitions
+ return s.LastPublishTimestamp > 0
+ }, 10*time.Second, 500*time.Millisecond, "LastPublishTimestamp should update after publishing")
+}
diff --git a/pulsaradmin/pkg/utils/data.go b/pulsaradmin/pkg/utils/data.go
index 441aa33b..43f0ec62 100644
--- a/pulsaradmin/pkg/utils/data.go
+++ b/pulsaradmin/pkg/utils/data.go
@@ -229,19 +229,21 @@ type NamespacesData struct {
}
type TopicStats struct {
- BacklogSize int64 `json:"backlogSize"`
- MsgCounterIn int64 `json:"msgInCounter"`
- MsgCounterOut int64 `json:"msgOutCounter"`
- MsgRateIn float64 `json:"msgRateIn"`
- MsgRateOut float64 `json:"msgRateOut"`
- MsgThroughputIn float64 `json:"msgThroughputIn"`
- MsgThroughputOut float64 `json:"msgThroughputOut"`
- AverageMsgSize float64 `json:"averageMsgSize"`
- StorageSize int64 `json:"storageSize"`
- Publishers []PublisherStats `json:"publishers"`
- Subscriptions map[string]SubscriptionStats `json:"subscriptions"`
- Replication map[string]ReplicatorStats `json:"replication"`
- DeDuplicationStatus string `json:"deduplicationStatus"`
+ BacklogSize int64 `json:"backlogSize"`
+ MsgCounterIn int64 `json:"msgInCounter"`
+ MsgCounterOut int64 `json:"msgOutCounter"`
+ MsgRateIn float64 `json:"msgRateIn"`
+ MsgRateOut float64 `json:"msgRateOut"`
+ MsgThroughputIn float64 `json:"msgThroughputIn"`
+ MsgThroughputOut float64 `json:"msgThroughputOut"`
+ AverageMsgSize float64 `json:"averageMsgSize"`
+ StorageSize int64 `json:"storageSize"`
+ Publishers []PublisherStats `json:"publishers"`
+ Subscriptions map[string]SubscriptionStats `json:"subscriptions"`
+ Replication map[string]ReplicatorStats `json:"replication"`
+ DeDuplicationStatus string `json:"deduplicationStatus"`
+ TopicCreationTimeStamp int64 `json:"topicCreationTimeStamp,omitempty"`
+ LastPublishTimestamp int64 `json:"lastPublishTimestamp,omitempty"`
}
type ProducerAccessMode string
@@ -410,18 +412,20 @@ type CursorStats struct {
}
type PartitionedTopicStats struct {
- MsgRateIn float64 `json:"msgRateIn"`
- MsgRateOut float64 `json:"msgRateOut"`
- MsgThroughputIn float64 `json:"msgThroughputIn"`
- MsgThroughputOut float64 `json:"msgThroughputOut"`
- AverageMsgSize float64 `json:"averageMsgSize"`
- StorageSize int64 `json:"storageSize"`
- Publishers []PublisherStats `json:"publishers"`
- Subscriptions map[string]SubscriptionStats `json:"subscriptions"`
- Replication map[string]ReplicatorStats `json:"replication"`
- DeDuplicationStatus string `json:"deduplicationStatus"`
- Metadata PartitionedTopicMetadata `json:"metadata"`
- Partitions map[string]TopicStats `json:"partitions"`
+ MsgRateIn float64 `json:"msgRateIn"`
+ MsgRateOut float64 `json:"msgRateOut"`
+ MsgThroughputIn float64 `json:"msgThroughputIn"`
+ MsgThroughputOut float64 `json:"msgThroughputOut"`
+ AverageMsgSize float64 `json:"averageMsgSize"`
+ StorageSize int64 `json:"storageSize"`
+ Publishers []PublisherStats `json:"publishers"`
+ Subscriptions map[string]SubscriptionStats `json:"subscriptions"`
+ Replication map[string]ReplicatorStats `json:"replication"`
+ DeDuplicationStatus string `json:"deduplicationStatus"`
+ Metadata PartitionedTopicMetadata `json:"metadata"`
+ Partitions map[string]TopicStats `json:"partitions"`
+ TopicCreationTimeStamp int64 `json:"topicCreationTimeStamp,omitempty"`
+ LastPublishTimestamp int64 `json:"lastPublishTimestamp,omitempty"`
}
type SchemaData struct {