cckellogg commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r702008258



##########
File path: pulsar/consumer_partition.go
##########
@@ -141,6 +142,63 @@ type partitionConsumer struct {
        providersMutex       sync.RWMutex
        compressionProviders map[pb.CompressionType]compression.Provider
        metrics              *internal.TopicMetrics
+       schemaInfoCache      *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+       lock   sync.RWMutex
+       cache  map[string]*Schema
+       client *client
+       topic  string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+       return &schemaInfoCache{
+               cache:  make(map[string]*Schema),
+               client: client,
+               topic:  topic,
+       }
+}
+
+func (s *schemaInfoCache) get(key string) (schema *Schema) {

Review comment:
       Let's drop the named return values here and below and return the values 
explicitly from the functions. Named returns make the code more difficult to 
follow, especially below, since some returns have values and others do not.

##########
File path: pulsar/producer_partition.go
##########
@@ -361,19 +407,48 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
        msg := request.msg
 
        payload := msg.Payload
+
        var schemaPayload []byte
        var err error
-       if p.options.Schema != nil {
-               schemaPayload, err = p.options.Schema.Encode(msg.Value)
+       if msg.Value != nil && msg.Payload != nil {
+               p.log.Error("Can not set Value and Payload both")
+               return
+       }
+
+       if p.options.DisableMultiSchema {
+               if msg.Schema == nil && p.options.Schema == nil && 
!msg.Schema.GetSchemaInfo().isSame((p.options.Schema).GetSchemaInfo()) {

Review comment:
       Can this be simplified, or a comment added? I'm having a difficult time 
understanding it.

##########
File path: pulsar/impl_message.go
##########
@@ -262,9 +264,20 @@ func (msg *message) GetReplicatedFrom() string {
 }
 
 func (msg *message) GetSchemaValue(v interface{}) error {
+       if msg.schemaVersion != nil {
+               schema, err := msg.schemaInfoCache.Get(msg.schemaVersion)
+               if err != nil {
+                       return err
+               }
+               return (*schema).Decode(msg.payLoad, v)

Review comment:
       Why is casting needed here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to