This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.12.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 68cd09a96e82a44e11fe92b050c9674143f1e5cc
Author: Peter Hull <[email protected]>
AuthorDate: Tue Feb 20 02:31:00 2024 +0000

    [Fix] Fix Bytes Schema (#1173)
    
    (cherry picked from commit c2ca7e81f0c609cd8fb7b13695664519e63e4501)
---
 pulsar/schema.go          |  2 ++
 pulsar/schema_test.go     | 64 +++++++++++++++++++++++++++++++++++++++++++++++
 pulsar/table_view_test.go |  7 ++++++
 3 files changed, 73 insertions(+)

diff --git a/pulsar/schema.go b/pulsar/schema.go
index fd9d412d..3427fb26 100644
--- a/pulsar/schema.go
+++ b/pulsar/schema.go
@@ -93,6 +93,8 @@ func NewSchema(schemaType SchemaType, schemaData []byte, 
properties map[string]s
        var schemaDef = string(schemaData)
        var s Schema
        switch schemaType {
+       case BYTES:
+               s = NewBytesSchema(properties)
        case STRING:
                s = NewStringSchema(properties)
        case JSON:
diff --git a/pulsar/schema_test.go b/pulsar/schema_test.go
index c2008f6d..34216c47 100644
--- a/pulsar/schema_test.go
+++ b/pulsar/schema_test.go
@@ -19,11 +19,14 @@ package pulsar
 
 import (
        "context"
+       "fmt"
        "log"
        "testing"
+       "time"
 
        pb "github.com/apache/pulsar-client-go/integration-tests/pb"
        "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
 )
 
 type testJSON struct {
@@ -55,6 +58,67 @@ func createClient() Client {
        return client
 }
 
+func TestBytesSchema(t *testing.T) {
+       client := createClient()
+       defer client.Close()
+
+       topic := newTopicName()
+
+       properties := make(map[string]string)
+       properties["pulsar"] = "hello"
+       producerSchemaBytes := NewBytesSchema(properties)
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:  topic,
+               Schema: producerSchemaBytes,
+       })
+       assert.NoError(t, err)
+
+       _, err = producer.Send(context.Background(), &ProducerMessage{
+               Value: []byte(`{"key": "value"}`),
+       })
+       require.NoError(t, err)
+       _, err = producer.Send(context.Background(), &ProducerMessage{
+               Value: []byte(`something else`),
+       })
+       require.NoError(t, err)
+       producer.Close()
+
+       // Create consumer
+       consumerSchemaBytes := NewBytesSchema(nil)
+       assert.NotNil(t, consumerSchemaBytes)
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       topic,
+               SubscriptionName:            "sub-1",
+               Schema:                      consumerSchemaBytes,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+       })
+       assert.Nil(t, err)
+
+       ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+       defer cancel()
+
+       // Receive first message
+       var out1 []byte
+       msg1, err := consumer.Receive(ctx)
+       assert.NoError(t, err)
+       err = msg1.GetSchemaValue(&out1)
+       assert.NoError(t, err)
+       assert.Equal(t, []byte(`{"key": "value"}`), out1)
+       consumer.Ack(msg1)
+       require.NoError(t, err)
+
+       // Receive second message
+       var out2 []byte
+       msg2, err := consumer.Receive(ctx)
+       fmt.Println(string(msg2.Payload()))
+       assert.NoError(t, err)
+       err = msg2.GetSchemaValue(&out2)
+       assert.NoError(t, err)
+       assert.Equal(t, []byte(`something else`), out2)
+
+       defer consumer.Close()
+}
+
 func TestJsonSchema(t *testing.T) {
        client := createClient()
        defer client.Close()
diff --git a/pulsar/table_view_test.go b/pulsar/table_view_test.go
index 45b94411..2368e3d8 100644
--- a/pulsar/table_view_test.go
+++ b/pulsar/table_view_test.go
@@ -90,6 +90,13 @@ func TestTableViewSchemas(t *testing.T) {
                expValueOut   interface{}
                valueCheck    func(t *testing.T, got interface{}) // Overrides 
expValueOut for more complex checks
        }{
+               {
+                       name:          "BytesSchema",
+                       schema:        NewBytesSchema(nil),
+                       schemaType:    []byte(`any`),
+                       producerValue: []byte(`hello pulsar`),
+                       expValueOut:   []byte(`hello pulsar`),
+               },
                {
                        name:          "StringSchema",
                        schema:        NewStringSchema(nil),

Reply via email to