oryx2 commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r788329960



##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
        dlq         *dlqRouter
 
        log log.Logger
-
+  providersMutex       sync.RWMutex
        compressionProviders sync.Map 
//map[pb.CompressionType]compression.Provider
        metrics              *internal.LeveledMetrics
        decryptor            cryptointernal.Decryptor
+       schemaInfoCache      *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+       lock   sync.RWMutex
+       cache  map[string]Schema
+       client *client
+       topic  string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+       return &schemaInfoCache{
+               cache:  make(map[string]Schema),
+               client: client,
+               topic:  topic,
+       }
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) 
{
+       key := hex.EncodeToString(schemaVersion)
+       s.lock.RLock()
+       schema = s.cache[key]
+       s.lock.RUnlock()
+       if schema != nil {
+               return
+       }
+
+       pbSchema, err := s.client.lookupService.GetSchema(s.topic, 
schemaVersion)
+       if err != nil {
+               return nil, err
+       }
+
+       var properties = make(map[string]string)
+       if pbSchema.Properties != nil {
+               for _, entry := range pbSchema.Properties {
+                       properties[*entry.Key] = properties[*entry.Value]
+               }
+       }
+       schema, err = NewSchema(SchemaType(*pbSchema.Type), 
pbSchema.SchemaData, properties)
+       if err != nil {
+               return nil, err
+       }
+       s.add(key, schema)
+       return schema, nil
+
+}
+func (s *schemaInfoCache) add(schemaVersionHash string, schema Schema) {
+       s.lock.Lock()
+       defer s.lock.Unlock()
+
+       s.cache[schemaVersionHash] = schema

Review comment:
       You are right, overwriting is not allowed. It is just a private function 
for `SchemaInfoCache.Get`, which does the overwrite check.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to