This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 49fce726 [Improve] Supplement schema admin api (#1215)
49fce726 is described below

commit 49fce72614c37fd8644c6305eb3405fcf48d4f52
Author: crossoverJie <[email protected]>
AuthorDate: Fri May 10 22:08:57 2024 +0800

    [Improve] Supplement schema admin api (#1215)
    
    ### Motivation
    To keep consistent with the Java client.
    
    ### Modifications
    - CreateSchemaBySchemaInfo
    - GetVersionBySchemaInfo
    - GetVersionByPayload
    - TestCompatibilityWithSchemaInfo
    - TestCompatibilityWithPostSchemaPayload
---
 pulsaradmin/pkg/admin/schema.go      | 59 ++++++++++++++++++++++++++++++++++++
 pulsaradmin/pkg/admin/schema_test.go | 45 +++++++++++++++++++++++++++
 pulsaradmin/pkg/utils/schema_util.go | 23 ++++++++++++++
 3 files changed, 127 insertions(+)

diff --git a/pulsaradmin/pkg/admin/schema.go b/pulsaradmin/pkg/admin/schema.go
index d97cd7ad..7190bd99 100644
--- a/pulsaradmin/pkg/admin/schema.go
+++ b/pulsaradmin/pkg/admin/schema.go
@@ -43,6 +43,22 @@ type Schema interface {
 
        // CreateSchemaByPayload creates a schema for a given <tt>topic</tt>
        CreateSchemaByPayload(topic string, schemaPayload utils.PostSchemaPayload) error
+
+       // CreateSchemaBySchemaInfo creates a schema for a given <tt>topic</tt>
+       CreateSchemaBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) error
+
+       // GetVersionBySchemaInfo gets the version of a schema
+       GetVersionBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) (int64, error)
+
+       // GetVersionByPayload gets the version of a schema
+       GetVersionByPayload(topic string, schemaPayload utils.PostSchemaPayload) (int64, error)
+
+       // TestCompatibilityWithSchemaInfo tests compatibility with a schema
+       TestCompatibilityWithSchemaInfo(topic string, schemaInfo utils.SchemaInfo) (*utils.IsCompatibility, error)
+
+       // TestCompatibilityWithPostSchemaPayload tests compatibility with a schema
+       TestCompatibilityWithPostSchemaPayload(topic string,
+               schemaPayload utils.PostSchemaPayload) (*utils.IsCompatibility, error)
 }
 
 type schemas struct {
@@ -148,3 +164,46 @@ func (s *schemas) CreateSchemaByPayload(topic string, schemaPayload utils.PostSc
 
        return s.pulsar.Client.Post(endpoint, &schemaPayload)
 }
+
+func (s *schemas) CreateSchemaBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) error {
+       schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo)
+       return s.CreateSchemaByPayload(topic, schemaPayload)
+}
+
+func (s *schemas) GetVersionBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) (int64, error) {
+       schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo)
+       return s.GetVersionByPayload(topic, schemaPayload)
+}
+
+func (s *schemas) GetVersionByPayload(topic string, schemaPayload utils.PostSchemaPayload) (int64, error) {
+       topicName, err := utils.GetTopicName(topic)
+       if err != nil {
+               return 0, err
+       }
+       version := struct {
+               Version int64 `json:"version"`
+       }{}
+       endpoint := s.pulsar.endpoint(s.basePath, topicName.GetTenant(), topicName.GetNamespace(),
+               topicName.GetLocalName(), "version")
+       err = s.pulsar.Client.PostWithObj(endpoint, &schemaPayload, &version)
+       return version.Version, err
+}
+
+func (s *schemas) TestCompatibilityWithSchemaInfo(topic string,
+       schemaInfo utils.SchemaInfo) (*utils.IsCompatibility, error) {
+       schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo)
+       return s.TestCompatibilityWithPostSchemaPayload(topic, schemaPayload)
+}
+
+func (s *schemas) TestCompatibilityWithPostSchemaPayload(topic string,
+       schemaPayload utils.PostSchemaPayload) (*utils.IsCompatibility, error) {
+       topicName, err := utils.GetTopicName(topic)
+       if err != nil {
+               return nil, err
+       }
+       var isCompatibility utils.IsCompatibility
+       endpoint := s.pulsar.endpoint(s.basePath, topicName.GetTenant(), topicName.GetNamespace(),
+               topicName.GetLocalName(), "compatibility")
+       err = s.pulsar.Client.PostWithObj(endpoint, &schemaPayload, &isCompatibility)
+       return &isCompatibility, err
+}
diff --git a/pulsaradmin/pkg/admin/schema_test.go b/pulsaradmin/pkg/admin/schema_test.go
index 17c1a54d..3560559e 100644
--- a/pulsaradmin/pkg/admin/schema_test.go
+++ b/pulsaradmin/pkg/admin/schema_test.go
@@ -77,3 +77,48 @@ func TestSchemas_ForceDeleteSchema(t *testing.T) {
        assert.Errorf(t, err, "Schema not found")
 
 }
