This is an automated email from the ASF dual-hosted git repository.

rfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 7832a7da feat(topic): add support for retrieving applied schema compatibility strategy (#1469)
7832a7da is described below

commit 7832a7da85158d86a2bf56f9a6eaf7eebe5bde50
Author: Rui Fu <[email protected]>
AuthorDate: Wed Mar 11 08:59:04 2026 +0800

    feat(topic): add support for retrieving applied schema compatibility strategy (#1469)
---
 pulsaradmin/pkg/admin/topic.go      |  65 +++++++++++++--
 pulsaradmin/pkg/admin/topic_test.go | 155 +++++++++++++++++++++++++++++++-----
 2 files changed, 194 insertions(+), 26 deletions(-)

diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go
index de195138..65f8c747 100644
--- a/pulsaradmin/pkg/admin/topic.go
+++ b/pulsaradmin/pkg/admin/topic.go
@@ -21,6 +21,7 @@ import (
        "context"
        "fmt"
        "strconv"
+       "strings"
 
        "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
 
@@ -1027,12 +1028,36 @@ type Topics interface {
        // RemoveAutoSubscriptionCreationWithContext Remove auto subscription 
creation override for a topic
        RemoveAutoSubscriptionCreationWithContext(context.Context, 
utils.TopicName) error
 
-       // GetSchemaCompatibilityStrategy returns schema compatibility strategy 
for a topic
+       // GetSchemaCompatibilityStrategy returns the applied schema 
compatibility strategy for a topic.
        GetSchemaCompatibilityStrategy(utils.TopicName) 
(utils.SchemaCompatibilityStrategy, error)
 
-       // GetSchemaCompatibilityStrategyWithContext returns schema 
compatibility strategy for a topic
+       // GetSchemaCompatibilityStrategyWithContext returns the applied schema 
compatibility strategy for a topic.
        GetSchemaCompatibilityStrategyWithContext(context.Context, 
utils.TopicName) (utils.SchemaCompatibilityStrategy, error)
 
+       // GetSchemaCompatibilityStrategyApplied returns schema compatibility 
strategy for a topic.
+       //
+       // @param topic
+       //        topicName struct
+       // @param applied
+       //        when set to true, function will try to find policy applied to 
this topic
+       //        in namespace or broker level, if no policy set in topic level
+       GetSchemaCompatibilityStrategyApplied(utils.TopicName, bool) 
(utils.SchemaCompatibilityStrategy, error)
+
+       // GetSchemaCompatibilityStrategyAppliedWithContext returns schema 
compatibility strategy for a topic.
+       //
+       // @param ctx
+       //        context used for the request
+       // @param topic
+       //        topicName struct
+       // @param applied
+       //        when set to true, function will try to find policy applied to 
this topic
+       //        in namespace or broker level, if no policy set in topic level
+       GetSchemaCompatibilityStrategyAppliedWithContext(
+               context.Context,
+               utils.TopicName,
+               bool,
+       ) (utils.SchemaCompatibilityStrategy, error)
+
        // SetSchemaCompatibilityStrategy sets schema compatibility strategy 
for a topic
        SetSchemaCompatibilityStrategy(utils.TopicName,
                utils.SchemaCompatibilityStrategy) error
@@ -2320,17 +2345,36 @@ func (t *topics) 
RemoveAutoSubscriptionCreationWithContext(ctx context.Context,
 }
 
 func (t *topics) GetSchemaCompatibilityStrategy(topic utils.TopicName) 
(utils.SchemaCompatibilityStrategy, error) {
-       return 
t.GetSchemaCompatibilityStrategyWithContext(context.Background(), topic)
+       return t.GetSchemaCompatibilityStrategyApplied(topic, true)
 }
 
 func (t *topics) GetSchemaCompatibilityStrategyWithContext(
        ctx context.Context,
        topic utils.TopicName,
 ) (utils.SchemaCompatibilityStrategy, error) {
-       var strategy utils.SchemaCompatibilityStrategy
+       return t.GetSchemaCompatibilityStrategyAppliedWithContext(ctx, topic, 
true)
+}
+
+func (t *topics) GetSchemaCompatibilityStrategyApplied(
+       topic utils.TopicName,
+       applied bool,
+) (utils.SchemaCompatibilityStrategy, error) {
+       return 
t.GetSchemaCompatibilityStrategyAppliedWithContext(context.Background(), topic, 
applied)
+}
+
+func (t *topics) GetSchemaCompatibilityStrategyAppliedWithContext(
+       ctx context.Context,
+       topic utils.TopicName,
+       applied bool,
+) (utils.SchemaCompatibilityStrategy, error) {
        endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), 
"schemaCompatibilityStrategy")
-       err := t.pulsar.Client.GetWithContext(ctx, endpoint, &strategy)
-       return strategy, err
+       body, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, 
endpoint, nil, map[string]string{
+               "applied": strconv.FormatBool(applied),
+       }, false)
+       if err != nil {
+               return "", err
+       }
+       return parseTopicSchemaCompatibilityStrategy(body)
 }
 
 func (t *topics) SetSchemaCompatibilityStrategy(topic utils.TopicName,
@@ -2353,6 +2397,15 @@ func (t *topics) 
RemoveSchemaCompatibilityStrategyWithContext(ctx context.Contex
        return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
 }
 
+func parseTopicSchemaCompatibilityStrategy(body []byte) 
(utils.SchemaCompatibilityStrategy, error) {
+       raw := strings.ReplaceAll(string(body), "\"", "")
+       if raw == "" {
+               return utils.SchemaCompatibilityStrategyUndefined, nil
+       }
+
+       return utils.ParseSchemaCompatibilityStrategy(raw)
+}
+
 func (t *topics) GetOffloadPolicies(topic utils.TopicName) 
(*utils.OffloadPolicies, error) {
        return t.GetOffloadPoliciesWithContext(context.Background(), topic)
 }
diff --git a/pulsaradmin/pkg/admin/topic_test.go 
b/pulsaradmin/pkg/admin/topic_test.go
index 5655930d..ae850105 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -326,6 +326,65 @@ func newTopicName() string {
        return fmt.Sprintf("my-topic-%v", time.Now().Nanosecond())
 }
 
+func assertTopicSchemaCompatibilityStrategyEventually(
+       t *testing.T,
+       getter func() (utils.SchemaCompatibilityStrategy, error),
+       expected utils.SchemaCompatibilityStrategy,
+) {
+       assert.Eventually(
+               t,
+               func() bool {
+                       strategy, err := getter()
+                       return err == nil && strategy == expected
+               },
+               10*time.Second,
+               100*time.Millisecond,
+       )
+}
+
+func TestParseTopicSchemaCompatibilityStrategy(t *testing.T) {
+       testCases := []struct {
+               name     string
+               body     []byte
+               expected utils.SchemaCompatibilityStrategy
+               wantErr  bool
+       }{
+               {
+                       name:     "empty body maps to undefined",
+                       body:     nil,
+                       expected: utils.SchemaCompatibilityStrategyUndefined,
+               },
+               {
+                       name:     "empty string maps to undefined",
+                       body:     []byte(`""`),
+                       expected: utils.SchemaCompatibilityStrategyUndefined,
+               },
+               {
+                       name:     "valid strategy is parsed",
+                       body:     []byte(`"FULL"`),
+                       expected: utils.SchemaCompatibilityStrategyFull,
+               },
+               {
+                       name:    "invalid strategy returns error",
+                       body:    []byte(`"NOT_A_STRATEGY"`),
+                       wantErr: true,
+               },
+       }
+
+       for _, tt := range testCases {
+               t.Run(tt.name, func(t *testing.T) {
+                       strategy, err := 
parseTopicSchemaCompatibilityStrategy(tt.body)
+                       if tt.wantErr {
+                               assert.Error(t, err)
+                               return
+                       }
+
+                       assert.NoError(t, err)
+                       assert.Equal(t, tt.expected, strategy)
+               })
+       }
+}
+
 func TestDeleteNonPartitionedTopic(t *testing.T) {
        randomName := newTopicName()
        topic := "persistent://public/default/" + randomName
@@ -1089,50 +1148,106 @@ func TestAutoSubscriptionCreation(t *testing.T) {
 
 func TestSchemaCompatibilityStrategy(t *testing.T) {
        randomName := newTopicName()
-       topic := "persistent://public/default/" + randomName
+       namespace := "public/" + randomName + "-ns"
+       topic := "persistent://" + namespace + "/" + randomName
        cfg := &config.Config{}
        admin, err := New(cfg)
        assert.NoError(t, err)
        assert.NotNil(t, admin)
+
+       err = admin.Namespaces().CreateNamespace(namespace)
+       assert.NoError(t, err)
+
        topicName, err := utils.GetTopicName(topic)
        assert.NoError(t, err)
-       err = admin.Topics().Create(*topicName, 4)
+       namespaceName, err := utils.GetNamespaceName(namespace)
        assert.NoError(t, err)
 
-       // Get default schema compatibility strategy (adapt to actual server 
behavior)
-       initialStrategy, err := 
admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
+       t.Cleanup(func() {
+               _ = admin.Topics().Delete(*topicName, true, false)
+               _ = admin.Namespaces().DeleteNamespace(namespace)
+       })
+
+       err = admin.Topics().Create(*topicName, 4)
        assert.NoError(t, err)
-       // Server may return empty string instead of "UNDEFINED"
 
-       // Set new schema compatibility strategy
-       err = admin.Topics().SetSchemaCompatibilityStrategy(*topicName, 
utils.SchemaCompatibilityStrategyBackward)
+       err = admin.Namespaces().SetSchemaCompatibilityStrategy(*namespaceName, 
utils.SchemaCompatibilityStrategyFull)
        assert.NoError(t, err)
 
-       // topic policy is an async operation,
-       // so we need to wait for a while to get current value
        assert.Eventually(
                t,
                func() bool {
-                       strategy, err := 
admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
-                       return err == nil &&
-                               strategy == 
utils.SchemaCompatibilityStrategyBackward
+                       strategy, err := 
admin.Namespaces().GetSchemaCompatibilityStrategy(*namespaceName)
+                       return err == nil && strategy == 
utils.SchemaCompatibilityStrategyFull
                },
                10*time.Second,
                100*time.Millisecond,
        )
 
-       // Remove schema compatibility strategy policy
+       assertTopicSchemaCompatibilityStrategyEventually(
+               t,
+               func() (utils.SchemaCompatibilityStrategy, error) {
+                       return 
admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
+               },
+               utils.SchemaCompatibilityStrategyFull,
+       )
+
+       assertTopicSchemaCompatibilityStrategyEventually(
+               t,
+               func() (utils.SchemaCompatibilityStrategy, error) {
+                       return 
admin.Topics().GetSchemaCompatibilityStrategyApplied(*topicName, true)
+               },
+               utils.SchemaCompatibilityStrategyFull,
+       )
+
+       assertTopicSchemaCompatibilityStrategyEventually(
+               t,
+               func() (utils.SchemaCompatibilityStrategy, error) {
+                       return 
admin.Topics().GetSchemaCompatibilityStrategyAppliedWithContext(
+                               context.Background(),
+                               *topicName,
+                               false,
+                       )
+               },
+               utils.SchemaCompatibilityStrategyUndefined,
+       )
+
+       err = admin.Topics().SetSchemaCompatibilityStrategy(*topicName, 
utils.SchemaCompatibilityStrategyBackward)
+       assert.NoError(t, err)
+
+       assertTopicSchemaCompatibilityStrategyEventually(
+               t,
+               func() (utils.SchemaCompatibilityStrategy, error) {
+                       return 
admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
+               },
+               utils.SchemaCompatibilityStrategyBackward,
+       )
+
+       assertTopicSchemaCompatibilityStrategyEventually(
+               t,
+               func() (utils.SchemaCompatibilityStrategy, error) {
+                       return 
admin.Topics().GetSchemaCompatibilityStrategyApplied(*topicName, false)
+               },
+               utils.SchemaCompatibilityStrategyBackward,
+       )
+
        err = admin.Topics().RemoveSchemaCompatibilityStrategy(*topicName)
        assert.NoError(t, err)
-       assert.Eventually(
+
+       assertTopicSchemaCompatibilityStrategyEventually(
                t,
-               func() bool {
-                       strategy, err := 
admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
-                       return err == nil &&
-                               strategy == initialStrategy
+               func() (utils.SchemaCompatibilityStrategy, error) {
+                       return 
admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
                },
-               10*time.Second,
-               100*time.Millisecond,
+               utils.SchemaCompatibilityStrategyFull,
+       )
+
+       assertTopicSchemaCompatibilityStrategyEventually(
+               t,
+               func() (utils.SchemaCompatibilityStrategy, error) {
+                       return 
admin.Topics().GetSchemaCompatibilityStrategyApplied(*topicName, false)
+               },
+               utils.SchemaCompatibilityStrategyUndefined,
        )
 }
 

Reply via email to