This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 12b966df [improve] Support http lookup getSchema interface (#1368)
12b966df is described below

commit 12b966df593ad6455ff9fb7023dd4b831c492f53
Author: zhou zhuohan <843520...@qq.com>
AuthorDate: Wed Jul 2 18:16:59 2025 +0800

    [improve] Support http lookup getSchema interface (#1368)
    
    Master Issue: 
https://github.com/apache/pulsar/wiki/PIP-43%3A-producer-send-message-with-different-schema#changespart-1
    
    Related pr https://github.com/apache/pulsar-client-go/pull/611
    
    ### Motivation
    Currently pulsar go sdk has supported multi-version schema in above pr, but 
the pr does not support `getSchema()` method with http lookup service. So that 
we will encounter error when we call `msg.GetSchemaValue(v interface{}) error` 
function with http serviceUrl. Demo below:
    ```
    func createClient() Client {
            // create client
            //lookupURL := "pulsar://localhost:6650"
            lookupURL := "http://localhost:8080";   // change to http protocol 
serviceUrl
            client, err := NewClient(ClientOptions{
                    URL: lookupURL,
            })
            if err != nil {
                    log.Fatal(err)
            }
            return client
    }
    
    func TestBytesSchema(t *testing.T) {
            client := createClient()
            defer client.Close()
    
            topic := newTopicName()
    
            properties := make(map[string]string)
            properties["pulsar"] = "hello"
            producerSchemaBytes := NewBytesSchema(properties)
            producer, err := client.CreateProducer(ProducerOptions{
                    Topic:  topic,
                    Schema: producerSchemaBytes,
            })
            assert.NoError(t, err)
    
            _, err = producer.Send(context.Background(), &ProducerMessage{
                    Value: []byte(`{"key": "value"}`),
            })
            require.NoError(t, err)
            producer.Close()
    
            // Create consumer
            consumerSchemaBytes := NewBytesSchema(nil)
            assert.NotNil(t, consumerSchemaBytes)
            consumer, err := client.Subscribe(ConsumerOptions{
                    Topic:                       topic,
                    SubscriptionName:            "sub-1",
                    Schema:                      consumerSchemaBytes,
                    SubscriptionInitialPosition: SubscriptionPositionEarliest,
            })
            assert.Nil(t, err)
    
            ctx, cancel := context.WithTimeout(context.Background(), 
time.Second*5)
            defer cancel()
    
            // Receive first message
            var out1 []byte
            msg1, err := consumer.Receive(ctx)
            assert.NoError(t, err)
            err = msg1.GetSchemaValue(&out1)
            assert.NoError(t, err)
            assert.Equal(t, []byte(`{"key": "value"}`), out1)
            consumer.Ack(msg1)
            require.NoError(t, err)
    }
    ```
    Test output:
    ```
    /root/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.23.0.linux-amd64/bin/go 
tool test2json -t 
/root/.cache/JetBrains/GoLand2024.1/tmp/GoLand/___TestBytesSchema_in_github_com_apache_pulsar_client_go_pulsar.test
 -test.v -test.paniconexit0 -test.run ^\QTestBytesSchema\E$
    === RUN   TestBytesSchema
    time="2025-05-14T16:32:45+08:00" level=info msg="Connecting to broker" 
remote_addr="pulsar://localhost:6650"
    time="2025-05-14T16:32:45+08:00" level=info msg="TCP connection 
established" local_addr="127.0.0.1:36638" remote_addr="pulsar://localhost:6650"
    time="2025-05-14T16:32:45+08:00" level=info msg="Connection is ready" 
local_addr="127.0.0.1:36638" remote_addr="pulsar://localhost:6650"
    time="2025-05-14T16:32:45+08:00" level=info msg="Connected producer" 
cnx="127.0.0.1:36638 -> 127.0.0.1:6650" epoch=0 
topic="persistent://public/default/my-topic-147368803"
    time="2025-05-14T16:32:45+08:00" level=info msg="Created producer" 
cnx="127.0.0.1:36638 -> 127.0.0.1:6650" producerID=1 
producer_name=standalone-42-1 
topic="persistent://public/default/my-topic-147368803"
    time="2025-05-14T16:32:45+08:00" level=info msg="Closing producer" 
producerID=1 producer_name=standalone-42-1 
topic="persistent://public/default/my-topic-147368803"
    time="2025-05-14T16:32:45+08:00" level=info msg="Closed producer" 
producerID=1 producer_name=standalone-42-1 
topic="persistent://public/default/my-topic-147368803"
    time="2025-05-14T16:32:45+08:00" level=info msg="Connected consumer" 
consumerID=1 name=yzway subscription=sub-1 
topic="persistent://public/default/my-topic-147368803"
    time="2025-05-14T16:32:45+08:00" level=info msg="Created consumer" 
consumerID=1 name=yzway subscription=sub-1 
topic="persistent://public/default/my-topic-147368803"
        schema_test.go:101:
                    Error Trace:    
/data/code/dev/pulsar-client-go/pulsar/schema_test.go:101
                    Error:          Received unexpected error:
                                    GetSchema is not supported by 
httpLookupService
                    Test:           TestBytesSchema
        schema_test.go:102:
                    Error Trace:    
/data/code/dev/pulsar-client-go/pulsar/schema_test.go:102
                    Error:          Not equal:
                                    expected: []byte{0x7b, 0x22, 0x6b, 0x65, 
0x79, 0x22, 0x3a, 0x20, 0x22, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x7d}
                                    actual  : []byte(nil)
    
                                    Diff:
                                    --- Expected
                                    +++ Actual
                                    @@ -1,4 +1,2 @@
                                    -([]uint8) (len=16) {
                                    - 00000000  7b 22 6b 65 79 22 3a 20  22 76 
61 6c 75 65 22 7d  |{"key": "value"}|
                                    -}
                                    +([]uint8) <nil>
    
                    Test:           TestBytesSchema
        schema_test.go:104:
                    Error Trace:    
/data/code/dev/pulsar-client-go/pulsar/schema_test.go:104
                    Error:          Received unexpected error:
                                    GetSchema is not supported by 
httpLookupService
                    Test:           TestBytesSchema
    time="2025-05-14T16:32:45+08:00" level=info msg="Closing consumer=1" 
consumerID=1 name=yzway subscription=sub-1 
topic="persistent://public/default/my-topic-147368803"
    time="2025-05-14T16:32:45+08:00" level=info msg="Closed consumer" 
consumerID=1 name=yzway subscription=sub-1 
topic="persistent://public/default/my-topic-147368803"
    --- FAIL: TestBytesSchema (0.12s)
    
    Expected :[]byte{0x7b, 0x22, 0x6b, 0x65, 0x79, 0x22, 0x3a, 0x20, 0x22, 
0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x7d}
    Actual   :[]byte(nil)
    ```
    
    ### Modifications
    - Update LookupService interface return type from `GetSchema(topic string, 
schemaVersion []byte) (schema *pb.Schema, err error)` to `GetSchema(topic 
string, schemaVersion []byte) (*LookupSchema, error)` to support http lookup 
protocol in `lookup_service.go`
    - Support HTTPLookupService `GetSchema(topic string, schemaVersion []byte) 
(*LookupSchema, error)` function in `lookup_service.go`
    - Add http lookup `GetSchema()` related test cases in schema_test.go
---
 pulsar/consumer_partition.go      |  20 ++++----
 pulsar/internal/lookup_service.go |  76 +++++++++++++++++++++++++---
 pulsar/internal/schema.go         |  67 ++++++++++++++++++++++++
 pulsar/schema.go                  |  42 ++++++++-------
 pulsar/schema_test.go             | 104 ++++++++++++++++++++++++++++++++++++++
 5 files changed, 270 insertions(+), 39 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index fc2cb774..6a8cf83d 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -318,22 +318,22 @@ func (s *schemaInfoCache) Get(schemaVersion []byte) 
(schema Schema, err error) {
                return schema, nil
        }
 
-       pbSchema, err := s.client.lookupService.GetSchema(s.topic, 
schemaVersion)
+       //      cache missed, try to use lookupService to find schema info
+       lookupSchema, err := s.client.lookupService.GetSchema(s.topic, 
schemaVersion)
        if err != nil {
                return nil, err
        }
-
-       if pbSchema == nil {
-               err = fmt.Errorf("schema not found for topic: [ %v ], schema 
version : [ %v ]", s.topic, schemaVersion)
-               return nil, err
-       }
-
-       var properties = internal.ConvertToStringMap(pbSchema.Properties)
-
-       schema, err = NewSchema(SchemaType(*pbSchema.Type), 
pbSchema.SchemaData, properties)
+       schema, err = NewSchema(
+               //      lookupSchema.SchemaType is internal package SchemaType 
type,
+               //      we need to cast it to pulsar.SchemaType as soon as we 
use it in current pulsar package
+               SchemaType(lookupSchema.SchemaType),
+               lookupSchema.Data,
+               lookupSchema.Properties,
+       )
        if err != nil {
                return nil, err
        }
+
        s.add(key, schema)
        return schema, nil
 }
diff --git a/pulsar/internal/lookup_service.go 
b/pulsar/internal/lookup_service.go
index fdbf9f52..19e51b41 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -18,9 +18,11 @@
 package internal
 
 import (
+       "encoding/binary"
        "errors"
        "fmt"
        "net/url"
+       "strings"
 
        "google.golang.org/protobuf/proto"
 
@@ -34,6 +36,13 @@ type LookupResult struct {
        PhysicalAddr *url.URL
 }
 
+// LookupSchema return lookup schema result
+type LookupSchema struct {
+       SchemaType SchemaType
+       Data       []byte
+       Properties map[string]string
+}
+
 // GetTopicsOfNamespaceMode for CommandGetTopicsOfNamespace_Mode
 type GetTopicsOfNamespaceMode string
 
@@ -62,7 +71,7 @@ type LookupService interface {
        GetTopicsOfNamespace(namespace string, mode GetTopicsOfNamespaceMode) 
([]string, error)
 
        // GetSchema returns schema for a given version.
-       GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err 
error)
+       GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error)
 
        GetBrokerAddress(brokerServiceURL string, proxyThroughServiceURL bool) 
(*LookupResult, error)
 
@@ -97,7 +106,7 @@ func NewLookupService(rpcClient RPCClient, serviceURL 
*url.URL, serviceNameResol
        }
 }
 
-func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) (schema 
*pb.Schema, err error) {
+func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) 
(*LookupSchema, error) {
        id := ls.rpcClient.NewRequestID()
        req := &pb.CommandGetSchema{
                RequestId:     proto.Uint64(id),
@@ -106,12 +115,23 @@ func (ls *lookupService) GetSchema(topic string, 
schemaVersion []byte) (schema *
        }
        res, err := ls.rpcClient.RequestToAnyBroker(id, 
pb.BaseCommand_GET_SCHEMA, req)
        if err != nil {
-               return nil, err
+               return &LookupSchema{}, err
        }
        if res.Response.Error != nil {
-               return nil, errors.New(res.Response.GetError().String())
+               return &LookupSchema{}, 
errors.New(res.Response.GetError().String())
+       }
+
+       //      deserialize pbSchema and convert it to LookupSchema struct
+       pbSchema := res.Response.GetSchemaResponse.Schema
+       if pbSchema == nil {
+               err = fmt.Errorf("schema not found for topic: [ %v ], schema 
version : [ %v ]", topic, schemaVersion)
+               return &LookupSchema{}, err
        }
-       return res.Response.GetSchemaResponse.Schema, nil
+       return &LookupSchema{
+               SchemaType: SchemaType(int(*pbSchema.Type)),
+               Data:       pbSchema.SchemaData,
+               Properties: ConvertToStringMap(pbSchema.Properties),
+       }, nil
 }
 
 func (ls *lookupService) GetBrokerAddress(brokerServiceURL string, 
proxyThroughServiceURL bool) (*LookupResult, error) {
@@ -273,6 +293,8 @@ const HTTPAdminServiceV1Format string = 
"/admin/%s/partitions"
 const HTTPAdminServiceV2Format string = "/admin/v2/%s/partitions"
 const HTTPTopicUnderNamespaceV1 string = 
"/admin/namespaces/%s/destinations?mode=%s"
 const HTTPTopicUnderNamespaceV2 string = 
"/admin/v2/namespaces/%s/topics?mode=%s"
+const HTTPSchemaV2 string = "/admin/v2/schemas/%s/schema"
+const HTTPSchemaWithVersionV2 string = "/admin/v2/schemas/%s/schema/%d"
 
 type httpLookupData struct {
        BrokerURL    string `json:"brokerUrl"`
@@ -289,6 +311,12 @@ type httpLookupService struct {
        metrics             *Metrics
 }
 
+type httpLookupSchema struct {
+       HTTPSchemaType string            `json:"type"`
+       Data           string            `json:"data"`
+       Properties     map[string]string `json:"properties"`
+}
+
 func (h *httpLookupService) GetBrokerAddress(brokerServiceURL string, _ bool) 
(*LookupResult, error) {
        logicalAddress, err := url.ParseRequestURI(brokerServiceURL)
        if err != nil {
@@ -371,8 +399,42 @@ func (h *httpLookupService) GetTopicsOfNamespace(namespace 
string, mode GetTopic
        return topics, nil
 }
 
-func (h *httpLookupService) GetSchema(_ string, _ []byte) (schema *pb.Schema, 
err error) {
-       return nil, errors.New("GetSchema is not supported by 
httpLookupService")
+func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte) 
(*LookupSchema, error) {
+       topicName, err := ParseTopicName(topic)
+       if err != nil {
+               return nil, err
+       }
+       topicRestPath := fmt.Sprintf("%s/%s", topicName.Namespace, 
topicName.Topic)
+       var path string
+       if schemaVersion != nil {
+               path = fmt.Sprintf(HTTPSchemaWithVersionV2, topicRestPath, 
int64(binary.BigEndian.Uint64(schemaVersion)))
+       } else {
+               path = fmt.Sprintf(HTTPSchemaV2, topicRestPath)
+       }
+       lookupSchema := &httpLookupSchema{}
+       if err := h.httpClient.Get(path, &lookupSchema, nil); err != nil {
+               if strings.HasPrefix(err.Error(), "Code: 404") {
+                       err = fmt.Errorf("schema not found for topic: [ %v ], 
schema version : [ %v ]", topic, schemaVersion)
+               }
+               h.log.Errorf("schema [ %v ] request error, schema version : [ 
%v ]", topic, schemaVersion)
+               return &LookupSchema{}, err
+       }
+
+       //      deserialize httpSchema and convert it to LookupSchema struct
+       schemaType, exists := 
HTTPSchemaTypeMap[strings.ToUpper(lookupSchema.HTTPSchemaType)]
+       if !exists {
+               err = fmt.Errorf("unsupported schema type [%s] for topic: [ %v 
], schema version : [ %v ]",
+                       lookupSchema.HTTPSchemaType,
+                       topic,
+                       schemaVersion,
+               )
+               return nil, err
+       }
+       return &LookupSchema{
+               SchemaType: schemaType,
+               Data:       []byte(lookupSchema.Data),
+               Properties: lookupSchema.Properties,
+       }, nil
 }
 
 func (h *httpLookupService) ServiceNameResolver() *ServiceNameResolver {
diff --git a/pulsar/internal/schema.go b/pulsar/internal/schema.go
new file mode 100644
index 00000000..4b9f84cb
--- /dev/null
+++ b/pulsar/internal/schema.go
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+// SchemaType We need to define a SchemaType in this internal package, to 
avoid directly importing pulsar.SchemaType.
+// In case we might encounter importing cycle problem.
+type SchemaType int
+
+const (
+       NONE        SchemaType = iota //No schema defined
+       STRING                        //Simple String encoding with UTF-8
+       JSON                          //JSON object encoding and validation
+       PROTOBUF                      //Protobuf message encoding and decoding
+       AVRO                          //Serialize and deserialize via Avro
+       BOOLEAN                       //
+       INT8                          //A 8-byte integer.
+       INT16                         //A 16-byte integer.
+       INT32                         //A 32-byte integer.
+       INT64                         //A 64-byte integer.
+       FLOAT                         //A float number.
+       DOUBLE                        //A double number
+       _                             //
+       _                             //
+       _                             //
+       KeyValue                      //A Schema that contains Key Schema and 
Value Schema.
+       BYTES       = 0               //A bytes array.
+       AUTO        = -2              //
+       AutoConsume = -3              //Auto Consume Type.
+       AutoPublish = -4              //Auto Publish Type.
+       ProtoNative = 20              //Protobuf native message encoding and 
decoding
+)
+
+var HTTPSchemaTypeMap = map[string]SchemaType{
+       "NONE":            BYTES,
+       "STRING":          STRING,
+       "JSON":            JSON,
+       "PROTOBUF":        PROTOBUF,
+       "AVRO":            AVRO,
+       "BOOLEAN":         BOOLEAN,
+       "INT8":            INT8,
+       "INT16":           INT16,
+       "INT32":           INT32,
+       "INT64":           INT64,
+       "FLOAT":           FLOAT,
+       "DOUBLE":          DOUBLE,
+       "KEYVALUE":        KeyValue,
+       "BYTES":           BYTES,
+       "AUTO":            AUTO,
+       "AUTOCONSUME":     AutoConsume,
+       "AUTOPUBLISH":     AutoPublish,
+       "PROTOBUF_NATIVE": ProtoNative,
+}
diff --git a/pulsar/schema.go b/pulsar/schema.go
index 71136504..e56edf9f 100644
--- a/pulsar/schema.go
+++ b/pulsar/schema.go
@@ -26,6 +26,7 @@ import (
        "sync"
        "unsafe"
 
+       "github.com/apache/pulsar-client-go/pulsar/internal"
        log "github.com/sirupsen/logrus"
 
        "github.com/hamba/avro/v2"
@@ -35,30 +36,27 @@ import (
        "google.golang.org/protobuf/types/descriptorpb"
 )
 
-type SchemaType int
+type SchemaType internal.SchemaType
 
 const (
-       NONE        SchemaType = iota //No schema defined
-       STRING                        //Simple String encoding with UTF-8
-       JSON                          //JSON object encoding and validation
-       PROTOBUF                      //Protobuf message encoding and decoding
-       AVRO                          //Serialize and deserialize via Avro
-       BOOLEAN                       //
-       INT8                          //A 8-byte integer.
-       INT16                         //A 16-byte integer.
-       INT32                         //A 32-byte integer.
-       INT64                         //A 64-byte integer.
-       FLOAT                         //A float number.
-       DOUBLE                        //A double number
-       _                             //
-       _                             //
-       _                             //
-       KeyValue                      //A Schema that contains Key Schema and 
Value Schema.
-       BYTES       = 0               //A bytes array.
-       AUTO        = -2              //
-       AutoConsume = -3              //Auto Consume Type.
-       AutoPublish = -4              // Auto Publish Type.
-       ProtoNative = 20              //Protobuf native message encoding and 
decoding
+       NONE        = SchemaType(internal.NONE)
+       STRING      = SchemaType(internal.STRING)
+       JSON        = SchemaType(internal.JSON)
+       PROTOBUF    = SchemaType(internal.PROTOBUF)
+       AVRO        = SchemaType(internal.AVRO)
+       BOOLEAN     = SchemaType(internal.BOOLEAN)
+       INT8        = SchemaType(internal.INT8)
+       INT16       = SchemaType(internal.INT16)
+       INT32       = SchemaType(internal.INT32)
+       INT64       = SchemaType(internal.INT64)
+       FLOAT       = SchemaType(internal.FLOAT)
+       DOUBLE      = SchemaType(internal.DOUBLE)
+       KeyValue    = SchemaType(internal.KeyValue)
+       BYTES       = SchemaType(internal.BYTES)
+       AUTO        = SchemaType(internal.AUTO)
+       AutoConsume = SchemaType(internal.AutoConsume)
+       AutoPublish = SchemaType(internal.AutoPublish)
+       ProtoNative = SchemaType(internal.ProtoNative)
 )
 
 // Encapsulates data around the schema definition
diff --git a/pulsar/schema_test.go b/pulsar/schema_test.go
index 34216c47..37145a31 100644
--- a/pulsar/schema_test.go
+++ b/pulsar/schema_test.go
@@ -58,6 +58,18 @@ func createClient() Client {
        return client
 }
 
+func createHTTPLookupClient() Client {
+       // create client
+       lookupURL := "http://localhost:8080";
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       if err != nil {
+               log.Fatal(err)
+       }
+       return client
+}
+
 func TestBytesSchema(t *testing.T) {
        client := createClient()
        defer client.Close()
@@ -167,6 +179,54 @@ func TestJsonSchema(t *testing.T) {
        defer consumer.Close()
 }
 
+func TestHTTPLookupJsonSchema(t *testing.T) {
+       client := createHTTPLookupClient()
+       defer client.Close()
+
+       properties := make(map[string]string)
+       properties["pulsar"] = "hello"
+       jsonSchemaWithProperties := NewJSONSchema(exampleSchemaDef, properties)
+       producer1, err := client.CreateProducer(ProducerOptions{
+               Topic:  "httpLookupJsonTopic",
+               Schema: jsonSchemaWithProperties,
+       })
+       assert.Nil(t, err)
+
+       _, err = producer1.Send(context.Background(), &ProducerMessage{
+               Value: &testJSON{
+                       ID:   100,
+                       Name: "pulsar",
+               },
+       })
+       if err != nil {
+               log.Fatal(err)
+       }
+       producer1.Close()
+
+       //create consumer
+       var s testJSON
+
+       consumerJS, err := NewJSONSchemaWithValidation(exampleSchemaDef, nil)
+       assert.Nil(t, err)
+       assert.NotNil(t, consumerJS)
+       consumerJS = NewJSONSchema(exampleSchemaDef, nil) // test this legacy 
function
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       "httpLookupJsonTopic",
+               SubscriptionName:            "sub-1",
+               Schema:                      consumerJS,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+       })
+       assert.Nil(t, err)
+       msg, err := consumer.Receive(context.Background())
+       assert.Nil(t, err)
+       err = msg.GetSchemaValue(&s)
+       assert.Nil(t, err)
+       assert.Equal(t, s.ID, 100)
+       assert.Equal(t, s.Name, "pulsar")
+
+       defer consumer.Close()
+}
+
 func TestProtoSchema(t *testing.T) {
        client := createClient()
        defer client.Close()
@@ -318,6 +378,50 @@ func TestAvroSchema(t *testing.T) {
        defer consumer.Close()
 }
 
+func TestHTTPLookupAvroSchema(t *testing.T) {
+       client := createHTTPLookupClient()
+       defer client.Close()
+
+       // create producer
+       asProducer, err := NewAvroSchemaWithValidation(exampleSchemaDef, nil)
+       assert.Nil(t, err)
+       assert.NotNil(t, asProducer)
+       asProducer = NewAvroSchema(exampleSchemaDef, nil)
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:  "httpLookup-avro-topic",
+               Schema: asProducer,
+       })
+       assert.Nil(t, err)
+       if _, err := producer.Send(context.Background(), &ProducerMessage{
+               Value: testAvro{
+                       ID:   100,
+                       Name: "pulsar",
+               },
+       }); err != nil {
+               log.Fatal(err)
+       }
+
+       //create consumer
+       unobj := testAvro{}
+
+       asConsumer := NewAvroSchema(exampleSchemaDef, nil)
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       "httpLookup-avro-topic",
+               SubscriptionName:            "sub-1",
+               Schema:                      asConsumer,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+       })
+       assert.Nil(t, err)
+
+       msg, err := consumer.Receive(context.Background())
+       assert.Nil(t, err)
+       err = msg.GetSchemaValue(&unobj)
+       assert.Nil(t, err)
+       assert.Equal(t, unobj.ID, 100)
+       assert.Equal(t, unobj.Name, "pulsar")
+       defer consumer.Close()
+}
+
 func TestStringSchema(t *testing.T) {
        client := createClient()
        defer client.Close()

Reply via email to