This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c2ca7e81 [Fix] Fix Bytes Schema (#1173)
c2ca7e81 is described below
commit c2ca7e81f0c609cd8fb7b13695664519e63e4501
Author: Peter Hull <[email protected]>
AuthorDate: Tue Feb 20 02:31:00 2024 +0000
[Fix] Fix Bytes Schema (#1173)
---
pulsar/schema.go | 2 ++
pulsar/schema_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++++++
pulsar/table_view_test.go | 7 ++++++
3 files changed, 73 insertions(+)
diff --git a/pulsar/schema.go b/pulsar/schema.go
index fd9d412d..3427fb26 100644
--- a/pulsar/schema.go
+++ b/pulsar/schema.go
@@ -93,6 +93,8 @@ func NewSchema(schemaType SchemaType, schemaData []byte, properties map[string]s
var schemaDef = string(schemaData)
var s Schema
switch schemaType {
+ case BYTES:
+ s = NewBytesSchema(properties)
case STRING:
s = NewStringSchema(properties)
case JSON:
diff --git a/pulsar/schema_test.go b/pulsar/schema_test.go
index c2008f6d..34216c47 100644
--- a/pulsar/schema_test.go
+++ b/pulsar/schema_test.go
@@ -19,11 +19,14 @@ package pulsar
import (
"context"
+ "fmt"
"log"
"testing"
+ "time"
pb "github.com/apache/pulsar-client-go/integration-tests/pb"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
type testJSON struct {
@@ -55,6 +58,67 @@ func createClient() Client {
return client
}
+func TestBytesSchema(t *testing.T) {
+ client := createClient()
+ defer client.Close()
+
+ topic := newTopicName()
+
+ properties := make(map[string]string)
+ properties["pulsar"] = "hello"
+ producerSchemaBytes := NewBytesSchema(properties)
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Schema: producerSchemaBytes,
+ })
+ assert.NoError(t, err)
+
+ _, err = producer.Send(context.Background(), &ProducerMessage{
+ Value: []byte(`{"key": "value"}`),
+ })
+ require.NoError(t, err)
+ _, err = producer.Send(context.Background(), &ProducerMessage{
+ Value: []byte(`something else`),
+ })
+ require.NoError(t, err)
+ producer.Close()
+
+ // Create consumer
+ consumerSchemaBytes := NewBytesSchema(nil)
+ assert.NotNil(t, consumerSchemaBytes)
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "sub-1",
+ Schema: consumerSchemaBytes,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ })
+ assert.Nil(t, err)
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+ defer cancel()
+
+ // Receive first message
+ var out1 []byte
+ msg1, err := consumer.Receive(ctx)
+ assert.NoError(t, err)
+ err = msg1.GetSchemaValue(&out1)
+ assert.NoError(t, err)
+ assert.Equal(t, []byte(`{"key": "value"}`), out1)
+ consumer.Ack(msg1)
+ require.NoError(t, err)
+
+ // Receive second message
+ var out2 []byte
+ msg2, err := consumer.Receive(ctx)
+ fmt.Println(string(msg2.Payload()))
+ assert.NoError(t, err)
+ err = msg2.GetSchemaValue(&out2)
+ assert.NoError(t, err)
+ assert.Equal(t, []byte(`something else`), out2)
+
+ defer consumer.Close()
+}
+
func TestJsonSchema(t *testing.T) {
client := createClient()
defer client.Close()
diff --git a/pulsar/table_view_test.go b/pulsar/table_view_test.go
index 45b94411..2368e3d8 100644
--- a/pulsar/table_view_test.go
+++ b/pulsar/table_view_test.go
@@ -90,6 +90,13 @@ func TestTableViewSchemas(t *testing.T) {
expValueOut interface{}
valueCheck func(t *testing.T, got interface{}) // Overrides expValueOut for more complex checks
}{
+ {
+ name: "BytesSchema",
+ schema: NewBytesSchema(nil),
+ schemaType: []byte(`any`),
+ producerValue: []byte(`hello pulsar`),
+ expValueOut: []byte(`hello pulsar`),
+ },
{
name: "StringSchema",
schema: NewStringSchema(nil),