This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 884e13e prevent consumer panic on de-serializing message if schema not found (#886)
884e13e is described below
commit 884e13ecf0a19059c914387c48f5bade57eb5207
Author: Garule Prabhudas <[email protected]>
AuthorDate: Tue Nov 29 14:32:43 2022 +0530
prevent consumer panic on de-serializing message if schema not found (#886)
* prevent consumer panic on de-serializing message if schema not found
* lint fix
Co-authored-by: PGarule <[email protected]>
---
pulsar/consumer_partition.go | 5 ++++
pulsar/producer_test.go | 65 ++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 70 insertions(+)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index c54ef68..4149a91 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -221,6 +221,11 @@ func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
return nil, err
}
+ if pbSchema == nil {
+ err = fmt.Errorf("schema not found for topic: [ %v ], schema version : [ %v ]", s.topic, schemaVersion)
+ return nil, err
+ }
+
var properties = internal.ConvertToStringMap(pbSchema.Properties)
schema, err = NewSchema(SchemaType(*pbSchema.Type), pbSchema.SchemaData, properties)
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 2338074..75fc6db 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1640,3 +1640,68 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) {
assert.Nil(t, err)
}
}
+
+func TestProducerWithSchemaAndConsumerSchemaNotFound(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ schema := NewAvroSchema(`{"fields":
+ [
+ {"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]}
+ ],
+ "name":"MyAvro","namespace":"schemaNotFoundTestCase","type":"record"}`, nil)
+
+ topic := newTopicName()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Schema: schema,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub-schema-not-found",
+ Type: Exclusive,
+ Schema: schema,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, consumer)
+
+ // each produced message will have schema version set in the message metadata
+ for i := 0; i < 5; i++ {
+ messageContent, err := schema.Encode(map[string]interface{}{
+ "id": i,
+ "name": map[string]interface{}{
+ "string": "abc",
+ },
+ })
+ assert.NoError(t, err)
+ _, err = producer.Send(context.Background(), &ProducerMessage{
+ Payload: messageContent,
+ })
+ assert.NoError(t, err)
+ }
+
+ // delete schema of topic
+ topicSchemaDeleteURL := fmt.Sprintf("admin/v2/schemas/public/default/%v/schema", topic)
+ err = httpDelete(topicSchemaDeleteURL)
+ assert.NoError(t, err)
+
+ // consume message
+ msg, err := consumer.Receive(context.Background())
+ assert.NoError(t, err)
+ assert.NotNil(t, msg)
+
+ // try to serialize message payload
+ var v interface{}
+ err = msg.GetSchemaValue(&v)
+ // should fail with error but not panic
+ assert.Error(t, err)
+}