This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 884e13e  prevent consumer panic on de-serializing message if schema 
not found (#886)
884e13e is described below

commit 884e13ecf0a19059c914387c48f5bade57eb5207
Author: Garule Prabhudas <[email protected]>
AuthorDate: Tue Nov 29 14:32:43 2022 +0530

    prevent consumer panic on de-serializing message if schema not found (#886)
    
    * prevent consumer panic on de-serializing message if schema not found
    
    * lint fix
    
    Co-authored-by: PGarule <[email protected]>
---
 pulsar/consumer_partition.go |  5 ++++
 pulsar/producer_test.go      | 65 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 70 insertions(+)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index c54ef68..4149a91 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -221,6 +221,11 @@ func (s *schemaInfoCache) Get(schemaVersion []byte) 
(schema Schema, err error) {
                return nil, err
        }
 
+       if pbSchema == nil {
+               err = fmt.Errorf("schema not found for topic: [ %v ], schema 
version : [ %v ]", s.topic, schemaVersion)
+               return nil, err
+       }
+
        var properties = internal.ConvertToStringMap(pbSchema.Properties)
 
        schema, err = NewSchema(SchemaType(*pbSchema.Type), 
pbSchema.SchemaData, properties)
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 2338074..75fc6db 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1640,3 +1640,68 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) {
                assert.Nil(t, err)
        }
 }
+
+func TestProducerWithSchemaAndConsumerSchemaNotFound(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       schema := NewAvroSchema(`{"fields":
+       [
+               
{"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]}
+       ],
+       "name":"MyAvro","namespace":"schemaNotFoundTestCase","type":"record"}`, 
nil)
+
+       topic := newTopicName()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:  topic,
+               Schema: schema,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       topic,
+               SubscriptionName:            "my-sub-schema-not-found",
+               Type:                        Exclusive,
+               Schema:                      schema,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, consumer)
+
+       // each produced message will have schema version set in the message 
metadata
+       for i := 0; i < 5; i++ {
+               messageContent, err := schema.Encode(map[string]interface{}{
+                       "id": i,
+                       "name": map[string]interface{}{
+                               "string": "abc",
+                       },
+               })
+               assert.NoError(t, err)
+               _, err = producer.Send(context.Background(), &ProducerMessage{
+                       Payload: messageContent,
+               })
+               assert.NoError(t, err)
+       }
+
+       // delete schema of topic
+       topicSchemaDeleteURL := 
fmt.Sprintf("admin/v2/schemas/public/default/%v/schema", topic)
+       err = httpDelete(topicSchemaDeleteURL)
+       assert.NoError(t, err)
+
+       // consume message
+       msg, err := consumer.Receive(context.Background())
+       assert.NoError(t, err)
+       assert.NotNil(t, msg)
+
+       // try to serialize message payload
+       var v interface{}
+       err = msg.GetSchemaValue(&v)
+       // should fail with error but not panic
+       assert.Error(t, err)
+}

Reply via email to