cckellogg commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r702008258
##########
File path: pulsar/consumer_partition.go
##########
@@ -141,6 +142,63 @@ type partitionConsumer struct {
providersMutex sync.RWMutex
compressionProviders map[pb.CompressionType]compression.Provider
metrics *internal.TopicMetrics
+ schemaInfoCache *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+ lock sync.RWMutex
+ cache map[string]*Schema
+ client *client
+ topic string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+ return &schemaInfoCache{
+ cache: make(map[string]*Schema),
+ client: client,
+ topic: topic,
+ }
+}
+
+func (s *schemaInfoCache) get(key string) (schema *Schema) {
Review comment:
Let's remove the named return values here and below and return values
explicitly from the functions. Named results make the code more difficult to
follow, especially below, since some returns have values and others do not.
##########
File path: pulsar/producer_partition.go
##########
@@ -361,19 +407,48 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
msg := request.msg
payload := msg.Payload
+
var schemaPayload []byte
var err error
- if p.options.Schema != nil {
- schemaPayload, err = p.options.Schema.Encode(msg.Value)
+ if msg.Value != nil && msg.Payload != nil {
+ p.log.Error("Can not set Value and Payload both")
+ return
+ }
+
+ if p.options.DisableMultiSchema {
+ if msg.Schema == nil && p.options.Schema == nil &&
!msg.Schema.GetSchemaInfo().isSame((p.options.Schema).GetSchemaInfo()) {
Review comment:
Can this be simplified, or a comment added? I'm having a difficult time
understanding it.
##########
File path: pulsar/impl_message.go
##########
@@ -262,9 +264,20 @@ func (msg *message) GetReplicatedFrom() string {
}
func (msg *message) GetSchemaValue(v interface{}) error {
+ if msg.schemaVersion != nil {
+ schema, err := msg.schemaInfoCache.Get(msg.schemaVersion)
+ if err != nil {
+ return err
+ }
+ return (*schema).Decode(msg.payLoad, v)
Review comment:
Why is casting needed here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]