This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 10819739 [improve] Update topic admin interface comment, add topic
admin test cases (#1202)
10819739 is described below
commit 10819739d8c0f66bf97745ba0bdf5630e09f58d4
Author: zhou zhuohan <[email protected]>
AuthorDate: Fri Apr 12 13:08:54 2024 +0800
[improve] Update topic admin interface comment, add topic admin test cases
(#1202)
---
integration-tests/conf/standalone.conf | 3 +
pulsaradmin/pkg/admin/topic.go | 179 ++++++++++++++++++++++----
pulsaradmin/pkg/admin/topic_test.go | 227 +++++++++++++++++++++++++++++++++
3 files changed, 384 insertions(+), 25 deletions(-)
diff --git a/integration-tests/conf/standalone.conf
b/integration-tests/conf/standalone.conf
index c816c8fd..ccb91f37 100644
--- a/integration-tests/conf/standalone.conf
+++ b/integration-tests/conf/standalone.conf
@@ -83,6 +83,9 @@ maxUnackedMessagesPerConsumer=50000
# Set maxMessageSize to 1MB rather than the default value 5MB for testing
maxMessageSize=1048576
+# enable topic level policies to test topic admin functions
+topicLevelPoliciesEnabled=true
+
### --- Authentication --- ###
# Enable TLS
diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go
index e6057413..7badc634 100644
--- a/pulsaradmin/pkg/admin/topic.go
+++ b/pulsaradmin/pkg/admin/topic.go
@@ -26,16 +26,35 @@ import (
// Topics is admin interface for topics management
type Topics interface {
- // Create a topic
- Create(utils.TopicName, int) error
-
- // Delete a topic
- Delete(utils.TopicName, bool, bool) error
+ // Create a partitioned or non-partitioned topic
+ //
+ // @param topic
+ // topicName struct
+ // @param partitions
+ // number of topic partitions,
+ // when setting to 0, it will create a non-partitioned topic
+ Create(topic utils.TopicName, partitions int) error
+
+ // Delete a topic. This function can delete both partitioned and
non-partitioned topics
+ //
+ // @param topic
+ // topicName struct
+ // @param force
+ // delete topic forcefully
+ // @param nonPartitioned
+ // when set to true, the topic is treated as a non-partitioned
topic;
+ // otherwise it is treated as a partitioned topic
+ Delete(topic utils.TopicName, force bool, nonPartitioned bool) error
// Update number of partitions of a non-global partitioned topic
// It requires partitioned-topic to be already exist and number of new
partitions must be greater than existing
// number of partitions. Decrementing number of partitions requires
deletion of topic which is not supported.
- Update(utils.TopicName, int) error
+ //
+ // @param topic
+ // topicName struct
+ // @param partitions
+ // new number of partitions for the already existing partitioned topic
+ Update(topic utils.TopicName, partitions int) error
// GetMetadata returns metadata of a partitioned topic
GetMetadata(utils.TopicName) (utils.PartitionedTopicMetadata, error)
@@ -52,12 +71,24 @@ type Topics interface {
GetPermissions(utils.TopicName) (map[string][]utils.AuthAction, error)
// GrantPermission grants a new permission to a client role on a single
topic
- GrantPermission(utils.TopicName, string, []utils.AuthAction) error
+ //
+ // @param topic
+ // topicName struct
+ // @param role
+ // client role to which the permission is granted
+ // @param action
+ // auth actions (e.g. produce and consume)
+ GrantPermission(topic utils.TopicName, role string, action
[]utils.AuthAction) error
// RevokePermission revokes permissions to a client role on a single
topic. If the permission
// was not set at the topic level, but rather at the namespace level,
this operation will
// return an error (HTTP status code 412).
- RevokePermission(utils.TopicName, string) error
+ //
+ // @param topic
+ // topicName struct
+ // @param role
+ // client role from which the permissions are revoked
+ RevokePermission(topic utils.TopicName, role string) error
// Lookup a topic returns the broker URL that serves the topic
Lookup(utils.TopicName) (utils.LookupData, error)
@@ -69,24 +100,56 @@ type Topics interface {
GetLastMessageID(utils.TopicName) (utils.MessageID, error)
// GetMessageID returns the message Id by timestamp(ms) of a topic
- GetMessageID(utils.TopicName, int64) (utils.MessageID, error)
-
- // GetStats returns the stats for the topic
- // All the rates are computed over a 1 minute window and are relative
the last completed 1 minute period
+ //
+ // @param topic
+ // topicName struct
+ // @param timestamp
+ // absolute timestamp (in ms)
+ GetMessageID(topic utils.TopicName, timestamp int64) (utils.MessageID,
error)
+
+ // GetStats returns the stats for the topic.
+ //
+ // All the rates are computed over a 1-minute window and are relative
to the last completed 1-minute period
GetStats(utils.TopicName) (utils.TopicStats, error)
// GetStatsWithOption returns the stats for the topic
- GetStatsWithOption(utils.TopicName, utils.GetStatsOptions)
(utils.TopicStats, error)
+ //
+ // All the rates are computed over a 1-minute window and are relative
to the last completed 1-minute period
+ //
+ // @param topic
+ // topicName struct
+ // @param option
+ // request option, e.g. get_precise_backlog or
subscription_backlog_size
+ GetStatsWithOption(topic utils.TopicName, option utils.GetStatsOptions)
(utils.TopicStats, error)
// GetInternalStats returns the internal stats for the topic.
GetInternalStats(utils.TopicName) (utils.PersistentTopicInternalStats,
error)
// GetPartitionedStats returns the stats for the partitioned topic
- // All the rates are computed over a 1 minute window and are relative
the last completed 1 minute period
- GetPartitionedStats(utils.TopicName, bool)
(utils.PartitionedTopicStats, error)
+ //
+ // All the rates are computed over a 1-minute window and are relative
to the last completed 1-minute period
+ //
+ // @param topic
+ // topicName struct
+ // @param perPartition
+ // flag to get stats per partition
+ GetPartitionedStats(topic utils.TopicName, perPartition bool)
(utils.PartitionedTopicStats, error)
// GetPartitionedStatsWithOption returns the stats for the partitioned
topic
- GetPartitionedStatsWithOption(utils.TopicName, bool,
utils.GetStatsOptions) (utils.PartitionedTopicStats, error)
+ //
+ // All the rates are computed over a 1-minute window and are relative
to the last completed 1-minute period
+ //
+ // @param topic
+ // topicName struct
+ // @param perPartition
+ // flag to get stats per partition
+ // @param option
+ // request option, e.g. get_precise_backlog or
subscription_backlog_size
+ GetPartitionedStatsWithOption(
+ topic utils.TopicName,
+ perPartition bool,
+ option utils.GetStatsOptions,
+ ) (utils.PartitionedTopicStats, error)
// Terminate the topic and prevent any more messages being published on
it
Terminate(utils.TopicName) (utils.MessageID, error)
@@ -111,7 +174,12 @@ type Topics interface {
GetMessageTTL(utils.TopicName) (int, error)
// SetMessageTTL Set the message TTL for a topic
- SetMessageTTL(utils.TopicName, int) error
+ //
+ // @param topic
+ // topicName struct
+ // @param messageTTL
+ // message TTL in seconds
+ SetMessageTTL(topic utils.TopicName, messageTTL int) error
// RemoveMessageTTL Remove the message TTL for a topic
RemoveMessageTTL(utils.TopicName) error
@@ -120,7 +188,12 @@ type Topics interface {
GetMaxProducers(utils.TopicName) (int, error)
// SetMaxProducers Set max number of producers for a topic
- SetMaxProducers(utils.TopicName, int) error
+ //
+ // @param topic
+ // topicName struct
+ // @param maxProducers
+ // max number of producers
+ SetMaxProducers(topic utils.TopicName, maxProducers int) error
// RemoveMaxProducers Remove max number of producers for a topic
RemoveMaxProducers(utils.TopicName) error
@@ -129,7 +202,12 @@ type Topics interface {
GetMaxConsumers(utils.TopicName) (int, error)
// SetMaxConsumers Set max number of consumers for a topic
- SetMaxConsumers(utils.TopicName, int) error
+ //
+ // @param topic
+ // topicName struct
+ // @param maxConsumers
+ // max number of consumers
+ SetMaxConsumers(topic utils.TopicName, maxConsumers int) error
// RemoveMaxConsumers Remove max number of consumers for a topic
RemoveMaxConsumers(utils.TopicName) error
@@ -138,7 +216,12 @@ type Topics interface {
GetMaxUnackMessagesPerConsumer(utils.TopicName) (int, error)
// SetMaxUnackMessagesPerConsumer Set max unacked messages policy on
consumer for a topic
- SetMaxUnackMessagesPerConsumer(utils.TopicName, int) error
+ //
+ // @param topic
+ // topicName struct
+ // @param maxUnackedNum
+ // max number of unacked messages allowed on each consumer
+ SetMaxUnackMessagesPerConsumer(topic utils.TopicName, maxUnackedNum
int) error
// RemoveMaxUnackMessagesPerConsumer Remove max unacked messages policy
on consumer for a topic
RemoveMaxUnackMessagesPerConsumer(utils.TopicName) error
@@ -147,7 +230,12 @@ type Topics interface {
GetMaxUnackMessagesPerSubscription(utils.TopicName) (int, error)
// SetMaxUnackMessagesPerSubscription Set max unacked messages policy
on subscription for a topic
- SetMaxUnackMessagesPerSubscription(utils.TopicName, int) error
+ //
+ // @param topic
+ // topicName struct
+ // @param maxUnackedNum
+ // max number of unacked messages allowed on each subscription of the topic
+ SetMaxUnackMessagesPerSubscription(topic utils.TopicName, maxUnackedNum
int) error
// RemoveMaxUnackMessagesPerSubscription Remove max unacked messages
policy on subscription for a topic
RemoveMaxUnackMessagesPerSubscription(utils.TopicName) error
@@ -192,13 +280,24 @@ type Topics interface {
GetDeduplicationStatus(utils.TopicName) (bool, error)
// SetDeduplicationStatus Set the deduplication policy for a topic
- SetDeduplicationStatus(utils.TopicName, bool) error
+ //
+ // @param topic
+ // topicName struct
+ // @param enabled
+ // whether deduplication is enabled (true) or disabled (false) for the topic
+ SetDeduplicationStatus(topic utils.TopicName, enabled bool) error
// RemoveDeduplicationStatus Remove the deduplication policy for a topic
RemoveDeduplicationStatus(utils.TopicName) error
// GetRetention returns the retention configuration for a topic
- GetRetention(utils.TopicName, bool) (*utils.RetentionPolicies, error)
+ //
+ // @param topic
+ // topicName struct
+ // @param applied
+ // when set to true, the function falls back to the policy applied to
this topic
+ // at the namespace or broker level if no policy is set at the topic level
+ GetRetention(topic utils.TopicName, applied bool)
(*utils.RetentionPolicies, error)
// RemoveRetention removes the retention configuration on a topic
RemoveRetention(utils.TopicName) error
@@ -206,16 +305,35 @@ type Topics interface {
// SetRetention sets the retention policy for a topic
SetRetention(utils.TopicName, utils.RetentionPolicies) error
- // Get the compaction threshold for a topic
+ // GetCompactionThreshold Get the compaction threshold for a topic.
+ //
+ // i.e. the maximum number of backlog bytes a topic can have before compaction is
triggered.
+ //
+ // @param topic
+ // topicName struct
+ // @param applied
+ // when set to true, the function falls back to the policy applied to
this topic
+ // at the namespace or broker level if no policy is set at the topic level
GetCompactionThreshold(topic utils.TopicName, applied bool) (int64,
error)
- // Set the compaction threshold for a topic
+ // SetCompactionThreshold Set the compaction threshold for a topic
+ //
+ // @param topic
+ // topicName struct
+ // @param threshold
+ // maximum number of backlog bytes before compaction is triggered
SetCompactionThreshold(topic utils.TopicName, threshold int64) error
// Remove compaction threshold for a topic
RemoveCompactionThreshold(utils.TopicName) error
// GetBacklogQuotaMap returns backlog quota map for a topic
+ //
+ // @param topic
+ // topicName struct
+ // @param applied
+ // when set to true, the function falls back to the policy applied to
this topic
+ // at the namespace or broker level if no policy is set at the topic level
GetBacklogQuotaMap(topic utils.TopicName, applied bool)
(map[utils.BacklogQuotaType]utils.BacklogQuota, error)
// SetBacklogQuota sets a backlog quota for a topic
@@ -225,6 +343,12 @@ type Topics interface {
RemoveBacklogQuota(utils.TopicName, utils.BacklogQuotaType) error
// GetInactiveTopicPolicies gets the inactive topic policies on a topic
+ //
+ // @param topic
+ // topicName struct
+ // @param applied
+ // when set to true, the function falls back to the policy applied to
this topic
+ // at the namespace or broker level if no policy is set at the topic level
GetInactiveTopicPolicies(topic utils.TopicName, applied bool)
(utils.InactiveTopicPolicies, error)
// RemoveInactiveTopicPolicies removes inactive topic policies from a
topic
@@ -237,6 +361,11 @@ type Topics interface {
GetReplicationClusters(topic utils.TopicName) ([]string, error)
// SetReplicationClusters sets the replication clusters on a topic
+ //
+ // @param topic
+ // topicName struct
+ // @param data
+ // list of replication cluster IDs
SetReplicationClusters(topic utils.TopicName, data []string) error
}
diff --git a/pulsaradmin/pkg/admin/topic_test.go
b/pulsaradmin/pkg/admin/topic_test.go
index a9b1f002..a3609ff6 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -213,3 +213,230 @@ func TestNonPartitionState(t *testing.T) {
func newTopicName() string {
return fmt.Sprintf("my-topic-%v", time.Now().Nanosecond())
}
+
+func TestDeleteNonPartitionedTopic(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 0)
+ assert.NoError(t, err)
+ err = admin.Topics().Delete(*topicName, false, true)
+ assert.NoError(t, err)
+ topicList, err := admin.Namespaces().GetTopics("public/default")
+ assert.NoError(t, err)
+ isTopicExist := false
+ for _, topicIterator := range topicList {
+ if topicIterator == topic {
+ isTopicExist = true
+ }
+ }
+ assert.Equal(t, false, isTopicExist)
+}
+
+func TestDeletePartitionedTopic(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 3)
+ assert.NoError(t, err)
+ err = admin.Topics().Delete(*topicName, false, false)
+ assert.NoError(t, err)
+ topicList, err := admin.Namespaces().GetTopics("public/default")
+ assert.NoError(t, err)
+ isTopicExist := false
+ for _, topicIterator := range topicList {
+ if topicIterator == topic {
+ isTopicExist = true
+ }
+ }
+ assert.Equal(t, false, isTopicExist)
+}
+
+func TestUpdateTopicPartitions(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 3)
+ assert.NoError(t, err)
+ topicMetadata, err := admin.Topics().GetMetadata(*topicName)
+ assert.NoError(t, err)
+ assert.Equal(t, 3, topicMetadata.Partitions)
+
+ err = admin.Topics().Update(*topicName, 4)
+ assert.NoError(t, err)
+ topicMetadata, err = admin.Topics().GetMetadata(*topicName)
+ assert.NoError(t, err)
+ assert.Equal(t, 4, topicMetadata.Partitions)
+}
+
+func TestGetMessageID(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ topicPartitionZero := topic + "-partition-0"
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ topicPartitionZeroName, err := utils.GetTopicName(topicPartitionZero)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 1)
+ assert.NoError(t, err)
+ ctx := context.Background()
+
+ // create consumer
+ client, err := pulsar.NewClient(pulsar.ClientOptions{
+ URL: lookupURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+ consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ Type: pulsar.Exclusive,
+ })
+ assert.NoError(t, err)
+ defer consumer.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ })
+ assert.NoError(t, err)
+ defer producer.Close()
+ _, err = producer.Send(ctx, &pulsar.ProducerMessage{
+ Payload: []byte("hello"),
+ Key: "pulsar",
+ Properties: map[string]string{
+ "key-1": "pulsar-1",
+ },
+ })
+ assert.NoError(t, err)
+
+ // receive the message, verify it, and ack it
+ msg, err := consumer.Receive(ctx)
+ assert.NoError(t, err)
+ assert.Equal(t, []byte("hello"), msg.Payload())
+ assert.Equal(t, "pulsar", msg.Key())
+ err = consumer.Ack(msg)
+ assert.NoError(t, err)
+
+ messageID, err := admin.Topics().GetMessageID(
+ *topicPartitionZeroName,
+ msg.PublishTime().Unix()*1000-1000,
+ )
+ assert.NoError(t, err)
+ assert.Equal(t, msg.ID().EntryID(), messageID.EntryID)
+ assert.Equal(t, msg.ID().LedgerID(), messageID.LedgerID)
+ assert.Equal(t, int(msg.ID().PartitionIdx()), messageID.PartitionIndex)
+}
+
+func TestMessageTTL(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ messageTTL, err := admin.Topics().GetMessageTTL(*topicName)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, messageTTL)
+ err = admin.Topics().SetMessageTTL(*topicName, 600)
+ assert.NoError(t, err)
+ // topic policy updates are applied asynchronously,
+ // so poll until the expected value becomes visible
+ assert.Eventually(
+ t,
+ func() bool {
+ messageTTL, err =
admin.Topics().GetMessageTTL(*topicName)
+ return err == nil && messageTTL == 600
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+ err = admin.Topics().RemoveMessageTTL(*topicName)
+ assert.NoError(t, err)
+ assert.Eventually(
+ t,
+ func() bool {
+ messageTTL, err =
admin.Topics().GetMessageTTL(*topicName)
+ return err == nil && messageTTL == 0
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}
+
+func TestRetention(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+ topicName, err := utils.GetTopicName(topic)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*topicName, 4)
+ assert.NoError(t, err)
+
+ topicRetentionPolicy, err := admin.Topics().GetRetention(*topicName,
false)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(0), topicRetentionPolicy.RetentionSizeInMB)
+ assert.Equal(t, 0, topicRetentionPolicy.RetentionTimeInMinutes)
+ err = admin.Topics().SetRetention(*topicName, utils.RetentionPolicies{
+ RetentionSizeInMB: 20480,
+ RetentionTimeInMinutes: 1440,
+ })
+ assert.NoError(t, err)
+ // topic policy updates are applied asynchronously,
+ // so poll until the expected value becomes visible
+ assert.Eventually(
+ t,
+ func() bool {
+ topicRetentionPolicy, err =
admin.Topics().GetRetention(*topicName, false)
+ return err == nil &&
+ topicRetentionPolicy.RetentionSizeInMB ==
int64(20480) &&
+ topicRetentionPolicy.RetentionTimeInMinutes ==
1440
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+ err = admin.Topics().RemoveRetention(*topicName)
+ assert.NoError(t, err)
+ assert.Eventually(
+ t,
+ func() bool {
+ topicRetentionPolicy, err =
admin.Topics().GetRetention(*topicName, false)
+ return err == nil &&
+ topicRetentionPolicy.RetentionSizeInMB ==
int64(0) &&
+ topicRetentionPolicy.RetentionTimeInMinutes == 0
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}