This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 10819739 [improve] Update topic admin interface comment, add topic 
admin test cases (#1202)
10819739 is described below

commit 10819739d8c0f66bf97745ba0bdf5630e09f58d4
Author: zhou zhuohan <[email protected]>
AuthorDate: Fri Apr 12 13:08:54 2024 +0800

    [improve] Update topic admin interface comment, add topic admin test cases 
(#1202)
---
 integration-tests/conf/standalone.conf |   3 +
 pulsaradmin/pkg/admin/topic.go         | 179 ++++++++++++++++++++++----
 pulsaradmin/pkg/admin/topic_test.go    | 227 +++++++++++++++++++++++++++++++++
 3 files changed, 384 insertions(+), 25 deletions(-)

diff --git a/integration-tests/conf/standalone.conf 
b/integration-tests/conf/standalone.conf
index c816c8fd..ccb91f37 100644
--- a/integration-tests/conf/standalone.conf
+++ b/integration-tests/conf/standalone.conf
@@ -83,6 +83,9 @@ maxUnackedMessagesPerConsumer=50000
 # Set maxMessageSize to 1MB rather than the default value 5MB for testing
 maxMessageSize=1048576
 
+# enable topic level policies to test topic admin functions
+topicLevelPoliciesEnabled=true
+
 ### --- Authentication --- ###
 
 # Enable TLS
diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go
index e6057413..7badc634 100644
--- a/pulsaradmin/pkg/admin/topic.go
+++ b/pulsaradmin/pkg/admin/topic.go
@@ -26,16 +26,35 @@ import (
 
 // Topics is admin interface for topics management
 type Topics interface {
-       // Create a topic
-       Create(utils.TopicName, int) error
-
-       // Delete a topic
-       Delete(utils.TopicName, bool, bool) error
+       // Create a partitioned or non-partitioned topic
+       //
+       // @param topic
+       //        topicName struct
+       // @param partitions
+       //        number of topic partitions,
+       //        when setting to 0, it will create a non-partitioned topic
+       Create(topic utils.TopicName, partitions int) error
+
+       // Delete a topic. This function can delete either a partitioned or a 
non-partitioned topic
+       //
+       // @param topic
+       //        topicName struct
+       // @param force
+       //        delete topic forcefully
+       // @param nonPartitioned
+       //        when set to true, topic will be treated as a non-partitioned 
topic
+       //        Otherwise it will be treated as a partitioned topic
+       Delete(topic utils.TopicName, force bool, nonPartitioned bool) error
 
        // Update number of partitions of a non-global partitioned topic
        // It requires partitioned-topic to be already exist and number of new 
partitions must be greater than existing
        // number of partitions. Decrementing number of partitions requires 
deletion of topic which is not supported.
-       Update(utils.TopicName, int) error
+       //
+       // @param topic
+       //        topicName struct
+       // @param partitions
+       //        new number of partitions for the already existing partitioned topic
+       Update(topic utils.TopicName, partitions int) error
 
        // GetMetadata returns metadata of a partitioned topic
        GetMetadata(utils.TopicName) (utils.PartitionedTopicMetadata, error)
@@ -52,12 +71,24 @@ type Topics interface {
        GetPermissions(utils.TopicName) (map[string][]utils.AuthAction, error)
 
        // GrantPermission grants a new permission to a client role on a single 
topic
-       GrantPermission(utils.TopicName, string, []utils.AuthAction) error
+       //
+       // @param topic
+       //        topicName struct
+       // @param role
+       //        client role to grant the permission to
+       // @param action
+       //        auth actions (e.g. produce and consume)
+       GrantPermission(topic utils.TopicName, role string, action 
[]utils.AuthAction) error
 
        // RevokePermission revokes permissions to a client role on a single 
topic. If the permission
        // was not set at the topic level, but rather at the namespace level, 
this operation will
        // return an error (HTTP status code 412).
-       RevokePermission(utils.TopicName, string) error
+       //
+       // @param topic
+       //        topicName struct
+       // @param role
+       //        client role to revoke the permissions from
+       RevokePermission(topic utils.TopicName, role string) error
 
        // Lookup a topic returns the broker URL that serves the topic
        Lookup(utils.TopicName) (utils.LookupData, error)
@@ -69,24 +100,56 @@ type Topics interface {
        GetLastMessageID(utils.TopicName) (utils.MessageID, error)
 
        // GetMessageID returns the message Id by timestamp(ms) of a topic
-       GetMessageID(utils.TopicName, int64) (utils.MessageID, error)
-
-       // GetStats returns the stats for the topic
-       // All the rates are computed over a 1 minute window and are relative 
the last completed 1 minute period
+       //
+       // @param topic
+       //        topicName struct
+       // @param timestamp
+       //        absolute timestamp (in ms)
+       GetMessageID(topic utils.TopicName, timestamp int64) (utils.MessageID, 
error)
+
+       // GetStats returns the stats for the topic.
+       //
+       // All the rates are computed over a 1-minute window and are relative 
to the last completed 1-minute period
        GetStats(utils.TopicName) (utils.TopicStats, error)
 
        // GetStatsWithOption returns the stats for the topic
-       GetStatsWithOption(utils.TopicName, utils.GetStatsOptions) 
(utils.TopicStats, error)
+       //
+       // All the rates are computed over a 1-minute window and are relative 
to the last completed 1-minute period
+       //
+       // @param topic
+       //        topicName struct
+       // @param option
+       //        request option, e.g. get_precise_backlog or 
subscription_backlog_size
+       GetStatsWithOption(topic utils.TopicName, option utils.GetStatsOptions) 
(utils.TopicStats, error)
 
        // GetInternalStats returns the internal stats for the topic.
        GetInternalStats(utils.TopicName) (utils.PersistentTopicInternalStats, 
error)
 
        // GetPartitionedStats returns the stats for the partitioned topic
-       // All the rates are computed over a 1 minute window and are relative 
the last completed 1 minute period
-       GetPartitionedStats(utils.TopicName, bool) 
(utils.PartitionedTopicStats, error)
+       //
+       // All the rates are computed over a 1-minute window and are relative 
to the last completed 1-minute period
+       //
+       // @param topic
+       //        topicName struct
+       // @param perPartition
+       //        flag to get stats per partition
+       GetPartitionedStats(topic utils.TopicName, perPartition bool) 
(utils.PartitionedTopicStats, error)
 
        // GetPartitionedStatsWithOption returns the stats for the partitioned 
topic
-       GetPartitionedStatsWithOption(utils.TopicName, bool, 
utils.GetStatsOptions) (utils.PartitionedTopicStats, error)
+       //
+       // All the rates are computed over a 1-minute window and are relative 
to the last completed 1-minute period
+       //
+       // @param topic
+       //        topicName struct
+       // @param perPartition
+       //        flag to get stats per partition
+       // @param option
+       //        request option, e.g. get_precise_backlog or 
subscription_backlog_size
+       GetPartitionedStatsWithOption(
+               topic utils.TopicName,
+               perPartition bool,
+               option utils.GetStatsOptions,
+       ) (utils.PartitionedTopicStats, error)
 
        // Terminate the topic and prevent any more messages being published on 
it
        Terminate(utils.TopicName) (utils.MessageID, error)
@@ -111,7 +174,12 @@ type Topics interface {
        GetMessageTTL(utils.TopicName) (int, error)
 
        // SetMessageTTL Set the message TTL for a topic
-       SetMessageTTL(utils.TopicName, int) error
+       //
+       // @param topic
+       //        topicName struct
+       // @param messageTTL
+       //        Message TTL in seconds
+       SetMessageTTL(topic utils.TopicName, messageTTL int) error
 
        // RemoveMessageTTL Remove the message TTL for a topic
        RemoveMessageTTL(utils.TopicName) error
@@ -120,7 +188,12 @@ type Topics interface {
        GetMaxProducers(utils.TopicName) (int, error)
 
        // SetMaxProducers Set max number of producers for a topic
-       SetMaxProducers(utils.TopicName, int) error
+       //
+       // @param topic
+       //        topicName struct
+       // @param maxProducers
+       //        max number of producers
+       SetMaxProducers(topic utils.TopicName, maxProducers int) error
 
        // RemoveMaxProducers Remove max number of producers for a topic
        RemoveMaxProducers(utils.TopicName) error
@@ -129,7 +202,12 @@ type Topics interface {
        GetMaxConsumers(utils.TopicName) (int, error)
 
        // SetMaxConsumers Set max number of consumers for a topic
-       SetMaxConsumers(utils.TopicName, int) error
+       //
+       // @param topic
+       //        topicName struct
+       // @param maxConsumers
+       //        max number of consumers
+       SetMaxConsumers(topic utils.TopicName, maxConsumers int) error
 
        // RemoveMaxConsumers Remove max number of consumers for a topic
        RemoveMaxConsumers(utils.TopicName) error
@@ -138,7 +216,12 @@ type Topics interface {
        GetMaxUnackMessagesPerConsumer(utils.TopicName) (int, error)
 
        // SetMaxUnackMessagesPerConsumer Set max unacked messages policy on 
consumer for a topic
-       SetMaxUnackMessagesPerConsumer(utils.TopicName, int) error
+       //
+       // @param topic
+       //        topicName struct
+       // @param maxUnackedNum
+       //        max unAcked messages on each consumer
+       SetMaxUnackMessagesPerConsumer(topic utils.TopicName, maxUnackedNum 
int) error
 
        // RemoveMaxUnackMessagesPerConsumer Remove max unacked messages policy 
on consumer for a topic
        RemoveMaxUnackMessagesPerConsumer(utils.TopicName) error
@@ -147,7 +230,12 @@ type Topics interface {
        GetMaxUnackMessagesPerSubscription(utils.TopicName) (int, error)
 
        // SetMaxUnackMessagesPerSubscription Set max unacked messages policy 
on subscription for a topic
-       SetMaxUnackMessagesPerSubscription(utils.TopicName, int) error
+       //
+       // @param topic
+       //        topicName struct
+       // @param maxUnackedNum
+       //        max unAcked messages on subscription of a topic
+       SetMaxUnackMessagesPerSubscription(topic utils.TopicName, maxUnackedNum 
int) error
 
        // RemoveMaxUnackMessagesPerSubscription Remove max unacked messages 
policy on subscription for a topic
        RemoveMaxUnackMessagesPerSubscription(utils.TopicName) error
@@ -192,13 +280,24 @@ type Topics interface {
        GetDeduplicationStatus(utils.TopicName) (bool, error)
 
        // SetDeduplicationStatus Set the deduplication policy for a topic
-       SetDeduplicationStatus(utils.TopicName, bool) error
+       //
+       // @param topic
+       //        topicName struct
+       // @param enabled
+       //        set enable or disable deduplication of the topic
+       SetDeduplicationStatus(topic utils.TopicName, enabled bool) error
 
        // RemoveDeduplicationStatus Remove the deduplication policy for a topic
        RemoveDeduplicationStatus(utils.TopicName) error
 
        // GetRetention returns the retention configuration for a topic
-       GetRetention(utils.TopicName, bool) (*utils.RetentionPolicies, error)
+       //
+       // @param topic
+       //        topicName struct
+       // @param applied
+       //        when set to true, function will try to find policy applied to 
this topic
+       //        in namespace or broker level, if no policy set in topic level
+       GetRetention(topic utils.TopicName, applied bool) 
(*utils.RetentionPolicies, error)
 
        // RemoveRetention removes the retention configuration on a topic
        RemoveRetention(utils.TopicName) error
@@ -206,16 +305,35 @@ type Topics interface {
        // SetRetention sets the retention policy for a topic
        SetRetention(utils.TopicName, utils.RetentionPolicies) error
 
-       // Get the compaction threshold for a topic
+       // GetCompactionThreshold Get the compaction threshold for a topic.
+       //
+       // i.e. the maximum number of backlog bytes a topic can have before compaction is 
triggered.
+       //
+       // @param topic
+       //        topicName struct
+       // @param applied
+       //        when set to true, function will try to find policy applied to 
this topic
+       //        in namespace or broker level, if no policy set in topic level
        GetCompactionThreshold(topic utils.TopicName, applied bool) (int64, 
error)
 
-       // Set the compaction threshold for a topic
+       // SetCompactionThreshold Set the compaction threshold for a topic
+       //
+       // @param topic
+       //        topicName struct
+       // @param threshold
+       //        maximum number of backlog bytes before compaction is triggered
        SetCompactionThreshold(topic utils.TopicName, threshold int64) error
 
        // Remove compaction threshold for a topic
        RemoveCompactionThreshold(utils.TopicName) error
 
        // GetBacklogQuotaMap returns backlog quota map for a topic
+       //
+       // @param topic
+       //        topicName struct
+       // @param applied
+       //        when set to true, function will try to find policy applied to 
this topic
+       //        in namespace or broker level, if no policy set in topic level
        GetBacklogQuotaMap(topic utils.TopicName, applied bool) 
(map[utils.BacklogQuotaType]utils.BacklogQuota, error)
 
        // SetBacklogQuota sets a backlog quota for a topic
@@ -225,6 +343,12 @@ type Topics interface {
        RemoveBacklogQuota(utils.TopicName, utils.BacklogQuotaType) error
 
        // GetInactiveTopicPolicies gets the inactive topic policies on a topic
+       //
+       // @param topic
+       //        topicName struct
+       // @param applied
+       //        when set to true, function will try to find policy applied to 
this topic
+       //        in namespace or broker level, if no policy set in topic level
        GetInactiveTopicPolicies(topic utils.TopicName, applied bool) 
(utils.InactiveTopicPolicies, error)
 
        // RemoveInactiveTopicPolicies removes inactive topic policies from a 
topic
@@ -237,6 +361,11 @@ type Topics interface {
        GetReplicationClusters(topic utils.TopicName) ([]string, error)
 
        // SetReplicationClusters sets the replication clusters on a topic
+       //
+       // @param topic
+       //        topicName struct
+       // @param data
+       //        list of replication cluster ids
        SetReplicationClusters(topic utils.TopicName, data []string) error
 }
 
diff --git a/pulsaradmin/pkg/admin/topic_test.go 
b/pulsaradmin/pkg/admin/topic_test.go
index a9b1f002..a3609ff6 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -213,3 +213,230 @@ func TestNonPartitionState(t *testing.T) {
 func newTopicName() string {
        return fmt.Sprintf("my-topic-%v", time.Now().Nanosecond())
 }
+
+func TestDeleteNonPartitionedTopic(t *testing.T) {
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*topicName, 0)
+       assert.NoError(t, err)
+       err = admin.Topics().Delete(*topicName, false, true)
+       assert.NoError(t, err)
+       topicList, err := admin.Namespaces().GetTopics("public/default")
+       assert.NoError(t, err)
+       isTopicExist := false
+       for _, topicIterator := range topicList {
+               if topicIterator == topic {
+                       isTopicExist = true
+               }
+       }
+       assert.Equal(t, false, isTopicExist)
+}
+
+func TestDeletePartitionedTopic(t *testing.T) {
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*topicName, 3)
+       assert.NoError(t, err)
+       err = admin.Topics().Delete(*topicName, false, false)
+       assert.NoError(t, err)
+       topicList, err := admin.Namespaces().GetTopics("public/default")
+       assert.NoError(t, err)
+       isTopicExist := false
+       for _, topicIterator := range topicList {
+               if topicIterator == topic {
+                       isTopicExist = true
+               }
+       }
+       assert.Equal(t, false, isTopicExist)
+}
+
+func TestUpdateTopicPartitions(t *testing.T) {
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*topicName, 3)
+       assert.NoError(t, err)
+       topicMetadata, err := admin.Topics().GetMetadata(*topicName)
+       assert.NoError(t, err)
+       assert.Equal(t, 3, topicMetadata.Partitions)
+
+       err = admin.Topics().Update(*topicName, 4)
+       assert.NoError(t, err)
+       topicMetadata, err = admin.Topics().GetMetadata(*topicName)
+       assert.NoError(t, err)
+       assert.Equal(t, 4, topicMetadata.Partitions)
+}
+
+func TestGetMessageID(t *testing.T) {
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+       topicPartitionZero := topic + "-partition-0"
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+       topicPartitionZeroName, err := utils.GetTopicName(topicPartitionZero)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*topicName, 1)
+       assert.NoError(t, err)
+       ctx := context.Background()
+
+       // create consumer
+       client, err := pulsar.NewClient(pulsar.ClientOptions{
+               URL: lookupURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+       consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "my-sub",
+               Type:             pulsar.Exclusive,
+       })
+       assert.NoError(t, err)
+       defer consumer.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(pulsar.ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.NoError(t, err)
+       defer producer.Close()
+       _, err = producer.Send(ctx, &pulsar.ProducerMessage{
+               Payload: []byte("hello"),
+               Key:     "pulsar",
+               Properties: map[string]string{
+                       "key-1": "pulsar-1",
+               },
+       })
+       assert.NoError(t, err)
+
+       // ack message
+       msg, err := consumer.Receive(ctx)
+       assert.NoError(t, err)
+       assert.Equal(t, []byte("hello"), msg.Payload())
+       assert.Equal(t, "pulsar", msg.Key())
+       err = consumer.Ack(msg)
+       assert.NoError(t, err)
+
+       messageID, err := admin.Topics().GetMessageID(
+               *topicPartitionZeroName,
+               msg.PublishTime().Unix()*1000-1000,
+       )
+       assert.NoError(t, err)
+       assert.Equal(t, msg.ID().EntryID(), messageID.EntryID)
+       assert.Equal(t, msg.ID().LedgerID(), messageID.LedgerID)
+       assert.Equal(t, int(msg.ID().PartitionIdx()), messageID.PartitionIndex)
+}
+
+func TestMessageTTL(t *testing.T) {
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*topicName, 4)
+       assert.NoError(t, err)
+
+       messageTTL, err := admin.Topics().GetMessageTTL(*topicName)
+       assert.NoError(t, err)
+       assert.Equal(t, 0, messageTTL)
+       err = admin.Topics().SetMessageTTL(*topicName, 600)
+       assert.NoError(t, err)
+       //      topic policy is an async operation,
+       //      so we need to wait for a while to get current value
+       assert.Eventually(
+               t,
+               func() bool {
+                       messageTTL, err = 
admin.Topics().GetMessageTTL(*topicName)
+                       return err == nil && messageTTL == 600
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+       err = admin.Topics().RemoveMessageTTL(*topicName)
+       assert.NoError(t, err)
+       assert.Eventually(
+               t,
+               func() bool {
+                       messageTTL, err = 
admin.Topics().GetMessageTTL(*topicName)
+                       return err == nil && messageTTL == 0
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+}
+
+func TestRetention(t *testing.T) {
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*topicName, 4)
+       assert.NoError(t, err)
+
+       topicRetentionPolicy, err := admin.Topics().GetRetention(*topicName, 
false)
+       assert.NoError(t, err)
+       assert.Equal(t, int64(0), topicRetentionPolicy.RetentionSizeInMB)
+       assert.Equal(t, 0, topicRetentionPolicy.RetentionTimeInMinutes)
+       err = admin.Topics().SetRetention(*topicName, utils.RetentionPolicies{
+               RetentionSizeInMB:      20480,
+               RetentionTimeInMinutes: 1440,
+       })
+       assert.NoError(t, err)
+       //      topic policy is an async operation,
+       //      so we need to wait for a while to get current value
+       assert.Eventually(
+               t,
+               func() bool {
+                       topicRetentionPolicy, err = 
admin.Topics().GetRetention(*topicName, false)
+                       return err == nil &&
+                               topicRetentionPolicy.RetentionSizeInMB == 
int64(20480) &&
+                               topicRetentionPolicy.RetentionTimeInMinutes == 
1440
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+       err = admin.Topics().RemoveRetention(*topicName)
+       assert.NoError(t, err)
+       assert.Eventually(
+               t,
+               func() bool {
+                       topicRetentionPolicy, err = 
admin.Topics().GetRetention(*topicName, false)
+                       return err == nil &&
+                               topicRetentionPolicy.RetentionSizeInMB == 
int64(0) &&
+                               topicRetentionPolicy.RetentionTimeInMinutes == 0
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+}

Reply via email to