This is an automated email from the ASF dual-hosted git repository.
rfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 7832a7da feat(topic): add support for retrieving applied schema
compatibility strategy (#1469)
7832a7da is described below
commit 7832a7da85158d86a2bf56f9a6eaf7eebe5bde50
Author: Rui Fu <[email protected]>
AuthorDate: Wed Mar 11 08:59:04 2026 +0800
feat(topic): add support for retrieving applied schema compatibility
strategy (#1469)
---
pulsaradmin/pkg/admin/topic.go | 65 +++++++++++++--
pulsaradmin/pkg/admin/topic_test.go | 155 +++++++++++++++++++++++++++++++-----
2 files changed, 194 insertions(+), 26 deletions(-)
diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go
index de195138..65f8c747 100644
--- a/pulsaradmin/pkg/admin/topic.go
+++ b/pulsaradmin/pkg/admin/topic.go
@@ -21,6 +21,7 @@ import (
"context"
"fmt"
"strconv"
+ "strings"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
@@ -1027,12 +1028,36 @@ type Topics interface {
// RemoveAutoSubscriptionCreationWithContext Remove auto subscription
creation override for a topic
RemoveAutoSubscriptionCreationWithContext(context.Context,
utils.TopicName) error
- // GetSchemaCompatibilityStrategy returns schema compatibility strategy
for a topic
+ // GetSchemaCompatibilityStrategy returns the applied schema
compatibility strategy for a topic.
GetSchemaCompatibilityStrategy(utils.TopicName)
(utils.SchemaCompatibilityStrategy, error)
- // GetSchemaCompatibilityStrategyWithContext returns schema
compatibility strategy for a topic
+ // GetSchemaCompatibilityStrategyWithContext returns the applied schema
compatibility strategy for a topic.
GetSchemaCompatibilityStrategyWithContext(context.Context,
utils.TopicName) (utils.SchemaCompatibilityStrategy, error)
+ // GetSchemaCompatibilityStrategyApplied returns schema compatibility
strategy for a topic.
+ //
+ // @param topic
+ // topicName struct
+ // @param applied
+ // when set to true, function will try to find policy applied to
this topic
+ // in namespace or broker level, if no policy set in topic level
+ GetSchemaCompatibilityStrategyApplied(utils.TopicName, bool)
(utils.SchemaCompatibilityStrategy, error)
+
+ // GetSchemaCompatibilityStrategyAppliedWithContext returns schema
compatibility strategy for a topic.
+ //
+ // @param ctx
+ // context used for the request
+ // @param topic
+ // topicName struct
+ // @param applied
+ // when set to true, function will try to find policy applied to
this topic
+ // in namespace or broker level, if no policy set in topic level
+ GetSchemaCompatibilityStrategyAppliedWithContext(
+ context.Context,
+ utils.TopicName,
+ bool,
+ ) (utils.SchemaCompatibilityStrategy, error)
+
// SetSchemaCompatibilityStrategy sets schema compatibility strategy
for a topic
SetSchemaCompatibilityStrategy(utils.TopicName,
utils.SchemaCompatibilityStrategy) error
@@ -2320,17 +2345,36 @@ func (t *topics)
RemoveAutoSubscriptionCreationWithContext(ctx context.Context,
}
func (t *topics) GetSchemaCompatibilityStrategy(topic utils.TopicName)
(utils.SchemaCompatibilityStrategy, error) {
- return
t.GetSchemaCompatibilityStrategyWithContext(context.Background(), topic)
+ return t.GetSchemaCompatibilityStrategyApplied(topic, true)
}
func (t *topics) GetSchemaCompatibilityStrategyWithContext(
ctx context.Context,
topic utils.TopicName,
) (utils.SchemaCompatibilityStrategy, error) {
- var strategy utils.SchemaCompatibilityStrategy
+ return t.GetSchemaCompatibilityStrategyAppliedWithContext(ctx, topic,
true)
+}
+
+func (t *topics) GetSchemaCompatibilityStrategyApplied(
+ topic utils.TopicName,
+ applied bool,
+) (utils.SchemaCompatibilityStrategy, error) {
+ return
t.GetSchemaCompatibilityStrategyAppliedWithContext(context.Background(), topic,
applied)
+}
+
+func (t *topics) GetSchemaCompatibilityStrategyAppliedWithContext(
+ ctx context.Context,
+ topic utils.TopicName,
+ applied bool,
+) (utils.SchemaCompatibilityStrategy, error) {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(),
"schemaCompatibilityStrategy")
- err := t.pulsar.Client.GetWithContext(ctx, endpoint, &strategy)
- return strategy, err
+ body, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx,
endpoint, nil, map[string]string{
+ "applied": strconv.FormatBool(applied),
+ }, false)
+ if err != nil {
+ return "", err
+ }
+ return parseTopicSchemaCompatibilityStrategy(body)
}
func (t *topics) SetSchemaCompatibilityStrategy(topic utils.TopicName,
@@ -2353,6 +2397,15 @@ func (t *topics)
RemoveSchemaCompatibilityStrategyWithContext(ctx context.Contex
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
+func parseTopicSchemaCompatibilityStrategy(body []byte)
(utils.SchemaCompatibilityStrategy, error) {
+ raw := strings.ReplaceAll(string(body), "\"", "")
+ if raw == "" {
+ return utils.SchemaCompatibilityStrategyUndefined, nil
+ }
+
+ return utils.ParseSchemaCompatibilityStrategy(raw)
+}
+
func (t *topics) GetOffloadPolicies(topic utils.TopicName)
(*utils.OffloadPolicies, error) {
return t.GetOffloadPoliciesWithContext(context.Background(), topic)
}
diff --git a/pulsaradmin/pkg/admin/topic_test.go
b/pulsaradmin/pkg/admin/topic_test.go
index 5655930d..ae850105 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -326,6 +326,65 @@ func newTopicName() string {
return fmt.Sprintf("my-topic-%v", time.Now().Nanosecond())
}
+func assertTopicSchemaCompatibilityStrategyEventually(
+ t *testing.T,
+ getter func() (utils.SchemaCompatibilityStrategy, error),
+ expected utils.SchemaCompatibilityStrategy,
+) {
+ assert.Eventually(
+ t,
+ func() bool {
+ strategy, err := getter()
+ return err == nil && strategy == expected
+ },
+ 10*time.Second,
+ 100*time.Millisecond,
+ )
+}
+
+func TestParseTopicSchemaCompatibilityStrategy(t *testing.T) {
+ testCases := []struct {
+ name string
+ body []byte
+ expected utils.SchemaCompatibilityStrategy
+ wantErr bool
+ }{
+ {
+ name: "empty body maps to undefined",
+ body: nil,
+ expected: utils.SchemaCompatibilityStrategyUndefined,
+ },
+ {
+ name: "empty string maps to undefined",
+ body: []byte(`""`),
+ expected: utils.SchemaCompatibilityStrategyUndefined,
+ },
+ {
+ name: "valid strategy is parsed",
+ body: []byte(`"FULL"`),
+ expected: utils.SchemaCompatibilityStrategyFull,
+ },
+ {
+ name: "invalid strategy returns error",
+ body: []byte(`"NOT_A_STRATEGY"`),
+ wantErr: true,
+ },
+ }
+
+ for _, tt := range testCases {
+ t.Run(tt.name, func(t *testing.T) {
+ strategy, err :=
parseTopicSchemaCompatibilityStrategy(tt.body)
+ if tt.wantErr {
+ assert.Error(t, err)
+ return
+ }
+
+ assert.NoError(t, err)
+ assert.Equal(t, tt.expected, strategy)
+ })
+ }
+}
+
func TestDeleteNonPartitionedTopic(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
@@ -1089,50 +1148,106 @@ func TestAutoSubscriptionCreation(t *testing.T) {
func TestSchemaCompatibilityStrategy(t *testing.T) {
randomName := newTopicName()
- topic := "persistent://public/default/" + randomName
+ namespace := "public/" + randomName + "-ns"
+ topic := "persistent://" + namespace + "/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
+
+ err = admin.Namespaces().CreateNamespace(namespace)
+ assert.NoError(t, err)
+
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
- err = admin.Topics().Create(*topicName, 4)
+ namespaceName, err := utils.GetNamespaceName(namespace)
assert.NoError(t, err)
- // Get default schema compatibility strategy (adapt to actual server
behavior)
- initialStrategy, err :=
admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
+ t.Cleanup(func() {
+ _ = admin.Topics().Delete(*topicName, true, false)
+ _ = admin.Namespaces().DeleteNamespace(namespace)
+ })
+
+ err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)
- // Server may return empty string instead of "UNDEFINED"
- // Set new schema compatibility strategy
- err = admin.Topics().SetSchemaCompatibilityStrategy(*topicName,
utils.SchemaCompatibilityStrategyBackward)
+ err = admin.Namespaces().SetSchemaCompatibilityStrategy(*namespaceName,
utils.SchemaCompatibilityStrategyFull)
assert.NoError(t, err)
- // topic policy is an async operation,
- // so we need to wait for a while to get current value
assert.Eventually(
t,
func() bool {
- strategy, err :=
admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
- return err == nil &&
- strategy ==
utils.SchemaCompatibilityStrategyBackward
+ strategy, err :=
admin.Namespaces().GetSchemaCompatibilityStrategy(*namespaceName)
+ return err == nil && strategy ==
utils.SchemaCompatibilityStrategyFull
},
10*time.Second,
100*time.Millisecond,
)
- // Remove schema compatibility strategy policy
+ assertTopicSchemaCompatibilityStrategyEventually(
+ t,
+ func() (utils.SchemaCompatibilityStrategy, error) {
+ return
admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
+ },
+ utils.SchemaCompatibilityStrategyFull,
+ )
+
+ assertTopicSchemaCompatibilityStrategyEventually(
+ t,
+ func() (utils.SchemaCompatibilityStrategy, error) {
+ return
admin.Topics().GetSchemaCompatibilityStrategyApplied(*topicName, true)
+ },
+ utils.SchemaCompatibilityStrategyFull,
+ )
+
+ assertTopicSchemaCompatibilityStrategyEventually(
+ t,
+ func() (utils.SchemaCompatibilityStrategy, error) {
+ return
admin.Topics().GetSchemaCompatibilityStrategyAppliedWithContext(
+ context.Background(),
+ *topicName,
+ false,
+ )
+ },
+ utils.SchemaCompatibilityStrategyUndefined,
+ )
+
+ err = admin.Topics().SetSchemaCompatibilityStrategy(*topicName,
utils.SchemaCompatibilityStrategyBackward)
+ assert.NoError(t, err)
+
+ assertTopicSchemaCompatibilityStrategyEventually(
+ t,
+ func() (utils.SchemaCompatibilityStrategy, error) {
+ return
admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
+ },
+ utils.SchemaCompatibilityStrategyBackward,
+ )
+
+ assertTopicSchemaCompatibilityStrategyEventually(
+ t,
+ func() (utils.SchemaCompatibilityStrategy, error) {
+ return
admin.Topics().GetSchemaCompatibilityStrategyApplied(*topicName, false)
+ },
+ utils.SchemaCompatibilityStrategyBackward,
+ )
+
err = admin.Topics().RemoveSchemaCompatibilityStrategy(*topicName)
assert.NoError(t, err)
- assert.Eventually(
+
+ assertTopicSchemaCompatibilityStrategyEventually(
t,
- func() bool {
- strategy, err :=
admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
- return err == nil &&
- strategy == initialStrategy
+ func() (utils.SchemaCompatibilityStrategy, error) {
+ return
admin.Topics().GetSchemaCompatibilityStrategy(*topicName)
},
- 10*time.Second,
- 100*time.Millisecond,
+ utils.SchemaCompatibilityStrategyFull,
+ )
+
+ assertTopicSchemaCompatibilityStrategyEventually(
+ t,
+ func() (utils.SchemaCompatibilityStrategy, error) {
+ return
admin.Topics().GetSchemaCompatibilityStrategyApplied(*topicName, false)
+ },
+ utils.SchemaCompatibilityStrategyUndefined,
)
}