+
+func TestSchemas_CreateSchemaBySchemaInfo(t *testing.T) {
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+
+       schemaInfo := utils.SchemaInfo{
+               Schema: []byte(""),
+               Type:   "STRING",
+       }
+       topic := fmt.Sprintf("my-topic-%v", time.Now().Nanosecond())
+       err = admin.Schemas().CreateSchemaBySchemaInfo(topic, schemaInfo)
+       assert.NoError(t, err)
+
+       info, err := admin.Schemas().GetSchemaInfo(topic)
+       assert.NoError(t, err)
+       assert.Equal(t, schemaInfo.Type, info.Type)
+
+       version, err := admin.Schemas().GetVersionBySchemaInfo(topic, schemaInfo)
+       assert.NoError(t, err)
+       assert.Equal(t, version, int64(0))
+
+       schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo)
+       version, err = admin.Schemas().GetVersionByPayload(topic, schemaPayload)
+       assert.NoError(t, err)
+       assert.Equal(t, version, int64(0))
+
+       compatibility, err := admin.Schemas().TestCompatibilityWithSchemaInfo(topic, schemaInfo)
+       assert.NoError(t, err)
+       assert.Equal(t, compatibility.IsCompatibility, true)
+       assert.Equal(t, compatibility.SchemaCompatibilityStrategy, utils.SchemaCompatibilityStrategy("FULL"))
+
+       compatibility, err = admin.Schemas().TestCompatibilityWithPostSchemaPayload(topic, schemaPayload)
+       assert.NoError(t, err)
+       assert.Equal(t, compatibility.IsCompatibility, true)
+       assert.Equal(t, compatibility.SchemaCompatibilityStrategy, utils.SchemaCompatibilityStrategy("FULL"))
+
+       err = admin.Schemas().ForceDeleteSchema(topic)
+       assert.NoError(t, err)
+
+       _, err = admin.Schemas().GetSchemaInfo(topic)
+       assert.Errorf(t, err, "Schema not found")
+
+}
diff --git a/pulsaradmin/pkg/utils/schema_util.go b/pulsaradmin/pkg/utils/schema_util.go
index 08aaf54a..3b836690 100644
--- a/pulsaradmin/pkg/utils/schema_util.go
+++ b/pulsaradmin/pkg/utils/schema_util.go
@@ -44,6 +44,11 @@ type GetSchemaResponse struct {
        Properties map[string]string `json:"properties"`
 }
 
+type IsCompatibility struct {
+       IsCompatibility             bool                        `json:"compatibility"`
+       SchemaCompatibilityStrategy SchemaCompatibilityStrategy `json:"schemaCompatibilityStrategy"`
+}
+
 func ConvertGetSchemaResponseToSchemaInfo(tn *TopicName, response GetSchemaResponse) *SchemaInfo {
        info := new(SchemaInfo)
        schema := make([]byte, 0, 10)
@@ -61,6 +66,24 @@ func ConvertGetSchemaResponseToSchemaInfo(tn *TopicName, response GetSchemaRespo
        return info
 }
 
+func ConvertSchemaDataToStringLegacy(schemaInfo SchemaInfo) string {
+       schema := schemaInfo.Schema
+       if schema == nil {
+               return ""
+       }
+       // TODO: KEY_VALUE
+       return string(schema)
+
+}
+
+func ConvertSchemaInfoToPostSchemaPayload(schemaInfo SchemaInfo) PostSchemaPayload {
+       return PostSchemaPayload{
+               SchemaType: schemaInfo.Type,
+               Schema:     ConvertSchemaDataToStringLegacy(schemaInfo),
+               Properties: schemaInfo.Properties,
+       }
+}
+
 func ConvertGetSchemaResponseToSchemaInfoWithVersion(tn *TopicName, response GetSchemaResponse) *SchemaInfoWithVersion {
        info := new(SchemaInfoWithVersion)
        info.SchemaInfo = ConvertGetSchemaResponseToSchemaInfo(tn, response)

Reply via email to