This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c4f47abd [Improve] Add optional parameters for getPartitionedStats
(#1193)
c4f47abd is described below
commit c4f47abd7c82402e7d4e95b55b24ebfef4bdb029
Author: crossoverJie <[email protected]>
AuthorDate: Tue Mar 5 18:40:40 2024 +0800
[Improve] Add optional parameters for getPartitionedStats (#1193)
### Motivation
To keep consistent with the Java client.
Releted PR: https://github.com/apache/pulsar/pull/21611
### Modifications
Add `GetStatsOptions` params.
---
Makefile | 2 +-
pulsar/consumer_test.go | 12 +--
pulsaradmin/pkg/admin/topic.go | 34 ++++++++
pulsaradmin/pkg/admin/topic_test.go | 160 ++++++++++++++++++++++++++++++++++++
pulsaradmin/pkg/utils/data.go | 8 ++
5 files changed, 209 insertions(+), 7 deletions(-)
diff --git a/Makefile b/Makefile
index 4eb590b0..d0442372 100644
--- a/Makefile
+++ b/Makefile
@@ -18,7 +18,7 @@
#
IMAGE_NAME = pulsar-client-go-test:latest
-PULSAR_VERSION ?= 2.10.3
+PULSAR_VERSION ?= 3.2.0
PULSAR_IMAGE = apachepulsar/pulsar:$(PULSAR_VERSION)
GO_VERSION ?= 1.18
GOLANG_IMAGE = golang:$(GO_VERSION)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index d66e2376..4a3b532d 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -2219,6 +2219,12 @@ func TestConsumerAddTopicPartitions(t *testing.T) {
assert.Nil(t, err)
defer producer.Close()
+ // Increase number of partitions to 10
+ makeHTTPCall(t, http.MethodPost, testURL, "10")
+
+ // Wait for the producer/consumers to pick up the change
+ time.Sleep(1 * time.Second)
+
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
@@ -2227,12 +2233,6 @@ func TestConsumerAddTopicPartitions(t *testing.T) {
assert.Nil(t, err)
defer consumer.Close()
- // Increase number of partitions to 10
- makeHTTPCall(t, http.MethodPost, testURL, "10")
-
- // Wait for the producer/consumers to pick up the change
- time.Sleep(1 * time.Second)
-
// Publish messages ensuring that they will go to all the partitions
ctx := context.Background()
for i := 0; i < 10; i++ {
diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go
index c888827b..e6057413 100644
--- a/pulsaradmin/pkg/admin/topic.go
+++ b/pulsaradmin/pkg/admin/topic.go
@@ -75,6 +75,9 @@ type Topics interface {
// All the rates are computed over a 1 minute window and are relative
the last completed 1 minute period
GetStats(utils.TopicName) (utils.TopicStats, error)
+ // GetStatsWithOption returns the stats for the topic
+ GetStatsWithOption(utils.TopicName, utils.GetStatsOptions)
(utils.TopicStats, error)
+
// GetInternalStats returns the internal stats for the topic.
GetInternalStats(utils.TopicName) (utils.PersistentTopicInternalStats,
error)
@@ -82,6 +85,9 @@ type Topics interface {
// All the rates are computed over a 1 minute window and are relative
the last completed 1 minute period
GetPartitionedStats(utils.TopicName, bool)
(utils.PartitionedTopicStats, error)
+ // GetPartitionedStatsWithOption returns the stats for the partitioned
topic
+ GetPartitionedStatsWithOption(utils.TopicName, bool,
utils.GetStatsOptions) (utils.PartitionedTopicStats, error)
+
// Terminate the topic and prevent any more messages being published on
it
Terminate(utils.TopicName) (utils.MessageID, error)
@@ -395,6 +401,19 @@ func (t *topics) GetStats(topic utils.TopicName)
(utils.TopicStats, error) {
err := t.pulsar.Client.Get(endpoint, &stats)
return stats, err
}
+func (t *topics) GetStatsWithOption(topic utils.TopicName, option
utils.GetStatsOptions) (utils.TopicStats, error) {
+ var stats utils.TopicStats
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "stats")
+ params := map[string]string{
+ "getPreciseBacklog":
strconv.FormatBool(option.GetPreciseBacklog),
+ "subscriptionBacklogSize":
strconv.FormatBool(option.SubscriptionBacklogSize),
+ "getEarliestTimeInBacklog":
strconv.FormatBool(option.GetEarliestTimeInBacklog),
+ "excludePublishers":
strconv.FormatBool(option.ExcludePublishers),
+ "excludeConsumers":
strconv.FormatBool(option.ExcludeConsumers),
+ }
+ _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params,
true)
+ return stats, err
+}
func (t *topics) GetInternalStats(topic utils.TopicName)
(utils.PersistentTopicInternalStats, error) {
var stats utils.PersistentTopicInternalStats
@@ -412,6 +431,21 @@ func (t *topics) GetPartitionedStats(topic
utils.TopicName, perPartition bool) (
_, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params,
true)
return stats, err
}
+func (t *topics) GetPartitionedStatsWithOption(topic utils.TopicName,
perPartition bool,
+ option utils.GetStatsOptions) (utils.PartitionedTopicStats, error) {
+ var stats utils.PartitionedTopicStats
+ endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"partitioned-stats")
+ params := map[string]string{
+ "perPartition": strconv.FormatBool(perPartition),
+ "getPreciseBacklog":
strconv.FormatBool(option.GetPreciseBacklog),
+ "subscriptionBacklogSize":
strconv.FormatBool(option.SubscriptionBacklogSize),
+ "getEarliestTimeInBacklog":
strconv.FormatBool(option.GetEarliestTimeInBacklog),
+ "excludePublishers":
strconv.FormatBool(option.ExcludePublishers),
+ "excludeConsumers":
strconv.FormatBool(option.ExcludeConsumers),
+ }
+ _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params,
true)
+ return stats, err
+}
func (t *topics) Terminate(topic utils.TopicName) (utils.MessageID, error) {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"terminate")
diff --git a/pulsaradmin/pkg/admin/topic_test.go
b/pulsaradmin/pkg/admin/topic_test.go
index 06c33f2e..a9b1f002 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -18,12 +18,23 @@
package admin
import (
+ "context"
+ "fmt"
+ "log"
"testing"
+ "time"
+
+ "github.com/apache/pulsar-client-go/pulsar"
+ "github.com/stretchr/testify/assert"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)
+var (
+ lookupURL = "pulsar://localhost:6650"
+)
+
func TestCreateTopic(t *testing.T) {
checkError := func(err error) {
if err != nil {
@@ -53,3 +64,152 @@ func TestCreateTopic(t *testing.T) {
}
t.Error("Couldn't find topic: " + topic)
}
+
+func TestPartitionState(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+
+ // Create partition topic
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ // Send message
+ ctx := context.Background()
+
+ // create consumer
+ client, err := pulsar.NewClient(pulsar.ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+ consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ Type: pulsar.Exclusive,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send 10 messages
+ for i := 0; i < 10; i++ {
+ if _, err := producer.Send(ctx, &pulsar.ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ Key: "pulsar",
+ Properties: map[string]string{
+ "key-1": "pulsar-1",
+ },
+ }); err != nil {
+ log.Fatal(err)
+ }
+ }
+
+ stats, err := admin.Topics().GetPartitionedStatsWithOption(*topicName,
true, utils.GetStatsOptions{
+ GetPreciseBacklog: false,
+ SubscriptionBacklogSize: false,
+ GetEarliestTimeInBacklog: false,
+ ExcludePublishers: true,
+ ExcludeConsumers: true,
+ })
+ assert.Nil(t, err)
+ assert.Equal(t, len(stats.Publishers), 0)
+
+ for _, topicStats := range stats.Partitions {
+ assert.Equal(t, len(topicStats.Publishers), 0)
+ for _, subscriptionStats := range topicStats.Subscriptions {
+ assert.Equal(t, len(subscriptionStats.Consumers), 0)
+ }
+ }
+
+ for _, subscriptionStats := range stats.Subscriptions {
+ assert.Equal(t, len(subscriptionStats.Consumers), 0)
+ }
+
+}
+func TestNonPartitionState(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+
+ // Create non-partition topic
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 0)
+ assert.NoError(t, err)
+
+ // Send message
+ ctx := context.Background()
+
+ // create consumer
+ client, err := pulsar.NewClient(pulsar.ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+ consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ Type: pulsar.Exclusive,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send 10 messages
+ for i := 0; i < 10; i++ {
+ if _, err := producer.Send(ctx, &pulsar.ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ Key: "pulsar",
+ Properties: map[string]string{
+ "key-1": "pulsar-1",
+ },
+ }); err != nil {
+ log.Fatal(err)
+ }
+ }
+
+ stats, err := admin.Topics().GetStatsWithOption(*topicName,
utils.GetStatsOptions{
+ GetPreciseBacklog: false,
+ SubscriptionBacklogSize: false,
+ GetEarliestTimeInBacklog: false,
+ ExcludePublishers: true,
+ ExcludeConsumers: true,
+ })
+ assert.Nil(t, err)
+ assert.Equal(t, len(stats.Publishers), 0)
+ for _, subscriptionStats := range stats.Subscriptions {
+ assert.Equal(t, len(subscriptionStats.Consumers), 0)
+ }
+
+}
+
+func newTopicName() string {
+ return fmt.Sprintf("my-topic-%v", time.Now().Nanosecond())
+}
diff --git a/pulsaradmin/pkg/utils/data.go b/pulsaradmin/pkg/utils/data.go
index 55888aab..cc797d18 100644
--- a/pulsaradmin/pkg/utils/data.go
+++ b/pulsaradmin/pkg/utils/data.go
@@ -465,3 +465,11 @@ type CompactedLedger struct {
Offloaded bool `json:"offloaded"`
UnderReplicated bool `json:"underReplicated"`
}
+
+type GetStatsOptions struct {
+ GetPreciseBacklog bool `json:"get_precise_backlog"`
+ SubscriptionBacklogSize bool `json:"subscription_backlog_size"`
+ GetEarliestTimeInBacklog bool `json:"get_earliest_time_in_backlog"`
+ ExcludePublishers bool `json:"exclude_publishers"`
+ ExcludeConsumers bool `json:"exclude_consumers"`
+}