zzzming commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r787775362



##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
        dlq         *dlqRouter
 
        log log.Logger
-
+  providersMutex       sync.RWMutex
        compressionProviders sync.Map 
//map[pb.CompressionType]compression.Provider
        metrics              *internal.LeveledMetrics
        decryptor            cryptointernal.Decryptor
+       schemaInfoCache      *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+       lock   sync.RWMutex
+       cache  map[string]Schema
+       client *client
+       topic  string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+       return &schemaInfoCache{
+               cache:  make(map[string]Schema),
+               client: client,
+               topic:  topic,
+       }
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) 
{
+       key := hex.EncodeToString(schemaVersion)
+       s.lock.RLock()
+       schema = s.cache[key]
+       s.lock.RUnlock()
+       if schema != nil {
+               return

Review comment:
       return schema, nil

##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
        dlq         *dlqRouter
 
        log log.Logger
-
+  providersMutex       sync.RWMutex
        compressionProviders sync.Map 
//map[pb.CompressionType]compression.Provider
        metrics              *internal.LeveledMetrics
        decryptor            cryptointernal.Decryptor
+       schemaInfoCache      *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+       lock   sync.RWMutex
+       cache  map[string]Schema
+       client *client
+       topic  string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+       return &schemaInfoCache{
+               cache:  make(map[string]Schema),
+               client: client,
+               topic:  topic,
+       }
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) 
{
+       key := hex.EncodeToString(schemaVersion)
+       s.lock.RLock()
+       schema = s.cache[key]
+       s.lock.RUnlock()
+       if schema != nil {
+               return
+       }
+
+       pbSchema, err := s.client.lookupService.GetSchema(s.topic, 
schemaVersion)
+       if err != nil {
+               return nil, err
+       }
+
+       var properties = make(map[string]string)
+       if pbSchema.Properties != nil {
+               for _, entry := range pbSchema.Properties {
+                       properties[*entry.Key] = properties[*entry.Value]

Review comment:
       Since Key and Value are both declared as *string, do you need to check that 
the pointers are not nil before dereferencing them and assigning to properties?

##########
File path: pulsar/producer_partition.go
##########
@@ -397,19 +460,48 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
        msg := request.msg
 
        payload := msg.Payload
+
        var schemaPayload []byte
        var err error
-       if p.options.Schema != nil {
-               schemaPayload, err = p.options.Schema.Encode(msg.Value)
+       if msg.Value != nil && msg.Payload != nil {
+               p.log.Error("Can not set Value and Payload both")
+               return
+       }
+
+       if p.options.DisableMultiSchema {
+               if msg.Schema != p.options.Schema {

Review comment:
       I am not sure if we want to compare the pointer or the actual value. 
Schema is an interface, so the current evaluation just makes sure they are 
pointing to the same reference. Do you want to compare the value of Schema?

##########
File path: pulsar/internal/lookup_service.go
##########
@@ -358,6 +378,9 @@ func (h *httpLookupService) GetTopicsOfNamespace(namespace 
string, mode GetTopic
        return topics, nil
 }
 
+func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte) 
(schema *pb.Schema, err error) {
+       return nil, errors.New("not support")

Review comment:
       Can we make this more descriptive? Maybe an error like this would help 
with debugging:
   errors.New("GetSchema is not supported by httpLookupService")
   

##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
        dlq         *dlqRouter
 
        log log.Logger
-
+  providersMutex       sync.RWMutex
        compressionProviders sync.Map 
//map[pb.CompressionType]compression.Provider
        metrics              *internal.LeveledMetrics
        decryptor            cryptointernal.Decryptor
+       schemaInfoCache      *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+       lock   sync.RWMutex
+       cache  map[string]Schema
+       client *client
+       topic  string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+       return &schemaInfoCache{
+               cache:  make(map[string]Schema),
+               client: client,
+               topic:  topic,
+       }
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) 
{
+       key := hex.EncodeToString(schemaVersion)
+       s.lock.RLock()
+       schema = s.cache[key]
+       s.lock.RUnlock()
+       if schema != nil {
+               return
+       }
+
+       pbSchema, err := s.client.lookupService.GetSchema(s.topic, 
schemaVersion)
+       if err != nil {
+               return nil, err
+       }
+
+       var properties = make(map[string]string)
+       if pbSchema.Properties != nil {
+               for _, entry := range pbSchema.Properties {
+                       properties[*entry.Key] = properties[*entry.Value]
+               }
+       }
+       schema, err = NewSchema(SchemaType(*pbSchema.Type), 
pbSchema.SchemaData, properties)
+       if err != nil {
+               return nil, err
+       }
+       s.add(key, schema)
+       return schema, nil
+
+}
+func (s *schemaInfoCache) add(schemaVersionHash string, schema Schema) {
+       s.lock.Lock()
+       defer s.lock.Unlock()
+
+       s.cache[schemaVersionHash] = schema

Review comment:
       Do we allow schema version overwrite?

##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
        dlq         *dlqRouter
 
        log log.Logger
-
+  providersMutex       sync.RWMutex

Review comment:
       Run `go fmt` to fix all formatting issues

##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
        dlq         *dlqRouter
 
        log log.Logger
-
+  providersMutex       sync.RWMutex
        compressionProviders sync.Map 
//map[pb.CompressionType]compression.Provider
        metrics              *internal.LeveledMetrics
        decryptor            cryptointernal.Decryptor
+       schemaInfoCache      *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+       lock   sync.RWMutex
+       cache  map[string]Schema
+       client *client
+       topic  string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+       return &schemaInfoCache{
+               cache:  make(map[string]Schema),
+               client: client,
+               topic:  topic,
+       }
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) 
{
+       key := hex.EncodeToString(schemaVersion)
+       s.lock.RLock()
+       schema = s.cache[key]
+       s.lock.RUnlock()
+       if schema != nil {
+               return

Review comment:
       Or:
        s.lock.RLock()
        schema, ok := s.cache[key]
        s.lock.RUnlock()
        if ok {
                return schema, nil
        }




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to