zzzming commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r787775362
##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
dlq *dlqRouter
log log.Logger
-
+ providersMutex sync.RWMutex
compressionProviders sync.Map
//map[pb.CompressionType]compression.Provider
metrics *internal.LeveledMetrics
decryptor cryptointernal.Decryptor
+ schemaInfoCache *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+ lock sync.RWMutex
+ cache map[string]Schema
+ client *client
+ topic string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+ return &schemaInfoCache{
+ cache: make(map[string]Schema),
+ client: client,
+ topic: topic,
+ }
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error)
{
+ key := hex.EncodeToString(schemaVersion)
+ s.lock.RLock()
+ schema = s.cache[key]
+ s.lock.RUnlock()
+ if schema != nil {
+ return
Review comment:
return schema nil
##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
dlq *dlqRouter
log log.Logger
-
+ providersMutex sync.RWMutex
compressionProviders sync.Map
//map[pb.CompressionType]compression.Provider
metrics *internal.LeveledMetrics
decryptor cryptointernal.Decryptor
+ schemaInfoCache *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+ lock sync.RWMutex
+ cache map[string]Schema
+ client *client
+ topic string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+ return &schemaInfoCache{
+ cache: make(map[string]Schema),
+ client: client,
+ topic: topic,
+ }
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error)
{
+ key := hex.EncodeToString(schemaVersion)
+ s.lock.RLock()
+ schema = s.cache[key]
+ s.lock.RUnlock()
+ if schema != nil {
+ return
+ }
+
+ pbSchema, err := s.client.lookupService.GetSchema(s.topic,
schemaVersion)
+ if err != nil {
+ return nil, err
+ }
+
+ var properties = make(map[string]string)
+ if pbSchema.Properties != nil {
+ for _, entry := range pbSchema.Properties {
+ properties[*entry.Key] = properties[*entry.Value]
Review comment:
since Key and Value are both declared as *string, do you need check the
reference is not nil before assigning them to properties?
##########
File path: pulsar/producer_partition.go
##########
@@ -397,19 +460,48 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
msg := request.msg
payload := msg.Payload
+
var schemaPayload []byte
var err error
- if p.options.Schema != nil {
- schemaPayload, err = p.options.Schema.Encode(msg.Value)
+ if msg.Value != nil && msg.Payload != nil {
+ p.log.Error("Can not set Value and Payload both")
+ return
+ }
+
+ if p.options.DisableMultiSchema {
+ if msg.Schema != p.options.Schema {
Review comment:
I am not sure if we want to compare the pointer or the actual value.
Shema is an interface. So the current evaluation just make sure they are
pointing to the same reference. Do you want to compare the value of Schema?
##########
File path: pulsar/internal/lookup_service.go
##########
@@ -358,6 +378,9 @@ func (h *httpLookupService) GetTopicsOfNamespace(namespace
string, mode GetTopic
return topics, nil
}
+func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte)
(schema *pb.Schema, err error) {
+ return nil, errors.New("not support")
Review comment:
Can we make this more descriptive? Maybe an error like this helps to
debug.
errors.New("GetSchema is supported by httpLookupService")
##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
dlq *dlqRouter
log log.Logger
-
+ providersMutex sync.RWMutex
compressionProviders sync.Map
//map[pb.CompressionType]compression.Provider
metrics *internal.LeveledMetrics
decryptor cryptointernal.Decryptor
+ schemaInfoCache *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+ lock sync.RWMutex
+ cache map[string]Schema
+ client *client
+ topic string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+ return &schemaInfoCache{
+ cache: make(map[string]Schema),
+ client: client,
+ topic: topic,
+ }
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error)
{
+ key := hex.EncodeToString(schemaVersion)
+ s.lock.RLock()
+ schema = s.cache[key]
+ s.lock.RUnlock()
+ if schema != nil {
+ return
+ }
+
+ pbSchema, err := s.client.lookupService.GetSchema(s.topic,
schemaVersion)
+ if err != nil {
+ return nil, err
+ }
+
+ var properties = make(map[string]string)
+ if pbSchema.Properties != nil {
+ for _, entry := range pbSchema.Properties {
+ properties[*entry.Key] = properties[*entry.Value]
+ }
+ }
+ schema, err = NewSchema(SchemaType(*pbSchema.Type),
pbSchema.SchemaData, properties)
+ if err != nil {
+ return nil, err
+ }
+ s.add(key, schema)
+ return schema, nil
+
+}
+func (s *schemaInfoCache) add(schemaVersionHash string, schema Schema) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.cache[schemaVersionHash] = schema
Review comment:
Do we allow schema version overwrite?
##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
dlq *dlqRouter
log log.Logger
-
+ providersMutex sync.RWMutex
Review comment:
Run `go fmt` to fix all formatting issues
##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
dlq *dlqRouter
log log.Logger
-
+ providersMutex sync.RWMutex
compressionProviders sync.Map
//map[pb.CompressionType]compression.Provider
metrics *internal.LeveledMetrics
decryptor cryptointernal.Decryptor
+ schemaInfoCache *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+ lock sync.RWMutex
+ cache map[string]Schema
+ client *client
+ topic string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+ return &schemaInfoCache{
+ cache: make(map[string]Schema),
+ client: client,
+ topic: topic,
+ }
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error)
{
+ key := hex.EncodeToString(schemaVersion)
+ s.lock.RLock()
+ schema = s.cache[key]
+ s.lock.RUnlock()
+ if schema != nil {
+ return
Review comment:
Or
s.lock.RLock()
schema, ok = s.cache[key]
s.lock.RUnlock()
if ok {
return schema, nil
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]