This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 12b966df [improve] Support http lookup getSchema interface (#1368) 12b966df is described below commit 12b966df593ad6455ff9fb7023dd4b831c492f53 Author: zhou zhuohan <843520...@qq.com> AuthorDate: Wed Jul 2 18:16:59 2025 +0800 [improve] Support http lookup getSchema interface (#1368) Master Issue: https://github.com/apache/pulsar/wiki/PIP-43%3A-producer-send-message-with-different-schema#changespart-1 Related PR: https://github.com/apache/pulsar-client-go/pull/611 ### Motivation The Pulsar Go SDK already supports multi-version schemas (added in the related PR above), but that PR did not implement the `GetSchema()` method for the HTTP lookup service. As a result, calling `msg.GetSchemaValue(v interface{}) error` fails when the client is created with an HTTP serviceUrl. Demo below: ``` func createClient() Client { // create client //lookupURL := "pulsar://localhost:6650" lookupURL := "http://localhost:8080" // change to http protocol serviceUrl client, err := NewClient(ClientOptions{ URL: lookupURL, }) if err != nil { log.Fatal(err) } return client } func TestBytesSchema(t *testing.T) { client := createClient() defer client.Close() topic := newTopicName() properties := make(map[string]string) properties["pulsar"] = "hello" producerSchemaBytes := NewBytesSchema(properties) producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, Schema: producerSchemaBytes, }) assert.NoError(t, err) _, err = producer.Send(context.Background(), &ProducerMessage{ Value: []byte(`{"key": "value"}`), }) require.NoError(t, err) producer.Close() // Create consumer consumerSchemaBytes := NewBytesSchema(nil) assert.NotNil(t, consumerSchemaBytes) consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: "sub-1", Schema: consumerSchemaBytes, SubscriptionInitialPosition: SubscriptionPositionEarliest, }) assert.Nil(t, err) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() // Receive 
first message var out1 []byte msg1, err := consumer.Receive(ctx) assert.NoError(t, err) err = msg1.GetSchemaValue(&out1) assert.NoError(t, err) assert.Equal(t, []byte(`{"key": "value"}`), out1) consumer.Ack(msg1) require.NoError(t, err) } ``` Test output: ``` /root/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.23.0.linux-amd64/bin/go tool test2json -t /root/.cache/JetBrains/GoLand2024.1/tmp/GoLand/___TestBytesSchema_in_github_com_apache_pulsar_client_go_pulsar.test -test.v -test.paniconexit0 -test.run ^\QTestBytesSchema\E$ === RUN TestBytesSchema time="2025-05-14T16:32:45+08:00" level=info msg="Connecting to broker" remote_addr="pulsar://localhost:6650" time="2025-05-14T16:32:45+08:00" level=info msg="TCP connection established" local_addr="127.0.0.1:36638" remote_addr="pulsar://localhost:6650" time="2025-05-14T16:32:45+08:00" level=info msg="Connection is ready" local_addr="127.0.0.1:36638" remote_addr="pulsar://localhost:6650" time="2025-05-14T16:32:45+08:00" level=info msg="Connected producer" cnx="127.0.0.1:36638 -> 127.0.0.1:6650" epoch=0 topic="persistent://public/default/my-topic-147368803" time="2025-05-14T16:32:45+08:00" level=info msg="Created producer" cnx="127.0.0.1:36638 -> 127.0.0.1:6650" producerID=1 producer_name=standalone-42-1 topic="persistent://public/default/my-topic-147368803" time="2025-05-14T16:32:45+08:00" level=info msg="Closing producer" producerID=1 producer_name=standalone-42-1 topic="persistent://public/default/my-topic-147368803" time="2025-05-14T16:32:45+08:00" level=info msg="Closed producer" producerID=1 producer_name=standalone-42-1 topic="persistent://public/default/my-topic-147368803" time="2025-05-14T16:32:45+08:00" level=info msg="Connected consumer" consumerID=1 name=yzway subscription=sub-1 topic="persistent://public/default/my-topic-147368803" time="2025-05-14T16:32:45+08:00" level=info msg="Created consumer" consumerID=1 name=yzway subscription=sub-1 topic="persistent://public/default/my-topic-147368803" schema_test.go:101: 
Error Trace: /data/code/dev/pulsar-client-go/pulsar/schema_test.go:101 Error: Received unexpected error: GetSchema is not supported by httpLookupService Test: TestBytesSchema schema_test.go:102: Error Trace: /data/code/dev/pulsar-client-go/pulsar/schema_test.go:102 Error: Not equal: expected: []byte{0x7b, 0x22, 0x6b, 0x65, 0x79, 0x22, 0x3a, 0x20, 0x22, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x7d} actual : []byte(nil) Diff: --- Expected +++ Actual @@ -1,4 +1,2 @@ -([]uint8) (len=16) { - 00000000 7b 22 6b 65 79 22 3a 20 22 76 61 6c 75 65 22 7d |{"key": "value"}| -} +([]uint8) <nil> Test: TestBytesSchema schema_test.go:104: Error Trace: /data/code/dev/pulsar-client-go/pulsar/schema_test.go:104 Error: Received unexpected error: GetSchema is not supported by httpLookupService Test: TestBytesSchema time="2025-05-14T16:32:45+08:00" level=info msg="Closing consumer=1" consumerID=1 name=yzway subscription=sub-1 topic="persistent://public/default/my-topic-147368803" time="2025-05-14T16:32:45+08:00" level=info msg="Closed consumer" consumerID=1 name=yzway subscription=sub-1 topic="persistent://public/default/my-topic-147368803" --- FAIL: TestBytesSchema (0.12s) Expected :[]byte{0x7b, 0x22, 0x6b, 0x65, 0x79, 0x22, 0x3a, 0x20, 0x22, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x7d} Actual :[]byte(nil) ``` ### Modifications - Update LookupService interface return type from `GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error)` to `GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error)` to support http lookup protocol in `lookup_service.go` - Support HTTPLookupService `GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error)` function in `lookup_service.go` - Add http lookup `GetSchema()` related test cases in schema_test.go --- pulsar/consumer_partition.go | 20 ++++---- pulsar/internal/lookup_service.go | 76 +++++++++++++++++++++++++--- pulsar/internal/schema.go | 67 ++++++++++++++++++++++++ pulsar/schema.go | 42 ++++++++------- 
pulsar/schema_test.go | 104 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 270 insertions(+), 39 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index fc2cb774..6a8cf83d 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -318,22 +318,22 @@ func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) { return schema, nil } - pbSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion) + // cache missed, try to use lookupService to find schema info + lookupSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion) if err != nil { return nil, err } - - if pbSchema == nil { - err = fmt.Errorf("schema not found for topic: [ %v ], schema version : [ %v ]", s.topic, schemaVersion) - return nil, err - } - - var properties = internal.ConvertToStringMap(pbSchema.Properties) - - schema, err = NewSchema(SchemaType(*pbSchema.Type), pbSchema.SchemaData, properties) + schema, err = NewSchema( + // lookupSchema.SchemaType is internal package SchemaType type, + // we need to cast it to pulsar.SchemaType as soon as we use it in current pulsar package + SchemaType(lookupSchema.SchemaType), + lookupSchema.Data, + lookupSchema.Properties, + ) if err != nil { return nil, err } + s.add(key, schema) return schema, nil } diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go index fdbf9f52..19e51b41 100644 --- a/pulsar/internal/lookup_service.go +++ b/pulsar/internal/lookup_service.go @@ -18,9 +18,11 @@ package internal import ( + "encoding/binary" "errors" "fmt" "net/url" + "strings" "google.golang.org/protobuf/proto" @@ -34,6 +36,13 @@ type LookupResult struct { PhysicalAddr *url.URL } +// LookupSchema return lookup schema result +type LookupSchema struct { + SchemaType SchemaType + Data []byte + Properties map[string]string +} + // GetTopicsOfNamespaceMode for CommandGetTopicsOfNamespace_Mode type GetTopicsOfNamespaceMode string @@ -62,7 
+71,7 @@ type LookupService interface { GetTopicsOfNamespace(namespace string, mode GetTopicsOfNamespaceMode) ([]string, error) // GetSchema returns schema for a given version. - GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) + GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error) GetBrokerAddress(brokerServiceURL string, proxyThroughServiceURL bool) (*LookupResult, error) @@ -97,7 +106,7 @@ func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResol } } -func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) { +func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error) { id := ls.rpcClient.NewRequestID() req := &pb.CommandGetSchema{ RequestId: proto.Uint64(id), @@ -106,12 +115,23 @@ func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) (schema * } res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_GET_SCHEMA, req) if err != nil { - return nil, err + return &LookupSchema{}, err } if res.Response.Error != nil { - return nil, errors.New(res.Response.GetError().String()) + return &LookupSchema{}, errors.New(res.Response.GetError().String()) + } + + // deserialize pbSchema and convert it to LookupSchema struct + pbSchema := res.Response.GetSchemaResponse.Schema + if pbSchema == nil { + err = fmt.Errorf("schema not found for topic: [ %v ], schema version : [ %v ]", topic, schemaVersion) + return &LookupSchema{}, err } - return res.Response.GetSchemaResponse.Schema, nil + return &LookupSchema{ + SchemaType: SchemaType(int(*pbSchema.Type)), + Data: pbSchema.SchemaData, + Properties: ConvertToStringMap(pbSchema.Properties), + }, nil } func (ls *lookupService) GetBrokerAddress(brokerServiceURL string, proxyThroughServiceURL bool) (*LookupResult, error) { @@ -273,6 +293,8 @@ const HTTPAdminServiceV1Format string = "/admin/%s/partitions" const HTTPAdminServiceV2Format string = "/admin/v2/%s/partitions" 
const HTTPTopicUnderNamespaceV1 string = "/admin/namespaces/%s/destinations?mode=%s" const HTTPTopicUnderNamespaceV2 string = "/admin/v2/namespaces/%s/topics?mode=%s" +const HTTPSchemaV2 string = "/admin/v2/schemas/%s/schema" +const HTTPSchemaWithVersionV2 string = "/admin/v2/schemas/%s/schema/%d" type httpLookupData struct { BrokerURL string `json:"brokerUrl"` @@ -289,6 +311,12 @@ type httpLookupService struct { metrics *Metrics } +type httpLookupSchema struct { + HTTPSchemaType string `json:"type"` + Data string `json:"data"` + Properties map[string]string `json:"properties"` +} + func (h *httpLookupService) GetBrokerAddress(brokerServiceURL string, _ bool) (*LookupResult, error) { logicalAddress, err := url.ParseRequestURI(brokerServiceURL) if err != nil { @@ -371,8 +399,42 @@ func (h *httpLookupService) GetTopicsOfNamespace(namespace string, mode GetTopic return topics, nil } -func (h *httpLookupService) GetSchema(_ string, _ []byte) (schema *pb.Schema, err error) { - return nil, errors.New("GetSchema is not supported by httpLookupService") +func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error) { + topicName, err := ParseTopicName(topic) + if err != nil { + return nil, err + } + topicRestPath := fmt.Sprintf("%s/%s", topicName.Namespace, topicName.Topic) + var path string + if schemaVersion != nil { + path = fmt.Sprintf(HTTPSchemaWithVersionV2, topicRestPath, int64(binary.BigEndian.Uint64(schemaVersion))) + } else { + path = fmt.Sprintf(HTTPSchemaV2, topicRestPath) + } + lookupSchema := &httpLookupSchema{} + if err := h.httpClient.Get(path, &lookupSchema, nil); err != nil { + if strings.HasPrefix(err.Error(), "Code: 404") { + err = fmt.Errorf("schema not found for topic: [ %v ], schema version : [ %v ]", topic, schemaVersion) + } + h.log.Errorf("schema [ %v ] request error, schema version : [ %v ]", topic, schemaVersion) + return &LookupSchema{}, err + } + + // deserialize httpSchema and convert it to LookupSchema 
struct + schemaType, exists := HTTPSchemaTypeMap[strings.ToUpper(lookupSchema.HTTPSchemaType)] + if !exists { + err = fmt.Errorf("unsupported schema type [%s] for topic: [ %v ], schema version : [ %v ]", + lookupSchema.HTTPSchemaType, + topic, + schemaVersion, + ) + return nil, err + } + return &LookupSchema{ + SchemaType: schemaType, + Data: []byte(lookupSchema.Data), + Properties: lookupSchema.Properties, + }, nil } func (h *httpLookupService) ServiceNameResolver() *ServiceNameResolver { diff --git a/pulsar/internal/schema.go b/pulsar/internal/schema.go new file mode 100644 index 00000000..4b9f84cb --- /dev/null +++ b/pulsar/internal/schema.go @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package internal + +// SchemaType We need to define a SchemaType in this internal package, to avoid directly importing pulsar.SchemaType. +// In case we might encounter importing cycle problem. +type SchemaType int + +const ( + NONE SchemaType = iota //No schema defined + STRING //Simple String encoding with UTF-8 + JSON //JSON object encoding and validation + PROTOBUF //Protobuf message encoding and decoding + AVRO //Serialize and deserialize via Avro + BOOLEAN // + INT8 //A 8-byte integer. 
+ INT16 //A 16-byte integer. + INT32 //A 32-byte integer. + INT64 //A 64-byte integer. + FLOAT //A float number. + DOUBLE //A double number + _ // + _ // + _ // + KeyValue //A Schema that contains Key Schema and Value Schema. + BYTES = 0 //A bytes array. + AUTO = -2 // + AutoConsume = -3 //Auto Consume Type. + AutoPublish = -4 //Auto Publish Type. + ProtoNative = 20 //Protobuf native message encoding and decoding +) + +var HTTPSchemaTypeMap = map[string]SchemaType{ + "NONE": BYTES, + "STRING": STRING, + "JSON": JSON, + "PROTOBUF": PROTOBUF, + "AVRO": AVRO, + "BOOLEAN": BOOLEAN, + "INT8": INT8, + "INT16": INT16, + "INT32": INT32, + "INT64": INT64, + "FLOAT": FLOAT, + "DOUBLE": DOUBLE, + "KEYVALUE": KeyValue, + "BYTES": BYTES, + "AUTO": AUTO, + "AUTOCONSUME": AutoConsume, + "AUTOPUBLISH": AutoPublish, + "PROTOBUF_NATIVE": ProtoNative, +} diff --git a/pulsar/schema.go b/pulsar/schema.go index 71136504..e56edf9f 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -26,6 +26,7 @@ import ( "sync" "unsafe" + "github.com/apache/pulsar-client-go/pulsar/internal" log "github.com/sirupsen/logrus" "github.com/hamba/avro/v2" @@ -35,30 +36,27 @@ import ( "google.golang.org/protobuf/types/descriptorpb" ) -type SchemaType int +type SchemaType internal.SchemaType const ( - NONE SchemaType = iota //No schema defined - STRING //Simple String encoding with UTF-8 - JSON //JSON object encoding and validation - PROTOBUF //Protobuf message encoding and decoding - AVRO //Serialize and deserialize via Avro - BOOLEAN // - INT8 //A 8-byte integer. - INT16 //A 16-byte integer. - INT32 //A 32-byte integer. - INT64 //A 64-byte integer. - FLOAT //A float number. - DOUBLE //A double number - _ // - _ // - _ // - KeyValue //A Schema that contains Key Schema and Value Schema. - BYTES = 0 //A bytes array. - AUTO = -2 // - AutoConsume = -3 //Auto Consume Type. - AutoPublish = -4 // Auto Publish Type. 
- ProtoNative = 20 //Protobuf native message encoding and decoding + NONE = SchemaType(internal.NONE) + STRING = SchemaType(internal.STRING) + JSON = SchemaType(internal.JSON) + PROTOBUF = SchemaType(internal.PROTOBUF) + AVRO = SchemaType(internal.AVRO) + BOOLEAN = SchemaType(internal.BOOLEAN) + INT8 = SchemaType(internal.INT8) + INT16 = SchemaType(internal.INT16) + INT32 = SchemaType(internal.INT32) + INT64 = SchemaType(internal.INT64) + FLOAT = SchemaType(internal.FLOAT) + DOUBLE = SchemaType(internal.DOUBLE) + KeyValue = SchemaType(internal.KeyValue) + BYTES = SchemaType(internal.BYTES) + AUTO = SchemaType(internal.AUTO) + AutoConsume = SchemaType(internal.AutoConsume) + AutoPublish = SchemaType(internal.AutoPublish) + ProtoNative = SchemaType(internal.ProtoNative) ) // Encapsulates data around the schema definition diff --git a/pulsar/schema_test.go b/pulsar/schema_test.go index 34216c47..37145a31 100644 --- a/pulsar/schema_test.go +++ b/pulsar/schema_test.go @@ -58,6 +58,18 @@ func createClient() Client { return client } +func createHTTPLookupClient() Client { + // create client + lookupURL := "http://localhost:8080" + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + if err != nil { + log.Fatal(err) + } + return client +} + func TestBytesSchema(t *testing.T) { client := createClient() defer client.Close() @@ -167,6 +179,54 @@ func TestJsonSchema(t *testing.T) { defer consumer.Close() } +func TestHTTPLookupJsonSchema(t *testing.T) { + client := createHTTPLookupClient() + defer client.Close() + + properties := make(map[string]string) + properties["pulsar"] = "hello" + jsonSchemaWithProperties := NewJSONSchema(exampleSchemaDef, properties) + producer1, err := client.CreateProducer(ProducerOptions{ + Topic: "httpLookupJsonTopic", + Schema: jsonSchemaWithProperties, + }) + assert.Nil(t, err) + + _, err = producer1.Send(context.Background(), &ProducerMessage{ + Value: &testJSON{ + ID: 100, + Name: "pulsar", + }, + }) + if err != nil { + 
log.Fatal(err) + } + producer1.Close() + + //create consumer + var s testJSON + + consumerJS, err := NewJSONSchemaWithValidation(exampleSchemaDef, nil) + assert.Nil(t, err) + assert.NotNil(t, consumerJS) + consumerJS = NewJSONSchema(exampleSchemaDef, nil) // test this legacy function + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: "httpLookupJsonTopic", + SubscriptionName: "sub-1", + Schema: consumerJS, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + msg, err := consumer.Receive(context.Background()) + assert.Nil(t, err) + err = msg.GetSchemaValue(&s) + assert.Nil(t, err) + assert.Equal(t, s.ID, 100) + assert.Equal(t, s.Name, "pulsar") + + defer consumer.Close() +} + func TestProtoSchema(t *testing.T) { client := createClient() defer client.Close() @@ -318,6 +378,50 @@ func TestAvroSchema(t *testing.T) { defer consumer.Close() } +func TestHTTPLookupAvroSchema(t *testing.T) { + client := createHTTPLookupClient() + defer client.Close() + + // create producer + asProducer, err := NewAvroSchemaWithValidation(exampleSchemaDef, nil) + assert.Nil(t, err) + assert.NotNil(t, asProducer) + asProducer = NewAvroSchema(exampleSchemaDef, nil) + producer, err := client.CreateProducer(ProducerOptions{ + Topic: "httpLookup-avro-topic", + Schema: asProducer, + }) + assert.Nil(t, err) + if _, err := producer.Send(context.Background(), &ProducerMessage{ + Value: testAvro{ + ID: 100, + Name: "pulsar", + }, + }); err != nil { + log.Fatal(err) + } + + //create consumer + unobj := testAvro{} + + asConsumer := NewAvroSchema(exampleSchemaDef, nil) + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: "httpLookup-avro-topic", + SubscriptionName: "sub-1", + Schema: asConsumer, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + + msg, err := consumer.Receive(context.Background()) + assert.Nil(t, err) + err = msg.GetSchemaValue(&unobj) + assert.Nil(t, err) + assert.Equal(t, unobj.ID, 100) + 
assert.Equal(t, unobj.Name, "pulsar") + defer consumer.Close() +} + func TestStringSchema(t *testing.T) { client := createClient() defer client.Close()