This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c4f47abd [Improve] Add optional parameters for getPartitionedStats 
(#1193)
c4f47abd is described below

commit c4f47abd7c82402e7d4e95b55b24ebfef4bdb029
Author: crossoverJie <[email protected]>
AuthorDate: Tue Mar 5 18:40:40 2024 +0800

    [Improve] Add optional parameters for getPartitionedStats (#1193)
    
    ### Motivation
    To keep consistent with the Java client.
    
    Releted PR: https://github.com/apache/pulsar/pull/21611
    
    ### Modifications
    
    Add `GetStatsOptions` params.
---
 Makefile                            |   2 +-
 pulsar/consumer_test.go             |  12 +--
 pulsaradmin/pkg/admin/topic.go      |  34 ++++++++
 pulsaradmin/pkg/admin/topic_test.go | 160 ++++++++++++++++++++++++++++++++++++
 pulsaradmin/pkg/utils/data.go       |   8 ++
 5 files changed, 209 insertions(+), 7 deletions(-)

diff --git a/Makefile b/Makefile
index 4eb590b0..d0442372 100644
--- a/Makefile
+++ b/Makefile
@@ -18,7 +18,7 @@
 #
 
 IMAGE_NAME = pulsar-client-go-test:latest
-PULSAR_VERSION ?= 2.10.3
+PULSAR_VERSION ?= 3.2.0
 PULSAR_IMAGE = apachepulsar/pulsar:$(PULSAR_VERSION)
 GO_VERSION ?= 1.18
 GOLANG_IMAGE = golang:$(GO_VERSION)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index d66e2376..4a3b532d 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -2219,6 +2219,12 @@ func TestConsumerAddTopicPartitions(t *testing.T) {
        assert.Nil(t, err)
        defer producer.Close()
 
+       // Increase number of partitions to 10
+       makeHTTPCall(t, http.MethodPost, testURL, "10")
+
+       // Wait for the producer/consumers to pick up the change
+       time.Sleep(1 * time.Second)
+
        consumer, err := client.Subscribe(ConsumerOptions{
                Topic:               topic,
                SubscriptionName:    "my-sub",
@@ -2227,12 +2233,6 @@ func TestConsumerAddTopicPartitions(t *testing.T) {
        assert.Nil(t, err)
        defer consumer.Close()
 
-       // Increase number of partitions to 10
-       makeHTTPCall(t, http.MethodPost, testURL, "10")
-
-       // Wait for the producer/consumers to pick up the change
-       time.Sleep(1 * time.Second)
-
        // Publish messages ensuring that they will go to all the partitions
        ctx := context.Background()
        for i := 0; i < 10; i++ {
diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go
index c888827b..e6057413 100644
--- a/pulsaradmin/pkg/admin/topic.go
+++ b/pulsaradmin/pkg/admin/topic.go
@@ -75,6 +75,9 @@ type Topics interface {
        // All the rates are computed over a 1 minute window and are relative 
the last completed 1 minute period
        GetStats(utils.TopicName) (utils.TopicStats, error)
 
+       // GetStatsWithOption returns the stats for the topic
+       GetStatsWithOption(utils.TopicName, utils.GetStatsOptions) 
(utils.TopicStats, error)
+
        // GetInternalStats returns the internal stats for the topic.
        GetInternalStats(utils.TopicName) (utils.PersistentTopicInternalStats, 
error)
 
@@ -82,6 +85,9 @@ type Topics interface {
        // All the rates are computed over a 1 minute window and are relative 
the last completed 1 minute period
        GetPartitionedStats(utils.TopicName, bool) 
(utils.PartitionedTopicStats, error)
 
+       // GetPartitionedStatsWithOption returns the stats for the partitioned 
topic
+       GetPartitionedStatsWithOption(utils.TopicName, bool, 
utils.GetStatsOptions) (utils.PartitionedTopicStats, error)
+
        // Terminate the topic and prevent any more messages being published on 
it
        Terminate(utils.TopicName) (utils.MessageID, error)
 
@@ -395,6 +401,19 @@ func (t *topics) GetStats(topic utils.TopicName) 
(utils.TopicStats, error) {
        err := t.pulsar.Client.Get(endpoint, &stats)
        return stats, err
 }
+func (t *topics) GetStatsWithOption(topic utils.TopicName, option 
utils.GetStatsOptions) (utils.TopicStats, error) {
+       var stats utils.TopicStats
+       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "stats")
+       params := map[string]string{
+               "getPreciseBacklog":        
strconv.FormatBool(option.GetPreciseBacklog),
+               "subscriptionBacklogSize":  
strconv.FormatBool(option.SubscriptionBacklogSize),
+               "getEarliestTimeInBacklog": 
strconv.FormatBool(option.GetEarliestTimeInBacklog),
+               "excludePublishers":        
strconv.FormatBool(option.ExcludePublishers),
+               "excludeConsumers":         
strconv.FormatBool(option.ExcludeConsumers),
+       }
+       _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, 
true)
+       return stats, err
+}
 
 func (t *topics) GetInternalStats(topic utils.TopicName) 
(utils.PersistentTopicInternalStats, error) {
        var stats utils.PersistentTopicInternalStats
@@ -412,6 +431,21 @@ func (t *topics) GetPartitionedStats(topic 
utils.TopicName, perPartition bool) (
        _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, 
true)
        return stats, err
 }
+func (t *topics) GetPartitionedStatsWithOption(topic utils.TopicName, 
perPartition bool,
+       option utils.GetStatsOptions) (utils.PartitionedTopicStats, error) {
+       var stats utils.PartitionedTopicStats
+       endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"partitioned-stats")
+       params := map[string]string{
+               "perPartition":             strconv.FormatBool(perPartition),
+               "getPreciseBacklog":        
strconv.FormatBool(option.GetPreciseBacklog),
+               "subscriptionBacklogSize":  
strconv.FormatBool(option.SubscriptionBacklogSize),
+               "getEarliestTimeInBacklog": 
strconv.FormatBool(option.GetEarliestTimeInBacklog),
+               "excludePublishers":        
strconv.FormatBool(option.ExcludePublishers),
+               "excludeConsumers":         
strconv.FormatBool(option.ExcludeConsumers),
+       }
+       _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, 
true)
+       return stats, err
+}
 
 func (t *topics) Terminate(topic utils.TopicName) (utils.MessageID, error) {
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"terminate")
diff --git a/pulsaradmin/pkg/admin/topic_test.go 
b/pulsaradmin/pkg/admin/topic_test.go
index 06c33f2e..a9b1f002 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -18,12 +18,23 @@
 package admin
 
 import (
+       "context"
+       "fmt"
+       "log"
        "testing"
+       "time"
+
+       "github.com/apache/pulsar-client-go/pulsar"
+       "github.com/stretchr/testify/assert"
 
        "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
        "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
 )
 
+var (
+       lookupURL = "pulsar://localhost:6650"
+)
+
 func TestCreateTopic(t *testing.T) {
        checkError := func(err error) {
                if err != nil {
@@ -53,3 +64,152 @@ func TestCreateTopic(t *testing.T) {
        }
        t.Error("Couldn't find topic: " + topic)
 }
+
+func TestPartitionState(t *testing.T) {
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+
+       // Create partition topic
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*topicName, 4)
+       assert.NoError(t, err)
+
+       // Send message
+       ctx := context.Background()
+
+       // create consumer
+       client, err := pulsar.NewClient(pulsar.ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+       consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "my-sub",
+               Type:             pulsar.Exclusive,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(pulsar.ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               if _, err := producer.Send(ctx, &pulsar.ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "pulsar",
+                       Properties: map[string]string{
+                               "key-1": "pulsar-1",
+                       },
+               }); err != nil {
+                       log.Fatal(err)
+               }
+       }
+
+       stats, err := admin.Topics().GetPartitionedStatsWithOption(*topicName, 
true, utils.GetStatsOptions{
+               GetPreciseBacklog:        false,
+               SubscriptionBacklogSize:  false,
+               GetEarliestTimeInBacklog: false,
+               ExcludePublishers:        true,
+               ExcludeConsumers:         true,
+       })
+       assert.Nil(t, err)
+       assert.Equal(t, len(stats.Publishers), 0)
+
+       for _, topicStats := range stats.Partitions {
+               assert.Equal(t, len(topicStats.Publishers), 0)
+               for _, subscriptionStats := range topicStats.Subscriptions {
+                       assert.Equal(t, len(subscriptionStats.Consumers), 0)
+               }
+       }
+
+       for _, subscriptionStats := range stats.Subscriptions {
+               assert.Equal(t, len(subscriptionStats.Consumers), 0)
+       }
+
+}
+func TestNonPartitionState(t *testing.T) {
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+
+       // Create non-partition topic
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*topicName, 0)
+       assert.NoError(t, err)
+
+       // Send message
+       ctx := context.Background()
+
+       // create consumer
+       client, err := pulsar.NewClient(pulsar.ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+       consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "my-sub",
+               Type:             pulsar.Exclusive,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(pulsar.ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               if _, err := producer.Send(ctx, &pulsar.ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "pulsar",
+                       Properties: map[string]string{
+                               "key-1": "pulsar-1",
+                       },
+               }); err != nil {
+                       log.Fatal(err)
+               }
+       }
+
+       stats, err := admin.Topics().GetStatsWithOption(*topicName, 
utils.GetStatsOptions{
+               GetPreciseBacklog:        false,
+               SubscriptionBacklogSize:  false,
+               GetEarliestTimeInBacklog: false,
+               ExcludePublishers:        true,
+               ExcludeConsumers:         true,
+       })
+       assert.Nil(t, err)
+       assert.Equal(t, len(stats.Publishers), 0)
+       for _, subscriptionStats := range stats.Subscriptions {
+               assert.Equal(t, len(subscriptionStats.Consumers), 0)
+       }
+
+}
+
+func newTopicName() string {
+       return fmt.Sprintf("my-topic-%v", time.Now().Nanosecond())
+}
diff --git a/pulsaradmin/pkg/utils/data.go b/pulsaradmin/pkg/utils/data.go
index 55888aab..cc797d18 100644
--- a/pulsaradmin/pkg/utils/data.go
+++ b/pulsaradmin/pkg/utils/data.go
@@ -465,3 +465,11 @@ type CompactedLedger struct {
        Offloaded       bool  `json:"offloaded"`
        UnderReplicated bool  `json:"underReplicated"`
 }
+
+type GetStatsOptions struct {
+       GetPreciseBacklog        bool `json:"get_precise_backlog"`
+       SubscriptionBacklogSize  bool `json:"subscription_backlog_size"`
+       GetEarliestTimeInBacklog bool `json:"get_earliest_time_in_backlog"`
+       ExcludePublishers        bool `json:"exclude_publishers"`
+       ExcludeConsumers         bool `json:"exclude_consumers"`
+}

Reply via email to