This is an automated email from the ASF dual-hosted git repository.

crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 9366a0ea [Improve] Admin GetStats: Fill in missing fields (#1309)
9366a0ea is described below

commit 9366a0eaff772b7097ab1bb2147e287460dc61ba
Author: crossoverJie <[email protected]>
AuthorDate: Thu Nov 21 16:58:18 2024 +0800

    [Improve] Admin GetStats: Fill in missing fields (#1309)
    
    * admin append topic stats
    
    * lint
---
 pulsaradmin/pkg/admin/topic_test.go | 52 +++++++++++++++++++++-
 pulsaradmin/pkg/utils/data.go       | 89 ++++++++++++++++++++++++++++---------
 2 files changed, 119 insertions(+), 22 deletions(-)

diff --git a/pulsaradmin/pkg/admin/topic_test.go b/pulsaradmin/pkg/admin/topic_test.go
index fced0542..734cce1c 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -123,18 +123,21 @@ func TestPartitionState(t *testing.T) {
 
        assert.Nil(t, err)
        defer client.Close()
+       subName := "my-sub"
        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
                Topic:            topic,
-               SubscriptionName: "my-sub",
+               SubscriptionName: subName,
                Type:             pulsar.Exclusive,
        })
        assert.Nil(t, err)
        defer consumer.Close()
 
        // create producer
+       producerName := "test-producer"
        producer, err := client.CreateProducer(pulsar.ProducerOptions{
                Topic:           topic,
                DisableBatching: false,
+               Name:            producerName,
        })
        assert.Nil(t, err)
        defer producer.Close()
@@ -173,6 +176,53 @@ func TestPartitionState(t *testing.T) {
                assert.Equal(t, len(subscriptionStats.Consumers), 0)
        }
 
+       partition, err := topicName.GetPartition(0)
+       assert.Nil(t, err)
+       topicState, err := admin.Topics().GetStats(*partition)
+       assert.Nil(t, err)
+       assert.Equal(t, len(topicState.Publishers), 1)
+       publisher := topicState.Publishers[0]
+       assert.Equal(t, publisher.AccessModel, utils.ProduceModeShared)
+       assert.Equal(t, publisher.IsSupportsPartialProducer, false)
+       assert.Equal(t, publisher.ProducerName, producerName)
+       assert.Contains(t, publisher.Address, "127.0.0.1")
+       assert.Contains(t, publisher.ClientVersion, "Pulsar Go version")
+
+       sub := topicState.Subscriptions[subName]
+       assert.Equal(t, sub.BytesOutCounter, int64(0))
+       assert.Equal(t, sub.MsgOutCounter, int64(0))
+       assert.Equal(t, sub.MessageAckRate, float64(0))
+       assert.Equal(t, sub.ChunkedMessageRate, float64(0))
+       assert.Equal(t, sub.BacklogSize, int64(0))
+       assert.Equal(t, sub.EarliestMsgPublishTimeInBacklog, int64(0))
+       assert.Equal(t, sub.LastExpireTimestamp, int64(0))
+       assert.Equal(t, sub.TotalMsgExpired, int64(0))
+       assert.Equal(t, sub.LastMarkDeleteAdvancedTimestamp, int64(0))
+       assert.Equal(t, sub.IsDurable, true)
+       assert.Equal(t, sub.AllowOutOfOrderDelivery, false)
+       assert.Equal(t, sub.ConsumersAfterMarkDeletePosition, map[string]string{})
+       assert.Equal(t, sub.NonContiguousDeletedMessagesRanges, 0)
+       assert.Equal(t, sub.NonContiguousDeletedMessagesRangesSrzSize, 0)
+       assert.Equal(t, sub.DelayedMessageIndexSizeInBytes, int64(0))
+       assert.Equal(t, sub.SubscriptionProperties, map[string]string{})
+       assert.Equal(t, sub.FilterProcessedMsgCount, int64(0))
+       assert.Equal(t, sub.FilterAcceptedMsgCount, int64(0))
+       assert.Equal(t, sub.FilterRejectedMsgCount, int64(0))
+       assert.Equal(t, sub.FilterRescheduledMsgCount, int64(0))
+
+       assert.Equal(t, len(sub.Consumers), 1)
+       consumerState := sub.Consumers[0]
+       assert.Equal(t, consumerState.BytesOutCounter, int64(0))
+       assert.Equal(t, consumerState.MsgOutCounter, int64(0))
+       assert.Equal(t, consumerState.MessageAckRate, float64(0))
+       assert.Equal(t, consumerState.ChunkedMessageRate, float64(0))
+       assert.Equal(t, consumerState.AvgMessagesPerEntry, int(0))
+       assert.Contains(t, consumerState.Address, "127.0.0.1")
+       assert.Contains(t, consumerState.ClientVersion, "Pulsar Go version")
+       assert.Equal(t, consumerState.LastAckedTimestamp, int64(0))
+       assert.Equal(t, consumerState.LastConsumedTimestamp, int64(0))
+       assert.True(t, consumerState.LastConsumedFlowTimestamp > 0)
+
 }
 func TestNonPartitionState(t *testing.T) {
        randomName := newTopicName()
diff --git a/pulsaradmin/pkg/utils/data.go b/pulsaradmin/pkg/utils/data.go
index c9664e71..58673dcc 100644
--- a/pulsaradmin/pkg/utils/data.go
+++ b/pulsaradmin/pkg/utils/data.go
@@ -244,31 +244,67 @@ type TopicStats struct {
        DeDuplicationStatus string                       `json:"deduplicationStatus"`
 }
 
+type ProducerAccessMode string
+
+const (
+       ProduceModeShared               ProducerAccessMode = "Shared"
+       ProduceModeExclusive                               = "Exclusive"
+       ProduceModeExclusiveWithFencing                    = "ExclusiveWithFencing"
+       ProduceModeWaitForExclusive                        = "WaitForExclusive"
+)
+
 type PublisherStats struct {
-       ProducerID      int64             `json:"producerId"`
-       MsgRateIn       float64           `json:"msgRateIn"`
-       MsgThroughputIn float64           `json:"msgThroughputIn"`
-       AverageMsgSize  float64           `json:"averageMsgSize"`
-       Metadata        map[string]string `json:"metadata"`
+       AccessModel               ProducerAccessMode `json:"accessMode"`
+       ProducerID                int64              `json:"producerId"`
+       MsgRateIn                 float64            `json:"msgRateIn"`
+       MsgThroughputIn           float64            `json:"msgThroughputIn"`
+       AverageMsgSize            float64            `json:"averageMsgSize"`
+       ChunkedMessageRate        float64            `json:"chunkedMessageRate"`
+       IsSupportsPartialProducer bool               `json:"supportsPartialProducer"`
+       ProducerName              string             `json:"producerName"`
+       Address                   string             `json:"address"`
+       ConnectedSince            string             `json:"connectedSince"`
+       ClientVersion             string             `json:"clientVersion"`
+       Metadata                  map[string]string  `json:"metadata"`
 }
 
 type SubscriptionStats struct {
-       BlockedSubscriptionOnUnackedMsgs bool            `json:"blockedSubscriptionOnUnackedMsgs"`
-       IsReplicated                     bool            `json:"isReplicated"`
-       LastConsumedFlowTimestamp        int64           `json:"lastConsumedFlowTimestamp"`
-       LastConsumedTimestamp            int64           `json:"lastConsumedTimestamp"`
-       LastAckedTimestamp               int64           `json:"lastAckedTimestamp"`
-       MsgRateOut                       float64         `json:"msgRateOut"`
-       MsgThroughputOut                 float64         `json:"msgThroughputOut"`
-       MsgRateRedeliver                 float64         `json:"msgRateRedeliver"`
-       MsgRateExpired                   float64         `json:"msgRateExpired"`
-       MsgBacklog                       int64           `json:"msgBacklog"`
-       MsgBacklogNoDelayed              int64           `json:"msgBacklogNoDelayed"`
-       MsgDelayed                       int64           `json:"msgDelayed"`
-       UnAckedMessages                  int64           `json:"unackedMessages"`
-       SubType                          string          `json:"type"`
-       ActiveConsumerName               string          `json:"activeConsumerName"`
-       Consumers                        []ConsumerStats `json:"consumers"`
+       BlockedSubscriptionOnUnackedMsgs          bool              `json:"blockedSubscriptionOnUnackedMsgs"`
+       IsReplicated                              bool              `json:"isReplicated"`
+       LastConsumedFlowTimestamp                 int64             `json:"lastConsumedFlowTimestamp"`
+       LastConsumedTimestamp                     int64             `json:"lastConsumedTimestamp"`
+       LastAckedTimestamp                        int64             `json:"lastAckedTimestamp"`
+       MsgRateOut                                float64           `json:"msgRateOut"`
+       MsgThroughputOut                          float64           `json:"msgThroughputOut"`
+       MsgRateRedeliver                          float64           `json:"msgRateRedeliver"`
+       MsgRateExpired                            float64           `json:"msgRateExpired"`
+       MsgBacklog                                int64             `json:"msgBacklog"`
+       MsgBacklogNoDelayed                       int64             `json:"msgBacklogNoDelayed"`
+       MsgDelayed                                int64             `json:"msgDelayed"`
+       UnAckedMessages                           int64             `json:"unackedMessages"`
+       SubType                                   string            `json:"type"`
+       ActiveConsumerName                        string            `json:"activeConsumerName"`
+       BytesOutCounter                           int64             `json:"bytesOutCounter"`
+       MsgOutCounter                             int64             `json:"msgOutCounter"`
+       MessageAckRate                            float64           `json:"messageAckRate"`
+       ChunkedMessageRate                        float64           `json:"chunkedMessageRate"`
+       BacklogSize                               int64             `json:"backlogSize"`
+       EarliestMsgPublishTimeInBacklog           int64             `json:"earliestMsgPublishTimeInBacklog"`
+       TotalMsgExpired                           int64             `json:"totalMsgExpired"`
+       LastExpireTimestamp                       int64             `json:"lastExpireTimestamp"`
+       LastMarkDeleteAdvancedTimestamp           int64             `json:"lastMarkDeleteAdvancedTimestamp"`
+       Consumers                                 []ConsumerStats   `json:"consumers"`
+       IsDurable                                 bool              `json:"isDurable"`
+       AllowOutOfOrderDelivery                   bool              `json:"allowOutOfOrderDelivery"`
+       ConsumersAfterMarkDeletePosition          map[string]string `json:"consumersAfterMarkDeletePosition"`
+       NonContiguousDeletedMessagesRanges        int               `json:"nonContiguousDeletedMessagesRanges"`
+       NonContiguousDeletedMessagesRangesSrzSize int               `json:"nonContiguousDeletedMessagesRangesSerializedSize"`
+       DelayedMessageIndexSizeInBytes            int64             `json:"delayedMessageIndexSizeInBytes"`
+       SubscriptionProperties                    map[string]string `json:"subscriptionProperties"`
+       FilterProcessedMsgCount                   int64             `json:"filterProcessedMsgCount"`
+       FilterAcceptedMsgCount                    int64             `json:"filterAcceptedMsgCount"`
+       FilterRejectedMsgCount                    int64             `json:"filterRejectedMsgCount"`
+       FilterRescheduledMsgCount                 int64             `json:"filterRescheduledMsgCount"`
 }
 
 type ConsumerStats struct {
@@ -279,6 +315,17 @@ type ConsumerStats struct {
        MsgThroughputOut             float64           `json:"msgThroughputOut"`
        MsgRateRedeliver             float64           `json:"msgRateRedeliver"`
        ConsumerName                 string            `json:"consumerName"`
+       BytesOutCounter              int64             `json:"bytesOutCounter"`
+       MsgOutCounter                int64             `json:"msgOutCounter"`
+       MessageAckRate               float64           `json:"messageAckRate"`
+       ChunkedMessageRate           float64           `json:"chunkedMessageRate"`
+       AvgMessagesPerEntry          int               `json:"avgMessagesPerEntry"`
+       Address                      string            `json:"address"`
+       ConnectedSince               string            `json:"connectedSince"`
+       ClientVersion                string            `json:"clientVersion"`
+       LastAckedTimestamp           int64             `json:"lastAckedTimestamp"`
+       LastConsumedTimestamp        int64             `json:"lastConsumedTimestamp"`
+       LastConsumedFlowTimestamp    int64             `json:"lastConsumedFlowTimestamp"`
        Metadata                     map[string]string `json:"metadata"`
 }
 

Reply via email to