This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.12.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 68cd09a96e82a44e11fe92b050c9674143f1e5cc Author: Peter Hull <[email protected]> AuthorDate: Tue Feb 20 02:31:00 2024 +0000 [Fix] Fix Bytes Schema (#1173) (cherry picked from commit c2ca7e81f0c609cd8fb7b13695664519e63e4501) --- pulsar/schema.go | 2 ++ pulsar/schema_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++++++ pulsar/table_view_test.go | 7 ++++++ 3 files changed, 73 insertions(+) diff --git a/pulsar/schema.go b/pulsar/schema.go index fd9d412d..3427fb26 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -93,6 +93,8 @@ func NewSchema(schemaType SchemaType, schemaData []byte, properties map[string]s var schemaDef = string(schemaData) var s Schema switch schemaType { + case BYTES: + s = NewBytesSchema(properties) case STRING: s = NewStringSchema(properties) case JSON: diff --git a/pulsar/schema_test.go b/pulsar/schema_test.go index c2008f6d..34216c47 100644 --- a/pulsar/schema_test.go +++ b/pulsar/schema_test.go @@ -19,11 +19,14 @@ package pulsar import ( "context" + "fmt" "log" "testing" + "time" pb "github.com/apache/pulsar-client-go/integration-tests/pb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type testJSON struct { @@ -55,6 +58,67 @@ func createClient() Client { return client } +func TestBytesSchema(t *testing.T) { + client := createClient() + defer client.Close() + + topic := newTopicName() + + properties := make(map[string]string) + properties["pulsar"] = "hello" + producerSchemaBytes := NewBytesSchema(properties) + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Schema: producerSchemaBytes, + }) + assert.NoError(t, err) + + _, err = producer.Send(context.Background(), &ProducerMessage{ + Value: []byte(`{"key": "value"}`), + }) + require.NoError(t, err) + _, err = producer.Send(context.Background(), &ProducerMessage{ + Value: []byte(`something else`), + }) + require.NoError(t, err) + producer.Close() + + // Create consumer + consumerSchemaBytes := NewBytesSchema(nil) + assert.NotNil(t, consumerSchemaBytes) + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "sub-1", + Schema: consumerSchemaBytes, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // Receive first message + var out1 []byte + msg1, err := consumer.Receive(ctx) + assert.NoError(t, err) + err = msg1.GetSchemaValue(&out1) + assert.NoError(t, err) + assert.Equal(t, []byte(`{"key": "value"}`), out1) + consumer.Ack(msg1) + require.NoError(t, err) + + // Receive second message + var out2 []byte + msg2, err := consumer.Receive(ctx) + fmt.Println(string(msg2.Payload())) + assert.NoError(t, err) + err = msg2.GetSchemaValue(&out2) + assert.NoError(t, err) + assert.Equal(t, []byte(`something else`), out2) + + defer consumer.Close() +} + func TestJsonSchema(t *testing.T) { client := createClient() defer client.Close() diff --git a/pulsar/table_view_test.go b/pulsar/table_view_test.go index 45b94411..2368e3d8 100644 --- a/pulsar/table_view_test.go +++ b/pulsar/table_view_test.go @@ -90,6 +90,13 @@ func TestTableViewSchemas(t *testing.T) { expValueOut interface{} valueCheck func(t *testing.T, got interface{}) // Overrides expValueOut for more complex checks }{ + { + name: "BytesSchema", + schema: NewBytesSchema(nil), + schemaType: []byte(`any`), + producerValue: []byte(`hello pulsar`), + expValueOut: []byte(`hello pulsar`), + }, { name: "StringSchema", schema: NewStringSchema(nil),
