This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d4574424 [Issue 1132] Fix JSONSchema unmarshalling in TableView (#1133)
d4574424 is described below

commit d457442434c33f0fef7469c24d25de04bc718aa6
Author: Oliver Muir <[email protected]>
AuthorDate: Mon Nov 20 09:41:50 2023 +0000

    [Issue 1132] Fix JSONSchema unmarshalling in TableView (#1133)
    
    Fixes #1132
    
    ### Motivation
    Fix issue #1132 - using JSONSchema with TableView
    
    ### Modifications
    
    - Set a concrete type in the `payload` variable before JSON-unmarshalling 
into that variable. This allows the JSON package to identify and use the type 
rather than seeing it as `interface{}`.
    - Use `reflect.Indirect(payload).Interface()` when storing the payload and 
passing it to listeners to remove the pointer from `reflect.New`.
    - Add test coverage for `TableView.Get` covering all supported schema types.
    - Add test coverage for `TableView.ForEachAndListen` for JSONSchema.
    
    Additional minor changes. They didn't seem worth their own PRs, but I'm 
happy to split them out if that's better.
    - Correct typo in comments on `TableView.ForEach` and 
`TableView.ForEachAndListen` interface methods.
    - Correct `TableView.ForEachAndListen` comment to clarify that it continues 
to call the given action on future messages.
    - Correct formatting directive (`%w` -> `%v`) in error log 
`tv.logger.Errorf("msg.GetSchemaValue() failed with %v; msg is %v", err, msg)`. 
(This indirectly calls `fmt.Sprintf` in logrus; `%w` is only recognized by 
`fmt.Errorf`, so `fmt.Sprintf` prints it as a `%!w(...)` bad-verb marker 
instead of the error text.)
---
 pulsar/table_view.go      |   6 +-
 pulsar/table_view_impl.go |  11 ++-
 pulsar/table_view_test.go | 211 ++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 219 insertions(+), 9 deletions(-)

diff --git a/pulsar/table_view.go b/pulsar/table_view.go
index e566bf0b..58a664ae 100644
--- a/pulsar/table_view.go
+++ b/pulsar/table_view.go
@@ -65,12 +65,12 @@ type TableView interface {
        // Keys returns a slice of the keys contained in this TableView.
        Keys() []string
 
-       // ForEach performs the give action for each entry in this map until 
all entries have been processed or the action
+       // ForEach performs the given action for each entry in this map until 
all entries have been processed or the action
        // returns an error.
        ForEach(func(string, interface{}) error) error
 
-       // ForEachAndListen performs the give action for each entry in this map 
until all entries have been processed or
-       // the action returns an error.
+       // ForEachAndListen performs the given action for each entry in this 
map until all entries have been processed or
+       // the action returns an error.  The given action will then be 
performed on each new entry in this map.
        ForEachAndListen(func(string, interface{}) error) error
 
        // Close closes the table view and releases resources allocated.
diff --git a/pulsar/table_view_impl.go b/pulsar/table_view_impl.go
index 47f8c6c0..17e0b90f 100644
--- a/pulsar/table_view_impl.go
+++ b/pulsar/table_view_impl.go
@@ -245,19 +245,18 @@ func (tv *TableViewImpl) handleMessage(msg Message) {
        tv.dataMu.Lock()
        defer tv.dataMu.Unlock()
 
-       var payload interface{}
+       payload := reflect.New(tv.options.SchemaValueType)
        if len(msg.Payload()) == 0 {
                delete(tv.data, msg.Key())
        } else {
-               payload = 
reflect.Indirect(reflect.New(tv.options.SchemaValueType)).Interface()
-               if err := msg.GetSchemaValue(&payload); err != nil {
-                       tv.logger.Errorf("msg.GetSchemaValue() failed with %w; 
msg is %v", err, msg)
+               if err := msg.GetSchemaValue(payload.Interface()); err != nil {
+                       tv.logger.Errorf("msg.GetSchemaValue() failed with %v; 
msg is %v", err, msg)
                }
-               tv.data[msg.Key()] = payload
+               tv.data[msg.Key()] = reflect.Indirect(payload).Interface()
        }
 
        for _, listener := range tv.listeners {
-               if err := listener(msg.Key(), payload); err != nil {
+               if err := listener(msg.Key(), 
reflect.Indirect(payload).Interface()); err != nil {
                        tv.logger.Errorf("table view listener failed for %v: 
%w", msg, err)
                }
        }
diff --git a/pulsar/table_view_test.go b/pulsar/table_view_test.go
index d29b24d2..45b94411 100644
--- a/pulsar/table_view_test.go
+++ b/pulsar/table_view_test.go
@@ -24,6 +24,7 @@ import (
        "testing"
        "time"
 
+       pb "github.com/apache/pulsar-client-go/integration-tests/pb"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 )
@@ -80,6 +81,157 @@ func TestTableView(t *testing.T) {
        }
 }
 
+func TestTableViewSchemas(t *testing.T) {
+       var tests = []struct {
+               name          string
+               schema        Schema
+               schemaType    interface{}
+               producerValue interface{}
+               expValueOut   interface{}
+               valueCheck    func(t *testing.T, got interface{}) // Overrides 
expValueOut for more complex checks
+       }{
+               {
+                       name:          "StringSchema",
+                       schema:        NewStringSchema(nil),
+                       schemaType:    strPointer("hello pulsar"),
+                       producerValue: "hello pulsar",
+                       expValueOut:   strPointer("hello pulsar"),
+               },
+               {
+                       name:          "JSONSchema",
+                       schema:        NewJSONSchema(exampleSchemaDef, nil),
+                       schemaType:    testJSON{},
+                       producerValue: testJSON{ID: 1, Name: "Pulsar"},
+                       expValueOut:   testJSON{ID: 1, Name: "Pulsar"},
+               },
+               {
+                       name:          "JSONSchema pointer type",
+                       schema:        NewJSONSchema(exampleSchemaDef, nil),
+                       schemaType:    &testJSON{ID: 1, Name: "Pulsar"},
+                       producerValue: testJSON{ID: 1, Name: "Pulsar"},
+                       expValueOut:   &testJSON{ID: 1, Name: "Pulsar"},
+               },
+               {
+                       name:          "AvroSchema",
+                       schema:        NewAvroSchema(exampleSchemaDef, nil),
+                       schemaType:    testAvro{ID: 1, Name: "Pulsar"},
+                       producerValue: testAvro{ID: 1, Name: "Pulsar"},
+                       expValueOut:   testAvro{ID: 1, Name: "Pulsar"},
+               },
+               {
+                       name:          "Int8Schema",
+                       schema:        NewInt8Schema(nil),
+                       schemaType:    int8(0),
+                       producerValue: int8(1),
+                       expValueOut:   int8(1),
+               },
+               {
+                       name:          "Int16Schema",
+                       schema:        NewInt16Schema(nil),
+                       schemaType:    int16(0),
+                       producerValue: int16(1),
+                       expValueOut:   int16(1),
+               },
+               {
+                       name:          "Int32Schema",
+                       schema:        NewInt32Schema(nil),
+                       schemaType:    int32(0),
+                       producerValue: int32(1),
+                       expValueOut:   int32(1),
+               },
+               {
+                       name:          "Int64Schema",
+                       schema:        NewInt64Schema(nil),
+                       schemaType:    int64(0),
+                       producerValue: int64(1),
+                       expValueOut:   int64(1),
+               },
+               {
+                       name:          "FloatSchema",
+                       schema:        NewFloatSchema(nil),
+                       schemaType:    float32(0),
+                       producerValue: float32(1),
+                       expValueOut:   float32(1),
+               },
+               {
+                       name:          "DoubleSchema",
+                       schema:        NewDoubleSchema(nil),
+                       schemaType:    float64(0),
+                       producerValue: float64(1),
+                       expValueOut:   float64(1),
+               },
+               {
+                       name:          "ProtoSchema",
+                       schema:        NewProtoSchema(protoSchemaDef, nil),
+                       schemaType:    pb.Test{},
+                       producerValue: &pb.Test{Num: 1, Msf: "Pulsar"},
+                       valueCheck: func(t *testing.T, got interface{}) {
+                               assert.IsType(t, pb.Test{}, got)
+                               assert.Equal(t, int32(1), got.(pb.Test).Num)
+                               assert.Equal(t, "Pulsar", got.(pb.Test).Msf)
+                       },
+               },
+               {
+                       name:          "ProtoNativeSchema",
+                       schema:        
NewProtoNativeSchemaWithMessage(&pb.Test{}, nil),
+                       schemaType:    pb.Test{},
+                       producerValue: &pb.Test{Num: 1, Msf: "Pulsar"},
+                       valueCheck: func(t *testing.T, got interface{}) {
+                               assert.IsType(t, pb.Test{}, got)
+                               assert.Equal(t, int32(1), got.(pb.Test).Num)
+                               assert.Equal(t, "Pulsar", got.(pb.Test).Msf)
+                       },
+               },
+       }
+       for _, test := range tests {
+               t.Run(test.name, func(t *testing.T) {
+                       client, err := NewClient(ClientOptions{
+                               URL: lookupURL,
+                       })
+
+                       assert.NoError(t, err)
+                       defer client.Close()
+
+                       topic := newTopicName()
+
+                       // create producer
+                       producer, err := client.CreateProducer(ProducerOptions{
+                               Topic:  topic,
+                               Schema: test.schema,
+                       })
+                       assert.NoError(t, err)
+                       defer producer.Close()
+
+                       _, err = producer.Send(context.Background(), 
&ProducerMessage{
+                               Key:   "testKey",
+                               Value: test.producerValue,
+                       })
+                       assert.NoError(t, err)
+
+                       // create table view
+                       tv, err := client.CreateTableView(TableViewOptions{
+                               Topic:           topic,
+                               Schema:          test.schema,
+                               SchemaValueType: 
reflect.TypeOf(test.schemaType),
+                       })
+                       assert.NoError(t, err)
+                       defer tv.Close()
+
+                       value := tv.Get("testKey")
+                       if test.valueCheck != nil {
+                               test.valueCheck(t, value)
+                       } else {
+                               assert.IsType(t, test.expValueOut, value)
+                               assert.Equal(t, test.expValueOut, value)
+                       }
+               })
+       }
+}
+
+func strPointer(s string) *string {
+       return &s
+}
+
 func TestPublishNilValue(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
@@ -143,3 +295,62 @@ func TestPublishNilValue(t *testing.T) {
 
        assert.Equal(t, *(tv.Get("key-2").(*string)), "value-2")
 }
+
+func TestForEachAndListenJSONSchema(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.NoError(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       schema := NewJSONSchema(exampleSchemaDef, nil)
+
+       // create table view
+       tv, err := client.CreateTableView(TableViewOptions{
+               Topic:           topic,
+               Schema:          schema,
+               SchemaValueType: reflect.TypeOf(testJSON{}),
+       })
+       assert.NoError(t, err)
+       defer tv.Close()
+
+       // create listener
+       valuePrefix := "hello pulsar: "
+       tv.ForEachAndListen(func(key string, value interface{}) error {
+               t.Log("foreach" + key)
+               s, ok := value.(testJSON)
+               assert.Truef(t, ok, "expected value to be testJSON type got 
%T", value)
+               assert.Equal(t, fmt.Sprintf(valuePrefix+key), s.Name)
+               return nil
+       })
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:  topic,
+               Schema: schema,
+       })
+       assert.NoError(t, err)
+       defer producer.Close()
+
+       numMsg := 10
+       for i := 0; i < numMsg; i++ {
+               key := fmt.Sprintf("%d", i)
+               t.Log("producing" + key)
+               _, err = producer.Send(context.Background(), &ProducerMessage{
+                       Key: key,
+                       Value: testJSON{
+                               ID:   i,
+                               Name: fmt.Sprintf(valuePrefix + key),
+                       },
+               })
+               assert.NoError(t, err)
+       }
+
+       // Wait until tv receives all messages
+       for tv.Size() < 10 {
+               time.Sleep(time.Second * 1)
+               t.Logf("TableView number of elements: %d", tv.Size())
+       }
+}

Reply via email to