This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c0db4828 Use -1 as sentinel value for namespace and topic admin 
commands (#1430)
c0db4828 is described below

commit c0db48286e9e50f47fb0f6e9d5187dfe945fc5e9
Author: Kai <[email protected]>
AuthorDate: Thu Oct 23 00:47:57 2025 -0700

    Use -1 as sentinel value for namespace and topic admin commands (#1430)
    
    Fixes #1429
    
    ### Motivation
    
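    Use -1 as a sentinel value meaning "unset" for all the namespace / topic
    admin get commands whose response may have an empty body. Previously an
    empty body left the result at Go's zero value (0), so "unset" could not be
    distinguished from an explicitly configured 0.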
    
    ### Modifications
    
    -1 is the default return for the various get commands:
    
    ##### Namespace Admin Methods
    
        - GetNamespaceMessageTTL / GetNamespaceMessageTTLWithContext
        - GetMaxConsumersPerTopic / GetMaxConsumersPerTopicWithContext
        - GetMaxProducersPerTopic / GetMaxProducersPerTopicWithContext
        - GetMaxConsumersPerSubscription / 
GetMaxConsumersPerSubscriptionWithContext
        - GetOffloadThreshold / GetOffloadThresholdWithContext
        - GetOffloadThresholdInSeconds / GetOffloadThresholdInSecondsWithContext
        - GetOffloadDeleteLag / GetOffloadDeleteLagWithContext
        - GetCompactionThreshold / GetCompactionThresholdWithContext
    
    ##### Topic Admin Methods
    
        - GetMessageTTL / GetMessageTTLWithContext
        - GetMaxProducers / GetMaxProducersWithContext
        - GetMaxConsumers / GetMaxConsumersWithContext
        - GetMaxUnackMessagesPerConsumer / 
GetMaxUnackMessagesPerConsumerWithContext
        - GetMaxUnackMessagesPerSubscription / 
GetMaxUnackMessagesPerSubscriptionWithContext
        - GetCompactionThreshold / GetCompactionThresholdWithContext
        - GetMaxConsumersPerSubscription / 
GetMaxConsumersPerSubscriptionWithContext
        - GetMaxMessageSize / GetMaxMessageSizeWithContext
        - GetMaxSubscriptionsPerTopic / GetMaxSubscriptionsPerTopicWithContext
        - GetDeduplicationSnapshotInterval / 
GetDeduplicationSnapshotIntervalWithContext
---
 pulsaradmin/pkg/admin/namespace.go      |  50 +++----
 pulsaradmin/pkg/admin/namespace_test.go | 166 ++++++++++++++++++++++-
 pulsaradmin/pkg/admin/topic.go          |  64 ++++-----
 pulsaradmin/pkg/admin/topic_test.go     | 228 ++++++++++++++++++++++++++++++--
 4 files changed, 443 insertions(+), 65 deletions(-)

diff --git a/pulsaradmin/pkg/admin/namespace.go 
b/pulsaradmin/pkg/admin/namespace.go
index fdaa309b..6114f5ce 100644
--- a/pulsaradmin/pkg/admin/namespace.go
+++ b/pulsaradmin/pkg/admin/namespace.go
@@ -88,10 +88,10 @@ type Namespaces interface {
        // SetNamespaceMessageTTLWithContext sets the messages Time to Live for 
all the topics within a namespace
        SetNamespaceMessageTTLWithContext(ctx context.Context, namespace 
string, ttlInSeconds int) error
 
-       // GetNamespaceMessageTTL returns the message TTL for a namespace
+       // GetNamespaceMessageTTL returns the message TTL for a namespace. 
Returns -1 if not set
        GetNamespaceMessageTTL(namespace string) (int, error)
 
-       // GetNamespaceMessageTTLWithContext returns the message TTL for a 
namespace
+       // GetNamespaceMessageTTLWithContext returns the message TTL for a 
namespace. Returns -1 if not set
        GetNamespaceMessageTTLWithContext(ctx context.Context, namespace 
string) (int, error)
 
        // GetRetention returns the retention configuration for a namespace
@@ -226,10 +226,11 @@ type Namespaces interface {
        // SetOffloadDeleteLagWithContext sets the offload deletion lag for a 
namespace
        SetOffloadDeleteLagWithContext(ctx context.Context, namespace 
utils.NameSpaceName, timeMs int64) error
 
-       // GetOffloadDeleteLag returns the offload deletion lag for a 
namespace, in milliseconds
+       // GetOffloadDeleteLag returns the offload deletion lag for a 
namespace, in milliseconds. Returns -1 if not set
        GetOffloadDeleteLag(namespace utils.NameSpaceName) (int64, error)
 
-       // GetOffloadDeleteLagWithContext returns the offload deletion lag for 
a namespace, in milliseconds
+       // GetOffloadDeleteLagWithContext returns the offload deletion lag for 
a namespace, in milliseconds.
+       // Returns -1 if not set
        GetOffloadDeleteLagWithContext(ctx context.Context, namespace 
utils.NameSpaceName) (int64, error)
 
        // SetOffloadThreshold sets the offloadThreshold for a namespace
@@ -238,10 +239,10 @@ type Namespaces interface {
        // SetOffloadThresholdWithContext sets the offloadThreshold for a 
namespace
        SetOffloadThresholdWithContext(ctx context.Context, namespace 
utils.NameSpaceName, threshold int64) error
 
-       // GetOffloadThreshold returns the offloadThreshold for a namespace
+       // GetOffloadThreshold returns the offloadThreshold for a namespace. 
Returns -1 if not set
        GetOffloadThreshold(namespace utils.NameSpaceName) (int64, error)
 
-       // GetOffloadThresholdWithContext returns the offloadThreshold for a 
namespace
+       // GetOffloadThresholdWithContext returns the offloadThreshold for a 
namespace. Returns -1 if not set
        GetOffloadThresholdWithContext(ctx context.Context, namespace 
utils.NameSpaceName) (int64, error)
 
        // SetOffloadThresholdInSeconds sets the offloadThresholdInSeconds for 
a namespace
@@ -250,10 +251,10 @@ type Namespaces interface {
        // SetOffloadThresholdInSecondsWithContext sets the 
offloadThresholdInSeconds for a namespace
        SetOffloadThresholdInSecondsWithContext(ctx context.Context, namespace 
utils.NameSpaceName, threshold int64) error
 
-       // GetOffloadThresholdInSeconds returns the offloadThresholdInSeconds 
for a namespace
+       // GetOffloadThresholdInSeconds returns the offloadThresholdInSeconds 
for a namespace. Returns -1 if not set
        GetOffloadThresholdInSeconds(namespace utils.NameSpaceName) (int64, 
error)
 
-       // GetOffloadThresholdInSecondsWithContext returns the 
offloadThresholdInSeconds for a namespace
+       // GetOffloadThresholdInSecondsWithContext returns the 
offloadThresholdInSeconds for a namespace. Returns -1 if not set
        GetOffloadThresholdInSecondsWithContext(ctx context.Context, namespace 
utils.NameSpaceName) (int64, error)
 
        // SetCompactionThreshold sets the compactionThreshold for a namespace
@@ -262,10 +263,10 @@ type Namespaces interface {
        // SetCompactionThresholdWithContext sets the compactionThreshold for a 
namespace
        SetCompactionThresholdWithContext(ctx context.Context, namespace 
utils.NameSpaceName, threshold int64) error
 
-       // GetCompactionThreshold returns the compactionThreshold for a 
namespace
+       // GetCompactionThreshold returns the compactionThreshold for a 
namespace. Returns -1 if not set
        GetCompactionThreshold(namespace utils.NameSpaceName) (int64, error)
 
-       // GetCompactionThresholdWithContext returns the compactionThreshold 
for a namespace
+       // GetCompactionThresholdWithContext returns the compactionThreshold 
for a namespace. Returns -1 if not set
        GetCompactionThresholdWithContext(ctx context.Context, namespace 
utils.NameSpaceName) (int64, error)
 
        // SetMaxConsumersPerSubscription sets maxConsumersPerSubscription for 
a namespace.
@@ -276,10 +277,11 @@ type Namespaces interface {
        //nolint: revive // It's ok here to use a built-in function name (max)
        SetMaxConsumersPerSubscriptionWithContext(ctx context.Context, 
namespace utils.NameSpaceName, max int) error
 
-       // GetMaxConsumersPerSubscription returns the 
maxConsumersPerSubscription for a namespace.
+       // GetMaxConsumersPerSubscription returns the 
maxConsumersPerSubscription for a namespace. Returns -1 if not set
        GetMaxConsumersPerSubscription(namespace utils.NameSpaceName) (int, 
error)
 
        // GetMaxConsumersPerSubscriptionWithContext returns the 
maxConsumersPerSubscription for a namespace.
+       // Returns -1 if not set
        GetMaxConsumersPerSubscriptionWithContext(ctx context.Context, 
namespace utils.NameSpaceName) (int, error)
 
        // SetMaxConsumersPerTopic sets maxConsumersPerTopic for a namespace.
@@ -290,10 +292,10 @@ type Namespaces interface {
        //nolint: revive // It's ok here to use a built-in function name (max)
        SetMaxConsumersPerTopicWithContext(ctx context.Context, namespace 
utils.NameSpaceName, max int) error
 
-       // GetMaxConsumersPerTopic returns the maxProducersPerTopic for a 
namespace.
+       // GetMaxConsumersPerTopic returns the maxConsumersPerTopic for a 
namespace. Returns -1 if not set
        GetMaxConsumersPerTopic(namespace utils.NameSpaceName) (int, error)
 
-       // GetMaxConsumersPerTopicWithContext returns the maxProducersPerTopic 
for a namespace.
+       // GetMaxConsumersPerTopicWithContext returns the maxConsumersPerTopic 
for a namespace. Returns -1 if not set
        GetMaxConsumersPerTopicWithContext(ctx context.Context, namespace 
utils.NameSpaceName) (int, error)
 
        // SetMaxProducersPerTopic sets maxProducersPerTopic for a namespace.
@@ -304,10 +306,10 @@ type Namespaces interface {
        //nolint: revive // It's ok here to use a built-in function name (max)
        SetMaxProducersPerTopicWithContext(ctx context.Context, namespace 
utils.NameSpaceName, max int) error
 
-       // GetMaxProducersPerTopic returns the maxProducersPerTopic for a 
namespace.
+       // GetMaxProducersPerTopic returns the maxProducersPerTopic for a 
namespace. Returns -1 if not set
        GetMaxProducersPerTopic(namespace utils.NameSpaceName) (int, error)
 
-       // GetMaxProducersPerTopicWithContext returns the maxProducersPerTopic 
for a namespace.
+       // GetMaxProducersPerTopicWithContext returns the maxProducersPerTopic 
for a namespace. Returns -1 if not set
        GetMaxProducersPerTopicWithContext(ctx context.Context, namespace 
utils.NameSpaceName) (int, error)
 
        // SetMaxTopicsPerNamespace sets maxTopicsPerNamespace for a namespace.
@@ -851,7 +853,7 @@ func (n *namespaces) GetNamespaceMessageTTL(namespace 
string) (int, error) {
 }
 
 func (n *namespaces) GetNamespaceMessageTTLWithContext(ctx context.Context, 
namespace string) (int, error) {
-       var ttl int
+       var ttl = -1
        nsName, err := utils.GetNamespaceName(namespace)
        if err != nil {
                return 0, err
@@ -1114,7 +1116,7 @@ func (n *namespaces) GetOffloadDeleteLag(namespace 
utils.NameSpaceName) (int64,
 }
 
 func (n *namespaces) GetOffloadDeleteLagWithContext(ctx context.Context, 
namespace utils.NameSpaceName) (int64, error) {
-       var result int64
+       var result int64 = -1
        endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), 
"offloadDeletionLagMs")
        err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
        return result, err
@@ -1143,7 +1145,7 @@ func (n *namespaces) 
GetMaxConsumersPerSubscriptionWithContext(
        ctx context.Context,
        namespace utils.NameSpaceName,
 ) (int, error) {
-       var result int
+       var result = -1
        endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), 
"maxConsumersPerSubscription")
        err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
        return result, err
@@ -1167,7 +1169,7 @@ func (n *namespaces) GetOffloadThreshold(namespace 
utils.NameSpaceName) (int64,
 }
 
 func (n *namespaces) GetOffloadThresholdWithContext(ctx context.Context, 
namespace utils.NameSpaceName) (int64, error) {
-       var result int64
+       var result int64 = -1
        endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), 
"offloadThreshold")
        err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
        return result, err
@@ -1194,7 +1196,7 @@ func (n *namespaces) 
GetOffloadThresholdInSecondsWithContext(
        ctx context.Context,
        namespace utils.NameSpaceName,
 ) (int64, error) {
-       var result int64
+       var result int64 = -1
        endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), 
"offloadThresholdInSeconds")
        err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
        return result, err
@@ -1223,7 +1225,7 @@ func (n *namespaces) GetMaxConsumersPerTopicWithContext(
        ctx context.Context,
        namespace utils.NameSpaceName,
 ) (int, error) {
-       var result int
+       var result = -1
        endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), 
"maxConsumersPerTopic")
        err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
        return result, err
@@ -1250,7 +1252,7 @@ func (n *namespaces) GetCompactionThresholdWithContext(
        ctx context.Context,
        namespace utils.NameSpaceName,
 ) (int64, error) {
-       var result int64
+       var result int64 = -1
        endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), 
"compactionThreshold")
        err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
        return result, err
@@ -1279,7 +1281,7 @@ func (n *namespaces) GetMaxProducersPerTopicWithContext(
        ctx context.Context,
        namespace utils.NameSpaceName,
 ) (int, error) {
-       var result int
+       var result = -1
        endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), 
"maxProducersPerTopic")
        err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
        return result, err
@@ -2014,7 +2016,7 @@ func (n *namespaces) GetMaxTopicsPerNamespaceWithContext(
        ctx context.Context,
        namespace utils.NameSpaceName,
 ) (int, error) {
-       var result int
+       var result int // This method does not require a sentinel value of -1 
since the API never returns empty
        endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), 
"maxTopicsPerNamespace")
        err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
        return result, err
diff --git a/pulsaradmin/pkg/admin/namespace_test.go 
b/pulsaradmin/pkg/admin/namespace_test.go
index 6600cfaf..cfa44951 100644
--- a/pulsaradmin/pkg/admin/namespace_test.go
+++ b/pulsaradmin/pkg/admin/namespace_test.go
@@ -333,7 +333,11 @@ func TestNamespaces_GetOffloadThresholdInSeconds(t 
*testing.T) {
 
        namespace, _ := utils.GetNamespaceName("public/default")
 
-       // set the subscription expiration time and get it
+       // Get default (should be -1)
+       threshold, err := admin.Namespaces().GetOffloadThreshold(*namespace)
+       assert.NoError(t, err)
+       assert.Equal(t, int64(-1), threshold)
+
        err = admin.Namespaces().SetOffloadThresholdInSeconds(*namespace,
                60)
        assert.Equal(t, nil, err)
@@ -599,3 +603,163 @@ func TestNamespaces_GetMaxTopicsPerNamespace(t 
*testing.T) {
        expected = 0
        assert.Equal(t, expected, maxTopics)
 }
+
+func TestNamespaces_MessageTTL(t *testing.T) {
+       config := &config.Config{}
+       admin, err := New(config)
+       require.NoError(t, err)
+       require.NotNil(t, admin)
+
+       namespace, _ := utils.GetNamespaceName("public/default")
+
+       // Get default (should be -1)
+       ttl, err := 
admin.Namespaces().GetNamespaceMessageTTL(namespace.String())
+       assert.NoError(t, err)
+       assert.Equal(t, -1, ttl)
+
+       // Set to 0 explicitly
+       err = admin.Namespaces().SetNamespaceMessageTTL(namespace.String(), 0)
+       assert.NoError(t, err)
+
+       // Verify returns 0
+       ttl, err = admin.Namespaces().GetNamespaceMessageTTL(namespace.String())
+       assert.NoError(t, err)
+       assert.Equal(t, 0, ttl)
+
+       // Set to positive value
+       err = admin.Namespaces().SetNamespaceMessageTTL(namespace.String(), 
3600)
+       assert.NoError(t, err)
+
+       // Verify returns value
+       ttl, err = admin.Namespaces().GetNamespaceMessageTTL(namespace.String())
+       assert.NoError(t, err)
+       assert.Equal(t, 3600, ttl)
+}
+
+func TestNamespaces_OffloadDeleteLag(t *testing.T) {
+       config := &config.Config{}
+       admin, err := New(config)
+       require.NoError(t, err)
+       require.NotNil(t, admin)
+
+       namespace, _ := utils.GetNamespaceName("public/default")
+
+       // Get default (should be -1)
+       lag, err := admin.Namespaces().GetOffloadDeleteLag(*namespace)
+       assert.NoError(t, err)
+       assert.Equal(t, int64(-1), lag)
+
+       // Set to 0 explicitly
+       err = admin.Namespaces().SetOffloadDeleteLag(*namespace, 0)
+       assert.NoError(t, err)
+
+       // Verify returns 0
+       lag, err = admin.Namespaces().GetOffloadDeleteLag(*namespace)
+       assert.NoError(t, err)
+       assert.Equal(t, int64(0), lag)
+
+       // Set to positive value
+       err = admin.Namespaces().SetOffloadDeleteLag(*namespace, 1000)
+       assert.NoError(t, err)
+
+       // Verify returns value
+       lag, err = admin.Namespaces().GetOffloadDeleteLag(*namespace)
+       assert.NoError(t, err)
+       assert.Equal(t, int64(1000), lag)
+}
+
+func TestNamespaces_MaxConsumersPerTopic(t *testing.T) {
+       config := &config.Config{}
+       admin, err := New(config)
+       require.NoError(t, err)
+       require.NotNil(t, admin)
+
+       namespace, _ := utils.GetNamespaceName("public/default")
+
+       // Get default (should be -1)
+       maxConsumers, err := 
admin.Namespaces().GetMaxConsumersPerTopic(*namespace)
+       assert.NoError(t, err)
+       assert.Equal(t, -1, maxConsumers)
+
+       // Set to 0 explicitly
+       err = admin.Namespaces().SetMaxConsumersPerTopic(*namespace, 0)
+       assert.NoError(t, err)
+
+       // Verify returns 0
+       maxConsumers, err = 
admin.Namespaces().GetMaxConsumersPerTopic(*namespace)
+       assert.NoError(t, err)
+       assert.Equal(t, 0, maxConsumers)
+
+       // Set to positive value
+       err = admin.Namespaces().SetMaxConsumersPerTopic(*namespace, 100)
+       assert.NoError(t, err)
+
+       // Verify returns value
+       maxConsumers, err = 
admin.Namespaces().GetMaxConsumersPerTopic(*namespace)
+       assert.NoError(t, err)
+       assert.Equal(t, 100, maxConsumers)
+}
+
+func TestNamespaces_CompactionThreshold(t *testing.T) {
+       config := &config.Config{}
+       admin, err := New(config)
+       require.NoError(t, err)
+       require.NotNil(t, admin)
+
+       namespace, _ := utils.GetNamespaceName("public/default")
+
+       // Get default (should be -1)
+       threshold, err := admin.Namespaces().GetCompactionThreshold(*namespace)
+       assert.NoError(t, err)
+       assert.Equal(t, int64(-1), threshold)
+
+       // Set to 0 explicitly
+       err = admin.Namespaces().SetCompactionThreshold(*namespace, 0)
+       assert.NoError(t, err)
+
+       // Verify returns 0
+       threshold, err = admin.Namespaces().GetCompactionThreshold(*namespace)
+       assert.NoError(t, err)
+       assert.Equal(t, int64(0), threshold)
+
+       // Set to positive value
+       err = admin.Namespaces().SetCompactionThreshold(*namespace, 1024*1024) 
// 1MB
+       assert.NoError(t, err)
+
+       // Verify returns value
+       threshold, err = admin.Namespaces().GetCompactionThreshold(*namespace)
+       assert.NoError(t, err)
+       assert.Equal(t, int64(1024*1024), threshold)
+}
+
+func TestNamespaces_MaxProducersPerTopic(t *testing.T) {
+       config := &config.Config{}
+       admin, err := New(config)
+       require.NoError(t, err)
+       require.NotNil(t, admin)
+
+       namespace, _ := utils.GetNamespaceName("public/default")
+
+       // Get default (should be -1)
+       maxProducers, err := 
admin.Namespaces().GetMaxProducersPerTopic(*namespace)
+       assert.NoError(t, err)
+       assert.Equal(t, -1, maxProducers)
+
+       // Set to 0 explicitly
+       err = admin.Namespaces().SetMaxProducersPerTopic(*namespace, 0)
+       assert.NoError(t, err)
+
+       // Verify returns 0
+       maxProducers, err = 
admin.Namespaces().GetMaxProducersPerTopic(*namespace)
+       assert.NoError(t, err)
+       assert.Equal(t, 0, maxProducers)
+
+       // Set to positive value
+       err = admin.Namespaces().SetMaxProducersPerTopic(*namespace, 50)
+       assert.NoError(t, err)
+
+       // Verify returns value
+       maxProducers, err = 
admin.Namespaces().GetMaxProducersPerTopic(*namespace)
+       assert.NoError(t, err)
+       assert.Equal(t, 50, maxProducers)
+}
diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go
index 2e9e155f..37bbe94a 100644
--- a/pulsaradmin/pkg/admin/topic.go
+++ b/pulsaradmin/pkg/admin/topic.go
@@ -390,10 +390,10 @@ type Topics interface {
        // CompactStatusWithContext checks the status of an ongoing compaction 
for a topic
        CompactStatusWithContext(context.Context, utils.TopicName) 
(utils.LongRunningProcessStatus, error)
 
-       // GetMessageTTL returns the message TTL for a topic
+       // GetMessageTTL returns the message TTL for a topic. Returns -1 if not 
set
        GetMessageTTL(utils.TopicName) (int, error)
 
-       // GetMessageTTLWithContext returns the message TTL for a topic
+       // GetMessageTTLWithContext returns the message TTL for a topic. 
Returns -1 if not set
        GetMessageTTLWithContext(context.Context, utils.TopicName) (int, error)
 
        // SetMessageTTL sets the message TTL for a topic
@@ -420,10 +420,10 @@ type Topics interface {
        // RemoveMessageTTLWithContext removes the message TTL for a topic
        RemoveMessageTTLWithContext(context.Context, utils.TopicName) error
 
-       // GetMaxProducers Get max number of producers for a topic
+       // GetMaxProducers Get max number of producers for a topic. Returns -1 
if not set
        GetMaxProducers(utils.TopicName) (int, error)
 
-       // GetMaxProducersWithContext Get max number of producers for a topic
+       // GetMaxProducersWithContext Get max number of producers for a topic. 
Returns -1 if not set
        GetMaxProducersWithContext(context.Context, utils.TopicName) (int, 
error)
 
        // SetMaxProducers sets max number of producers for a topic
@@ -450,10 +450,10 @@ type Topics interface {
        // RemoveMaxProducersWithContext removes max number of producers for a 
topic
        RemoveMaxProducersWithContext(context.Context, utils.TopicName) error
 
-       // GetMaxConsumers returns max number of consumers for a topic
+       // GetMaxConsumers returns max number of consumers for a topic. Returns 
-1 if not set
        GetMaxConsumers(utils.TopicName) (int, error)
 
-       // GetMaxConsumersWithContext returns max number of consumers for a 
topic
+       // GetMaxConsumersWithContext returns max number of consumers for a 
topic. Returns -1 if not set
        GetMaxConsumersWithContext(context.Context, utils.TopicName) (int, 
error)
 
        // SetMaxConsumers sets max number of consumers for a topic
@@ -480,10 +480,11 @@ type Topics interface {
        // RemoveMaxConsumersWithContext removes max number of consumers for a 
topic
        RemoveMaxConsumersWithContext(context.Context, utils.TopicName) error
 
-       // GetMaxUnackMessagesPerConsumer returns max unacked messages policy 
on consumer for a topic
+       // GetMaxUnackMessagesPerConsumer returns max unacked messages policy 
on consumer for a topic. Returns -1 if not set
        GetMaxUnackMessagesPerConsumer(utils.TopicName) (int, error)
 
-       // GetMaxUnackMessagesPerConsumerWithContext returns max unacked 
messages policy on consumer for a topic
+       // GetMaxUnackMessagesPerConsumerWithContext returns max unacked 
messages policy on consumer for a topic.
+       // Returns -1 if not set
        GetMaxUnackMessagesPerConsumerWithContext(context.Context, 
utils.TopicName) (int, error)
 
        // SetMaxUnackMessagesPerConsumer sets max unacked messages policy on 
consumer for a topic
@@ -510,10 +511,12 @@ type Topics interface {
        // RemoveMaxUnackMessagesPerConsumerWithContext removes max unacked 
messages policy on consumer for a topic
        RemoveMaxUnackMessagesPerConsumerWithContext(context.Context, 
utils.TopicName) error
 
-       // GetMaxUnackMessagesPerSubscription returns max unacked messages 
policy on subscription for a topic
+       // GetMaxUnackMessagesPerSubscription returns max unacked messages 
policy on subscription for a topic.
+       // Returns -1 if not set
        GetMaxUnackMessagesPerSubscription(utils.TopicName) (int, error)
 
-       // GetMaxUnackMessagesPerSubscriptionWithContext returns max unacked 
messages policy on subscription for a topic
+       // GetMaxUnackMessagesPerSubscriptionWithContext returns max unacked 
messages policy on subscription for a topic.
+       // Returns -1 if not set
        GetMaxUnackMessagesPerSubscriptionWithContext(context.Context, 
utils.TopicName) (int, error)
 
        // SetMaxUnackMessagesPerSubscription sets max unacked messages policy 
on subscription for a topic
@@ -674,7 +677,7 @@ type Topics interface {
        // SetRetentionWithContext sets the retention policy for a topic
        SetRetentionWithContext(context.Context, utils.TopicName, 
utils.RetentionPolicies) error
 
-       // GetCompactionThreshold returns the compaction threshold for a topic.
+       // GetCompactionThreshold returns the compaction threshold for a topic. 
Returns -1 if not set
        //
        // i.e. The maximum number of bytes can have before compaction is 
triggered.
        //
@@ -685,7 +688,7 @@ type Topics interface {
        //        in namespace or broker level, if no policy set in topic level
        GetCompactionThreshold(topic utils.TopicName, applied bool) (int64, 
error)
 
-       // GetCompactionThresholdWithContext returns the compaction threshold 
for a topic.
+       // GetCompactionThresholdWithContext returns the compaction threshold 
for a topic. Returns -1 if not set
        //
        // i.e. The maximum number of bytes can have before compaction is 
triggered.
        //
@@ -854,10 +857,10 @@ type Topics interface {
        // RemoveSubscriptionDispatchRateWithContext removes subscription 
dispatch rate for a topic
        RemoveSubscriptionDispatchRateWithContext(context.Context, 
utils.TopicName) error
 
-       // GetMaxConsumersPerSubscription returns max consumers per 
subscription for a topic
+       // GetMaxConsumersPerSubscription returns max consumers per 
subscription for a topic. Returns -1 if not set
        GetMaxConsumersPerSubscription(utils.TopicName) (int, error)
 
-       // GetMaxConsumersPerSubscriptionWithContext returns max consumers per 
subscription for a topic
+       // GetMaxConsumersPerSubscriptionWithContext returns max consumers per 
subscription for a topic. Returns -1 if not set
        GetMaxConsumersPerSubscriptionWithContext(context.Context, 
utils.TopicName) (int, error)
 
        // SetMaxConsumersPerSubscription sets max consumers per subscription 
for a topic
@@ -872,10 +875,10 @@ type Topics interface {
        // RemoveMaxConsumersPerSubscriptionWithContext removes max consumers 
per subscription for a topic
        RemoveMaxConsumersPerSubscriptionWithContext(context.Context, 
utils.TopicName) error
 
-       // GetMaxMessageSize returns max message size for a topic
+       // GetMaxMessageSize returns max message size for a topic. Returns -1 
if not set
        GetMaxMessageSize(utils.TopicName) (int, error)
 
-       // GetMaxMessageSizeWithContext returns max message size for a topic
+       // GetMaxMessageSizeWithContext returns max message size for a topic. 
Returns -1 if not set
        GetMaxMessageSizeWithContext(context.Context, utils.TopicName) (int, 
error)
 
        // SetMaxMessageSize sets max message size for a topic
@@ -890,10 +893,10 @@ type Topics interface {
        // RemoveMaxMessageSizeWithContext removes max message size for a topic
        RemoveMaxMessageSizeWithContext(context.Context, utils.TopicName) error
 
-       // GetMaxSubscriptionsPerTopic returns max subscriptions per topic
+       // GetMaxSubscriptionsPerTopic returns max subscriptions per topic. 
Returns -1 if not set
        GetMaxSubscriptionsPerTopic(utils.TopicName) (int, error)
 
-       // GetMaxSubscriptionsPerTopicWithContext returns max subscriptions per 
topic
+       // GetMaxSubscriptionsPerTopicWithContext returns max subscriptions per 
topic. Returns -1 if not set
        GetMaxSubscriptionsPerTopicWithContext(context.Context, 
utils.TopicName) (int, error)
 
        // SetMaxSubscriptionsPerTopic sets max subscriptions per topic
@@ -926,10 +929,11 @@ type Topics interface {
        // RemoveSchemaValidationEnforcedWithContext removes schema validation 
enforced flag for a topic
        RemoveSchemaValidationEnforcedWithContext(context.Context, 
utils.TopicName) error
 
-       // GetDeduplicationSnapshotInterval returns deduplication snapshot 
interval for a topic
+       // GetDeduplicationSnapshotInterval returns deduplication snapshot 
interval for a topic. Returns -1 if not set
        GetDeduplicationSnapshotInterval(utils.TopicName) (int, error)
 
-       // GetDeduplicationSnapshotIntervalWithContext returns deduplication 
snapshot interval for a topic
+       // GetDeduplicationSnapshotIntervalWithContext returns deduplication 
snapshot interval for a topic.
+       // Returns -1 if not set
        GetDeduplicationSnapshotIntervalWithContext(context.Context, 
utils.TopicName) (int, error)
 
        // SetDeduplicationSnapshotInterval sets deduplication snapshot 
interval for a topic
@@ -1457,7 +1461,7 @@ func (t *topics) GetMessageTTL(topic utils.TopicName) 
(int, error) {
 }
 
 func (t *topics) GetMessageTTLWithContext(ctx context.Context, topic 
utils.TopicName) (int, error) {
-       var ttl int
+       var ttl = -1
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"messageTTL")
        err := t.pulsar.Client.GetWithContext(ctx, endpoint, &ttl)
        return ttl, err
@@ -1492,7 +1496,7 @@ func (t *topics) GetMaxProducers(topic utils.TopicName) 
(int, error) {
 }
 
 func (t *topics) GetMaxProducersWithContext(ctx context.Context, topic 
utils.TopicName) (int, error) {
-       var maxProducers int
+       var maxProducers = -1
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"maxProducers")
        err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxProducers)
        return maxProducers, err
@@ -1523,7 +1527,7 @@ func (t *topics) GetMaxConsumers(topic utils.TopicName) 
(int, error) {
 }
 
 func (t *topics) GetMaxConsumersWithContext(ctx context.Context, topic 
utils.TopicName) (int, error) {
-       var maxConsumers int
+       var maxConsumers = -1
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"maxConsumers")
        err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxConsumers)
        return maxConsumers, err
@@ -1554,7 +1558,7 @@ func (t *topics) GetMaxUnackMessagesPerConsumer(topic 
utils.TopicName) (int, err
 }
 
 func (t *topics) GetMaxUnackMessagesPerConsumerWithContext(ctx 
context.Context, topic utils.TopicName) (int, error) {
-       var maxNum int
+       var maxNum = -1
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"maxUnackedMessagesOnConsumer")
        err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxNum)
        return maxNum, err
@@ -1590,7 +1594,7 @@ func (t *topics) 
GetMaxUnackMessagesPerSubscriptionWithContext(
        ctx context.Context,
        topic utils.TopicName,
 ) (int, error) {
-       var maxNum int
+       var maxNum = -1
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"maxUnackedMessagesOnSubscription")
        err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxNum)
        return maxNum, err
@@ -1833,7 +1837,7 @@ func (t *topics) GetCompactionThresholdWithContext(
        topic utils.TopicName,
        applied bool,
 ) (int64, error) {
-       var threshold int64
+       var threshold int64 = -1
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"compactionThreshold")
        _, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, 
&threshold, map[string]string{
                "applied": strconv.FormatBool(applied),
@@ -2042,7 +2046,7 @@ func (t *topics) GetMaxConsumersPerSubscription(topic 
utils.TopicName) (int, err
 }
 
 func (t *topics) GetMaxConsumersPerSubscriptionWithContext(ctx 
context.Context, topic utils.TopicName) (int, error) {
-       var maxConsumers int
+       var maxConsumers = -1
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"maxConsumersPerSubscription")
        err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxConsumers)
        return maxConsumers, err
@@ -2075,7 +2079,7 @@ func (t *topics) GetMaxMessageSize(topic utils.TopicName) 
(int, error) {
 }
 
 func (t *topics) GetMaxMessageSizeWithContext(ctx context.Context, topic 
utils.TopicName) (int, error) {
-       var maxMessageSize int
+       var maxMessageSize = -1
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"maxMessageSize")
        err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxMessageSize)
        return maxMessageSize, err
@@ -2104,7 +2108,7 @@ func (t *topics) GetMaxSubscriptionsPerTopic(topic 
utils.TopicName) (int, error)
 }
 
 func (t *topics) GetMaxSubscriptionsPerTopicWithContext(ctx context.Context, 
topic utils.TopicName) (int, error) {
-       var maxSubscriptions int
+       var maxSubscriptions = -1
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"maxSubscriptionsPerTopic")
        err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxSubscriptions)
        return maxSubscriptions, err
@@ -2170,7 +2174,7 @@ func (t *topics) GetDeduplicationSnapshotInterval(topic 
utils.TopicName) (int, e
 }
 
 func (t *topics) GetDeduplicationSnapshotIntervalWithContext(ctx 
context.Context, topic utils.TopicName) (int, error) {
-       var interval int
+       var interval = -1
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"deduplicationSnapshotInterval")
        err := t.pulsar.Client.GetWithContext(ctx, endpoint, &interval)
        return interval, err
diff --git a/pulsaradmin/pkg/admin/topic_test.go 
b/pulsaradmin/pkg/admin/topic_test.go
index 9abf8027..8493cf76 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -477,7 +477,7 @@ func TestMessageTTL(t *testing.T) {
 
        messageTTL, err := admin.Topics().GetMessageTTL(*topicName)
        assert.NoError(t, err)
-       assert.Equal(t, 0, messageTTL)
+       assert.Equal(t, -1, messageTTL)
        err = admin.Topics().SetMessageTTL(*topicName, 600)
        assert.NoError(t, err)
        //      topic policy is an async operation,
@@ -497,7 +497,7 @@ func TestMessageTTL(t *testing.T) {
                t,
                func() bool {
                        messageTTL, err = 
admin.Topics().GetMessageTTL(*topicName)
-                       return err == nil && messageTTL == 0
+                       return err == nil && messageTTL == -1
                },
                10*time.Second,
                100*time.Millisecond,
@@ -690,7 +690,7 @@ func TestMaxConsumersPerSubscription(t *testing.T) {
        // Get default max consumers per subscription
        maxConsumers, err := 
admin.Topics().GetMaxConsumersPerSubscription(*topicName)
        assert.NoError(t, err)
-       assert.Equal(t, 0, maxConsumers)
+       assert.Equal(t, -1, maxConsumers)
 
        // Set new max consumers per subscription
        err = admin.Topics().SetMaxConsumersPerSubscription(*topicName, 10)
@@ -715,7 +715,7 @@ func TestMaxConsumersPerSubscription(t *testing.T) {
                t,
                func() bool {
                        maxConsumers, err = 
admin.Topics().GetMaxConsumersPerSubscription(*topicName)
-                       return err == nil && maxConsumers == 0
+                       return err == nil && maxConsumers == -1
                },
                10*time.Second,
                100*time.Millisecond,
@@ -737,7 +737,7 @@ func TestMaxMessageSize(t *testing.T) {
        // Get default max message size
        maxMessageSize, err := admin.Topics().GetMaxMessageSize(*topicName)
        assert.NoError(t, err)
-       assert.Equal(t, 0, maxMessageSize)
+       assert.Equal(t, -1, maxMessageSize)
 
        // Set new max message size (1MB)
        err = admin.Topics().SetMaxMessageSize(*topicName, 1048576)
@@ -762,7 +762,7 @@ func TestMaxMessageSize(t *testing.T) {
                t,
                func() bool {
                        maxMessageSize, err = 
admin.Topics().GetMaxMessageSize(*topicName)
-                       return err == nil && maxMessageSize == 0
+                       return err == nil && maxMessageSize == -1
                },
                10*time.Second,
                100*time.Millisecond,
@@ -784,7 +784,7 @@ func TestMaxSubscriptionsPerTopic(t *testing.T) {
        // Get default max subscriptions per topic
        maxSubscriptions, err := 
admin.Topics().GetMaxSubscriptionsPerTopic(*topicName)
        assert.NoError(t, err)
-       assert.Equal(t, 0, maxSubscriptions)
+       assert.Equal(t, -1, maxSubscriptions)
 
        // Set new max subscriptions per topic
        err = admin.Topics().SetMaxSubscriptionsPerTopic(*topicName, 100)
@@ -809,7 +809,7 @@ func TestMaxSubscriptionsPerTopic(t *testing.T) {
                t,
                func() bool {
                        maxSubscriptions, err = 
admin.Topics().GetMaxSubscriptionsPerTopic(*topicName)
-                       return err == nil && maxSubscriptions == 0
+                       return err == nil && maxSubscriptions == -1
                },
                10*time.Second,
                100*time.Millisecond,
@@ -899,7 +899,7 @@ func TestDeduplicationSnapshotInterval(t *testing.T) {
        // Get default deduplication snapshot interval
        interval, err := 
admin.Topics().GetDeduplicationSnapshotInterval(*topicName)
        assert.NoError(t, err)
-       assert.Equal(t, 0, interval)
+       assert.Equal(t, -1, interval)
 
        // Set new deduplication snapshot interval
        err = admin.Topics().SetDeduplicationSnapshotInterval(*topicName, 1000)
@@ -924,7 +924,7 @@ func TestDeduplicationSnapshotInterval(t *testing.T) {
                t,
                func() bool {
                        interval, err = 
admin.Topics().GetDeduplicationSnapshotInterval(*topicName)
-                       return err == nil && interval == 0
+                       return err == nil && interval == -1
                },
                10*time.Second,
                100*time.Millisecond,
@@ -1161,3 +1161,211 @@ func TestSchemaCompatibilityStrategy(t *testing.T) {
                100*time.Millisecond,
        )
 }
+
+func TestTopics_MaxProducers(t *testing.T) {
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*topicName, 4)
+       assert.NoError(t, err)
+
+       // No maxProducers policy has been set on the new topic yet: expect the -1 "unset" sentinel
+       maxProducers, err := admin.Topics().GetMaxProducers(*topicName)
+       assert.NoError(t, err)
+       assert.Equal(t, -1, maxProducers)
+
+       // Set to 0 explicitly (unlimited); an explicit 0 must stay distinguishable from the -1 default
+       err = admin.Topics().SetMaxProducers(*topicName, 0)
+       assert.NoError(t, err)
+
+       // Poll (10s timeout, 100ms tick) until the getter reports the explicit 0
+       assert.Eventually(
+               t,
+               func() bool {
+                       maxProducers, err = 
admin.Topics().GetMaxProducers(*topicName)
+                       return err == nil && maxProducers == 0
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+
+       // Overwrite the policy with a positive limit (10)
+       err = admin.Topics().SetMaxProducers(*topicName, 10)
+       assert.NoError(t, err)
+
+       // Poll until the getter reports the new limit of 10
+       assert.Eventually(
+               t,
+               func() bool {
+                       maxProducers, err = 
admin.Topics().GetMaxProducers(*topicName)
+                       return err == nil && maxProducers == 10
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+
+       // Remove the topic-level policy. NOTE(review): the value read back after removal is not asserted here
+       err = admin.Topics().RemoveMaxProducers(*topicName)
+       assert.NoError(t, err)
+}
+
+func TestTopics_MaxConsumers(t *testing.T) {
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*topicName, 4)
+       assert.NoError(t, err)
+
+       // No maxConsumers policy has been set on the new topic yet: expect the -1 "unset" sentinel
+       maxConsumers, err := admin.Topics().GetMaxConsumers(*topicName)
+       assert.NoError(t, err)
+       assert.Equal(t, -1, maxConsumers)
+
+       // Set to 0 explicitly; an explicit 0 must stay distinguishable from the -1 default
+       err = admin.Topics().SetMaxConsumers(*topicName, 0)
+       assert.NoError(t, err)
+
+       // Poll (10s timeout, 100ms tick) until the getter reports the explicit 0
+       assert.Eventually(
+               t,
+               func() bool {
+                       maxConsumers, err = 
admin.Topics().GetMaxConsumers(*topicName)
+                       return err == nil && maxConsumers == 0
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+
+       // Overwrite the policy with a positive limit (20)
+       err = admin.Topics().SetMaxConsumers(*topicName, 20)
+       assert.NoError(t, err)
+
+       // Poll until the getter reports the new limit of 20
+       assert.Eventually(
+               t,
+               func() bool {
+                       maxConsumers, err = 
admin.Topics().GetMaxConsumers(*topicName)
+                       return err == nil && maxConsumers == 20
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+
+       // Remove the topic-level policy. NOTE(review): the value read back after removal is not asserted here
+       err = admin.Topics().RemoveMaxConsumers(*topicName)
+       assert.NoError(t, err)
+}
+
+func TestTopics_MaxUnackMessagesPerConsumer(t *testing.T) {
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*topicName, 4)
+       assert.NoError(t, err)
+
+       // No per-consumer unacked-message limit has been set yet: expect the -1 "unset" sentinel
+       maxUnack, err := 
admin.Topics().GetMaxUnackMessagesPerConsumer(*topicName)
+       assert.NoError(t, err)
+       assert.Equal(t, -1, maxUnack)
+
+       // Set to 0 explicitly; an explicit 0 must stay distinguishable from the -1 default
+       err = admin.Topics().SetMaxUnackMessagesPerConsumer(*topicName, 0)
+       assert.NoError(t, err)
+
+       // Poll (10s timeout, 100ms tick) until the getter reports the explicit 0
+       assert.Eventually(
+               t,
+               func() bool {
+                       maxUnack, err = 
admin.Topics().GetMaxUnackMessagesPerConsumer(*topicName)
+                       return err == nil && maxUnack == 0
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+
+       // Overwrite the policy with a positive limit (1000)
+       err = admin.Topics().SetMaxUnackMessagesPerConsumer(*topicName, 1000)
+       assert.NoError(t, err)
+
+       // Poll until the getter reports the new limit of 1000
+       assert.Eventually(
+               t,
+               func() bool {
+                       maxUnack, err = 
admin.Topics().GetMaxUnackMessagesPerConsumer(*topicName)
+                       return err == nil && maxUnack == 1000
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+
+       // Remove the topic-level policy. NOTE(review): the value read back after removal is not asserted here
+       err = admin.Topics().RemoveMaxUnackMessagesPerConsumer(*topicName)
+       assert.NoError(t, err)
+}
+
+func TestTopics_MaxUnackMessagesPerSubscription(t *testing.T) {
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+       err = admin.Topics().Create(*topicName, 4)
+       assert.NoError(t, err)
+
+       // No per-subscription unacked-message limit has been set yet: expect the -1 "unset" sentinel
+       maxUnack, err := 
admin.Topics().GetMaxUnackMessagesPerSubscription(*topicName)
+       assert.NoError(t, err)
+       assert.Equal(t, -1, maxUnack)
+
+       // Set to 0 explicitly; an explicit 0 must stay distinguishable from the -1 default
+       err = admin.Topics().SetMaxUnackMessagesPerSubscription(*topicName, 0)
+       assert.NoError(t, err)
+
+       // Poll (10s timeout, 100ms tick) until the getter reports the explicit 0
+       assert.Eventually(
+               t,
+               func() bool {
+                       maxUnack, err = 
admin.Topics().GetMaxUnackMessagesPerSubscription(*topicName)
+                       return err == nil && maxUnack == 0
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+
+       // Overwrite the policy with a positive limit (5000)
+       err = admin.Topics().SetMaxUnackMessagesPerSubscription(*topicName, 
5000)
+       assert.NoError(t, err)
+
+       // Poll until the getter reports the new limit of 5000
+       assert.Eventually(
+               t,
+               func() bool {
+                       maxUnack, err = 
admin.Topics().GetMaxUnackMessagesPerSubscription(*topicName)
+                       return err == nil && maxUnack == 5000
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+
+       // Remove the topic-level policy. NOTE(review): the value read back after removal is not asserted here
+       err = admin.Topics().RemoveMaxUnackMessagesPerSubscription(*topicName)
+       assert.NoError(t, err)
+}


Reply via email to