This is an automated email from the ASF dual-hosted git repository.

rfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ea6eccf  feat: support multiple schema version for producer and consumer (#611)
ea6eccf is described below

commit ea6eccf7ddad548a1ad977a0b72a61b894e6d248
Author: blueness <[email protected]>
AuthorDate: Thu Jul 7 09:55:03 2022 +0800

    feat: support multiple schema version for producer and consumer (#611)
    
    * feat: support multiple schema version for producer and consumer
    
    * Fix format the code and add some description for error
    
    * fix: syntax error
    
    * fix lint and CI error
---
 pulsar/consumer_partition.go                       |  59 ++++++-
 pulsar/impl_message.go                             |  13 ++
 pulsar/internal/batch_builder.go                   |  17 ++
 pulsar/internal/commands.go                        |   4 +
 pulsar/internal/connection.go                      |   3 +
 pulsar/internal/key_based_batch_builder.go         |   7 +-
 pulsar/internal/lookup_service.go                  |  23 +++
 .../pulsartracing/message_carrier_util_test.go     |   3 +
 pulsar/message.go                                  |   7 +
 pulsar/negative_acks_tracker_test.go               |   8 +
 pulsar/producer.go                                 |   4 +
 pulsar/producer_partition.go                       | 131 ++++++++++++--
 pulsar/producer_test.go                            | 188 +++++++++++++++++++++
 pulsar/schema.go                                   |  40 +++++
 14 files changed, 485 insertions(+), 22 deletions(-)
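
For orientation, a minimal end-to-end sketch of the new per-message schema
support follows. The broker URL, topic name, and Avro definitions are
placeholders (not part of the change), and error handling is abbreviated:

    package main

    import (
        "context"
        "log"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL: "pulsar://localhost:6650", // placeholder broker address
        })
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // Two versions of the same record; the second adds an optional field.
        schemaV1 := pulsar.NewAvroSchema(
            `{"type":"record","name":"Rec","namespace":"example","fields":[{"name":"id","type":"int"}]}`, nil)
        schemaV2 := pulsar.NewAvroSchema(
            `{"type":"record","name":"Rec","namespace":"example","fields":[{"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]}]}`, nil)

        producer, err := client.CreateProducer(pulsar.ProducerOptions{
            Topic:  "my-topic", // placeholder topic
            Schema: schemaV1,   // producer-level (default) schema
        })
        if err != nil {
            log.Fatal(err)
        }
        defer producer.Close()

        // A message can now carry its own schema; when set, it is used
        // instead of the producer-level schema for that message only.
        payload, err := schemaV2.Encode(map[string]interface{}{
            "id":   1,
            "name": map[string]interface{}{"string": "a"},
        })
        if err != nil {
            log.Fatal(err)
        }
        if _, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
            Payload: payload,
            Schema:  schemaV2,
        }); err != nil {
            log.Fatal(err)
        }
    }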

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 4cc1645..fac9d4b 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -18,6 +18,7 @@
 package pulsar
 
 import (
+       "encoding/hex"
        "errors"
        "fmt"
        "math"
@@ -144,11 +145,57 @@ type partitionConsumer struct {
        nackTracker *negativeAcksTracker
        dlq         *dlqRouter
 
-       log log.Logger
-
+       log                  log.Logger
        compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
        metrics              *internal.LeveledMetrics
        decryptor            cryptointernal.Decryptor
+       schemaInfoCache      *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+       lock   sync.RWMutex
+       cache  map[string]Schema
+       client *client
+       topic  string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+       return &schemaInfoCache{
+               cache:  make(map[string]Schema),
+               client: client,
+               topic:  topic,
+       }
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
+       key := hex.EncodeToString(schemaVersion)
+       s.lock.RLock()
+       schema, ok := s.cache[key]
+       s.lock.RUnlock()
+       if ok {
+               return schema, nil
+       }
+
+       pbSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion)
+       if err != nil {
+               return nil, err
+       }
+
+       var properties = internal.ConvertToStringMap(pbSchema.Properties)
+
+       schema, err = NewSchema(SchemaType(*pbSchema.Type), pbSchema.SchemaData, properties)
+       if err != nil {
+               return nil, err
+       }
+       s.add(key, schema)
+       return schema, nil
+}
+
+func (s *schemaInfoCache) add(schemaVersionHash string, schema Schema) {
+       s.lock.Lock()
+       defer s.lock.Unlock()
+
+       s.cache[schemaVersionHash] = schema
 }
 
func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts,
@@ -175,6 +222,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
                compressionProviders: sync.Map{},
                dlq:                  dlq,
                metrics:              metrics,
+               schemaInfoCache:      newSchemaInfoCache(client, options.topic),
        }
        pc.setConsumerState(consumerInit)
        pc.log = client.log.SubLogger(log.Fields{
@@ -687,6 +735,8 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
                                replicationClusters: msgMeta.GetReplicateTo(),
                                replicatedFrom:      msgMeta.GetReplicatedFrom(),
                                redeliveryCount:     response.GetRedeliveryCount(),
+                               schemaVersion:       msgMeta.GetSchemaVersion(),
+                               schemaInfoCache:     pc.schemaInfoCache,
                                orderingKey:         string(smm.OrderingKey),
                                index:               messageIndex,
                                brokerPublishTime:   brokerPublishTime,
@@ -705,6 +755,8 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
                                replicationClusters: msgMeta.GetReplicateTo(),
                                replicatedFrom:      msgMeta.GetReplicatedFrom(),
                                redeliveryCount:     response.GetRedeliveryCount(),
+                               schemaVersion:       msgMeta.GetSchemaVersion(),
+                               schemaInfoCache:     pc.schemaInfoCache,
                                index:               messageIndex,
                                brokerPublishTime:   brokerPublishTime,
                        }
@@ -1119,7 +1171,7 @@ func (pc *partitionConsumer) grabConn() error {
        keySharedMeta := toProtoKeySharedMeta(pc.options.keySharedPolicy)
        requestID := pc.client.rpcClient.NewRequestID()
 
-       pbSchema := new(pb.Schema)
+       var pbSchema *pb.Schema
 
        if pc.options.schema != nil && pc.options.schema.GetSchemaInfo() != nil {
                tmpSchemaType := pb.Schema_Type(int32(pc.options.schema.GetSchemaInfo().Type))
@@ -1131,7 +1183,6 @@ func (pc *partitionConsumer) grabConn() error {
                }
                pc.log.Debugf("The partition consumer schema name is: %s", pbSchema.Name)
        } else {
-               pbSchema = nil
                pc.log.Debug("The partition consumer schema is nil")
        }
 
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 8248b1a..067439f 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -243,6 +243,8 @@ type message struct {
        replicatedFrom      string
        redeliveryCount     uint32
        schema              Schema
+       schemaVersion       []byte
+       schemaInfoCache     *schemaInfoCache
        encryptionContext   *EncryptionContext
        index               *uint64
        brokerPublishTime   *time.Time
@@ -293,9 +295,20 @@ func (msg *message) GetReplicatedFrom() string {
 }
 
 func (msg *message) GetSchemaValue(v interface{}) error {
+       if msg.schemaVersion != nil {
+               schema, err := msg.schemaInfoCache.Get(msg.schemaVersion)
+               if err != nil {
+                       return err
+               }
+               return schema.Decode(msg.payLoad, v)
+       }
        return msg.schema.Decode(msg.payLoad, v)
 }
 
+func (msg *message) SchemaVersion() []byte {
+       return msg.schemaVersion
+}
+
 func (msg *message) ProducerName() string {
        return msg.producerName
 }
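
On the consume side, GetSchemaValue now prefers the schema version carried
by the message: the decoder is resolved through the schemaInfoCache above
(one broker lookup per unseen version), and only messages without a version
fall back to the consumer's configured schema. A fragment, assuming an
already-subscribed consumer:

    msg, err := consumer.Receive(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    var v interface{}
    if err := msg.GetSchemaValue(&v); err != nil {
        log.Fatal(err) // schema lookup by version, or decode, failed
    }
    fmt.Printf("version=%x value=%v\n", msg.SchemaVersion(), v)
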
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index fb7598e..fe8f628 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -18,6 +18,7 @@
 package internal
 
 import (
+       "bytes"
        "time"
 
        "github.com/gogo/protobuf/proto"
@@ -49,6 +50,7 @@ type BatchBuilder interface {
                metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64,
                payload []byte,
                callback interface{}, replicateTo []string, deliverAt time.Time,
+               schemaVersion []byte, multiSchemaEnabled bool,
        ) bool
 
        // Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
@@ -165,12 +167,21 @@ func (bc *batchContainer) hasSpace(payload []byte) bool {
        return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
 }
 
+func (bc *batchContainer) hasSameSchema(schemaVersion []byte) bool {
+       if bc.numMessages == 0 {
+               return true
+       }
+       return bytes.Equal(bc.msgMetadata.SchemaVersion, schemaVersion)
+}
+
 // Add will add single message to batch.
 func (bc *batchContainer) Add(
        metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64,
        payload []byte,
        callback interface{}, replicateTo []string, deliverAt time.Time,
+       schemaVersion []byte, multiSchemaEnabled bool,
 ) bool {
+
        if replicateTo != nil && bc.numMessages != 0 {
                // If the current batch is not empty and we're trying to set the replication clusters,
                // then we need to force the current batch to flush and send the message individually
@@ -182,6 +193,9 @@ func (bc *batchContainer) Add(
        } else if !bc.hasSpace(payload) {
                // The current batch is full. Producer has to call Flush() to
                return false
+       } else if multiSchemaEnabled && !bc.hasSameSchema(schemaVersion) {
+               // The current batch has a different schema. Producer has to call Flush() to
+               return false
        }
 
        if bc.numMessages == 0 {
@@ -196,6 +210,7 @@ func (bc *batchContainer) Add(
                bc.msgMetadata.ProducerName = &bc.producerName
                bc.msgMetadata.ReplicateTo = replicateTo
                bc.msgMetadata.PartitionKey = metadata.PartitionKey
+               bc.msgMetadata.SchemaVersion = schemaVersion
                bc.msgMetadata.Properties = metadata.Properties
 
                if deliverAt.UnixNano() > 0 {
@@ -217,6 +232,7 @@ func (bc *batchContainer) reset() {
        bc.callbacks = []interface{}{}
        bc.msgMetadata.ReplicateTo = nil
        bc.msgMetadata.DeliverAtTime = nil
+       bc.msgMetadata.SchemaVersion = nil
        bc.msgMetadata.Properties = nil
 }
 
@@ -228,6 +244,7 @@ func (bc *batchContainer) Flush() (
                // No-Op for empty batch
                return nil, 0, nil, nil
        }
+
        bc.log.Debug("BatchBuilder flush: messages: ", bc.numMessages)
 
        bc.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bc.numMessages))
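
Because a batch's metadata carries a single SchemaVersion, messages with
different versions cannot share a batch; Add reports false and the producer
flushes first. A self-contained toy model of that grouping rule (not the
client API):

    package main

    import (
        "bytes"
        "fmt"
    )

    // batch mirrors the hasSameSchema constraint above: one schema
    // version per batch, and a mismatch forces a flush.
    type batch struct {
        schemaVersion []byte
        payloads      [][]byte
    }

    func (b *batch) add(version, payload []byte) bool {
        if len(b.payloads) > 0 && !bytes.Equal(b.schemaVersion, version) {
            return false // different schema: caller must flush first
        }
        b.schemaVersion = version
        b.payloads = append(b.payloads, payload)
        return true
    }

    func main() {
        msgs := []struct{ version, payload string }{
            {"v1", "a"}, {"v1", "b"}, {"v2", "c"},
        }
        var flushed []batch
        cur := &batch{}
        for _, m := range msgs {
            if !cur.add([]byte(m.version), []byte(m.payload)) {
                flushed = append(flushed, *cur) // "flush" the full batch
                cur = &batch{}
                cur.add([]byte(m.version), []byte(m.payload))
            }
        }
        flushed = append(flushed, *cur)
        fmt.Println(len(flushed)) // 2 batches: {a,b} under v1, {c} under v2
    }
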
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 7fd1885..1af837e 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -213,6 +213,10 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand
                cmd.GetLastMessageId = msg.(*pb.CommandGetLastMessageId)
        case pb.BaseCommand_AUTH_RESPONSE:
                cmd.AuthResponse = msg.(*pb.CommandAuthResponse)
+       case pb.BaseCommand_GET_OR_CREATE_SCHEMA:
+               cmd.GetOrCreateSchema = msg.(*pb.CommandGetOrCreateSchema)
+       case pb.BaseCommand_GET_SCHEMA:
+               cmd.GetSchema = msg.(*pb.CommandGetSchema)
        default:
                panic(fmt.Sprintf("Missing command type: %v", cmdType))
        }
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 318ec89..48dfd8f 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -542,6 +542,9 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl
        case pb.BaseCommand_GET_SCHEMA_RESPONSE:
                c.handleResponse(cmd.GetSchemaResponse.GetRequestId(), cmd)
 
+       case pb.BaseCommand_GET_OR_CREATE_SCHEMA_RESPONSE:
+               c.handleResponse(cmd.GetOrCreateSchemaResponse.GetRequestId(), cmd)
+
        case pb.BaseCommand_ERROR:
                c.handleResponseError(cmd.GetError())
 
diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go
index 667e855..77fbb8c 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -131,6 +131,7 @@ func (bc *keyBasedBatchContainer) Add(
        metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64,
        payload []byte,
        callback interface{}, replicateTo []string, deliverAt time.Time,
+       schemaVersion []byte, multiSchemaEnabled bool,
 ) bool {
        if replicateTo != nil && bc.numMessages != 0 {
                // If the current batch is not empty and we're trying to set the replication clusters,
@@ -158,10 +159,14 @@ func (bc *keyBasedBatchContainer) Add(
        }
 
        // add message to batch container
-       batchPart.Add(
+       add := batchPart.Add(
                metadata, sequenceIDGenerator, payload, callback, replicateTo,
                deliverAt,
+               schemaVersion, multiSchemaEnabled,
        )
+       if !add {
+               return false
+       }
        addSingleMessageToBatch(bc.buffer, metadata, payload)
 
        bc.numMessages++
diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go
index e4dac1a..24d73db 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -61,6 +61,9 @@ type LookupService interface {
        // GetTopicsOfNamespace returns all the topic names for a given namespace.
        GetTopicsOfNamespace(namespace string, mode GetTopicsOfNamespaceMode) ([]string, error)
 
+       // GetSchema returns schema for a given version.
+       GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error)
+
        // Closable allows the Lookup Service's internal client to be closed
        Closable
 }
@@ -87,6 +90,23 @@ func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResol
        }
 }
 
+func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) {
+       id := ls.rpcClient.NewRequestID()
+       req := &pb.CommandGetSchema{
+               RequestId:     proto.Uint64(id),
+               Topic:         proto.String(topic),
+               SchemaVersion: schemaVersion,
+       }
+       res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_GET_SCHEMA, req)
+       if err != nil {
+               return nil, err
+       }
+       if res.Response.Error != nil {
+               return nil, errors.New(res.Response.GetError().String())
+       }
+       return res.Response.GetSchemaResponse.Schema, nil
+}
+
func (ls *lookupService) getBrokerAddress(lr *pb.CommandLookupTopicResponse) (logicalAddress *url.URL,
        physicalAddress *url.URL, err error) {
        if ls.tlsEnabled {
@@ -358,6 +378,9 @@ func (h *httpLookupService) GetTopicsOfNamespace(namespace string, mode GetTopic
        return topics, nil
 }
 
+func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) {
+       return nil, errors.New("GetSchema is not supported by httpLookupService")
+}
 func (h *httpLookupService) Close() {
        h.httpClient.Close()
 }
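
Note that schema-by-version lookup is implemented only for the binary
protocol; a client built on the HTTP lookup service will surface the error
above the first time a consumer needs to fetch an unseen schema version.
A configuration sketch with a placeholder address:

    // Use the binary-protocol endpoint when consuming multi-schema topics
    // so that GET_SCHEMA requests can reach the broker.
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://broker.example.com:6650", // not "http://..."
    })
    if err != nil {
        log.Fatal(err)
    }
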
diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go b/pulsar/internal/pulsartracing/message_carrier_util_test.go
index 7f25578..df78ae2 100644
--- a/pulsar/internal/pulsartracing/message_carrier_util_test.go
+++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go
@@ -120,6 +120,9 @@ func (msg *mockConsumerMessage) ProducerName() string {
        return ""
 }
 
+func (msg *mockConsumerMessage) SchemaVersion() []byte {
+       return nil
+}
func (msg *mockConsumerMessage) GetEncryptionContext() *pulsar.EncryptionContext {
        return &pulsar.EncryptionContext{}
 }
diff --git a/pulsar/message.go b/pulsar/message.go
index b88f158..c117c99 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -65,6 +65,10 @@ type ProducerMessage struct {
        //     through a `SubscriptionType=Shared` subscription. With other subscription
        //     types, the messages will still be delivered immediately.
        DeliverAt time.Time
+
+       // Schema assigned to the current message.
+       // Note: a message may carry a schema different from the producer's schema;
+       // when set, it is used instead of the producer schema.
+       Schema Schema
 }
 
 // Message abstraction used in Pulsar
@@ -118,6 +122,9 @@ type Message interface {
        // GetSchemaValue returns the de-serialized value of the message, according to the configuration.
        GetSchemaValue(v interface{}) error
 
+       // SchemaVersion returns the schema version of the message, if any
+       SchemaVersion() []byte
+
        // GetEncryptionContext returns the encryption context of the message.
        // It will be used by the application to parse the undecrypted message.
        GetEncryptionContext() *EncryptionContext
diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
index e47fb09..537f0da 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -233,6 +233,10 @@ func (msg *mockMessage1) ProducerName() string {
        return ""
 }
 
+func (msg *mockMessage1) SchemaVersion() []byte {
+       return nil
+}
+
 func (msg *mockMessage1) GetEncryptionContext() *EncryptionContext {
        return &EncryptionContext{}
 }
@@ -301,6 +305,10 @@ func (msg *mockMessage2) GetSchemaValue(v interface{}) error {
        return nil
 }
 
+func (msg *mockMessage2) SchemaVersion() []byte {
+       return nil
+}
+
 func (msg *mockMessage2) ProducerName() string {
        return ""
 }
diff --git a/pulsar/producer.go b/pulsar/producer.go
index d9b2307..fd68631 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -166,6 +166,10 @@ type ProducerOptions struct {
        // Default is 1 minute
        PartitionsAutoDiscoveryInterval time.Duration
 
+       // DisableMultiSchema disables support for multiple schema versions
+       // Default is false
+       DisableMultiSchema bool
+
        // Encryption specifies the fields required to encrypt a message
        Encryption *ProducerEncryptionInfo
 }
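
When the option is set, internalSend rejects a message whose schema hash
differs from the producer's schema (see producer_partition.go below). A
configuration sketch, with placeholder topic and an assumed schemaV1:

    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic:              "my-topic", // placeholder
        Schema:             schemaV1,   // assumed Schema instance
        DisableMultiSchema: true,       // reject messages with a different schema
    })
    if err != nil {
        log.Fatal(err)
    }
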
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 5a2694c..ec92415 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
        "context"
+       "errors"
        "strings"
        "sync"
        "sync/atomic"
@@ -86,10 +87,36 @@ type partitionProducer struct {
        schemaInfo       *SchemaInfo
        partitionIdx     int32
        metrics          *internal.LeveledMetrics
+       epoch            uint64
+       schemaCache      *schemaCache
+}
+
+type schemaCache struct {
+       lock    sync.RWMutex
+       schemas map[string][]byte
+}
 
-       epoch uint64
+func newSchemaCache() *schemaCache {
+       return &schemaCache{
+               schemas: make(map[string][]byte),
+       }
 }
 
+func (s *schemaCache) Put(schema *SchemaInfo, schemaVersion []byte) {
+       s.lock.Lock()
+       defer s.lock.Unlock()
+
+       key := schema.hash()
+       s.schemas[key] = schemaVersion
+}
+
+func (s *schemaCache) Get(schema *SchemaInfo) (schemaVersion []byte) {
+       s.lock.RLock()
+       defer s.lock.RUnlock()
+
+       key := schema.hash()
+       return s.schemas[key]
+}
func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int,
        metrics *internal.LeveledMetrics) (
        *partitionProducer, error) {
@@ -125,6 +152,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
                partitionIdx:     int32(partitionIdx),
                metrics:          metrics,
                epoch:            0,
+               schemaCache:      newSchemaCache(),
        }
        if p.options.DisableBatching {
                p.batchFlushTicker.Stop()
@@ -179,7 +207,7 @@ func (p *partitionProducer) grabCnx() error {
 
        // set schema info for producer
 
-       pbSchema := new(pb.Schema)
+       var pbSchema *pb.Schema
        if p.schemaInfo != nil {
                tmpSchemaType := pb.Schema_Type(int32(p.schemaInfo.Type))
                pbSchema = &pb.Schema{
@@ -188,10 +216,9 @@ func (p *partitionProducer) grabCnx() error {
                        SchemaData: []byte(p.schemaInfo.Schema),
                        Properties: internal.ConvertFromStringMap(p.schemaInfo.Properties),
                }
-               p.log.Debugf("The partition consumer schema name is: %s", pbSchema.Name)
+               p.log.Debugf("The partition producer schema name is: %s", pbSchema.Name)
        } else {
-               pbSchema = nil
-               p.log.Debug("The partition consumer schema is nil")
+               p.log.Debug("The partition producer schema is nil")
        }
 
        cmdProducer := &pb.CommandProducer{
@@ -261,6 +288,12 @@ func (p *partitionProducer) grabCnx() error {
                nextSequenceID := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
                p.sequenceIDGenerator = &nextSequenceID
        }
+
+       schemaVersion := res.Response.ProducerSuccess.GetSchemaVersion()
+       if len(schemaVersion) != 0 {
+               p.schemaCache.Put(p.schemaInfo, schemaVersion)
+       }
+
        p._setConn(res.Cnx)
        err = p._getConn().RegisterListener(p.producerID, p)
        if err != nil {
@@ -316,6 +349,36 @@ func (p *partitionProducer) ConnectionClosed() {
        p.connectClosedCh <- connectionClosed{}
 }
 
+func (p *partitionProducer) getOrCreateSchema(schemaInfo *SchemaInfo) (schemaVersion []byte, err error) {
+
+       tmpSchemaType := pb.Schema_Type(int32(schemaInfo.Type))
+       pbSchema := &pb.Schema{
+               Name:       proto.String(schemaInfo.Name),
+               Type:       &tmpSchemaType,
+               SchemaData: []byte(schemaInfo.Schema),
+               Properties: internal.ConvertFromStringMap(schemaInfo.Properties),
+       }
+       id := p.client.rpcClient.NewRequestID()
+       req := &pb.CommandGetOrCreateSchema{
+               RequestId: proto.Uint64(id),
+               Topic:     proto.String(p.topic),
+               Schema:    pbSchema,
+       }
+       res, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_GET_OR_CREATE_SCHEMA, req)
+       if err != nil {
+               return
+       }
+       if res.Response.Error != nil {
+               err = errors.New(res.Response.GetError().String())
+               return
+       }
+       if res.Response.GetOrCreateSchemaResponse.ErrorCode != nil {
+               err = errors.New(*res.Response.GetOrCreateSchemaResponse.ErrorMessage)
+               return
+       }
+       return res.Response.GetOrCreateSchemaResponse.SchemaVersion, nil
+}
+
 func (p *partitionProducer) reconnectToBroker() {
        var (
                maxRetry int
@@ -407,21 +470,54 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
        // read payload from message
        payload := msg.Payload
 
+       var schemaPayload []byte
        var err error
+       if msg.Value != nil && msg.Payload != nil {
+               p.log.Error("cannot set both Value and Payload")
+               return
+       }
 
-       // payload and schema are mutually exclusive
-       // try to get payload from schema value only if payload is not set
-       if payload == nil && p.options.Schema != nil {
-               var schemaPayload []byte
-               schemaPayload, err = p.options.Schema.Encode(msg.Value)
-               if err != nil {
-                       p.publishSemaphore.Release()
-                       request.callback(nil, request.msg, newError(SchemaFailure, err.Error()))
-                       p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
+       if p.options.DisableMultiSchema {
+               if msg.Schema != nil && p.options.Schema != nil &&
+                       msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
+                       p.log.WithError(err).Errorf("The producer %s of topic %s has `MultiSchema` disabled", p.producerName, p.topic)
                        return
                }
+       }
+       var schema Schema
+       var schemaVersion []byte
+       if msg.Schema != nil {
+               schema = msg.Schema
+       } else if p.options.Schema != nil {
+               schema = p.options.Schema
+       }
+       if msg.Value != nil {
+               // payload and schema are mutually exclusive
+               // try to get payload from schema value only if payload is not set
+               if payload == nil && schema != nil {
+                       schemaPayload, err = schema.Encode(msg.Value)
+                       if err != nil {
+                               p.publishSemaphore.Release()
+                               request.callback(nil, request.msg, newError(SchemaFailure, err.Error()))
+                               p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
+                               return
+                       }
+               }
+       }
+       if payload == nil {
                payload = schemaPayload
        }
+       if schema != nil {
+               schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
+               if schemaVersion == nil {
+                       schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
+                       if err != nil {
+                               p.log.WithError(err).Error("failed to get schema version")
+                               return
+                       }
+                       p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
+               }
+       }
 
        // if msg is too large
        if len(payload) > int(p._getConn().GetMaxMessageSize()) {
@@ -476,9 +572,9 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
        if msg.DisableReplication {
                msg.ReplicationClusters = []string{"__local__"}
        }
-
+       multiSchemaEnabled := !p.options.DisableMultiSchema
        added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
-               msg.ReplicationClusters, deliverAt)
+               msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
        if !added {
                // The current batch is full.. flush it and retry
 
@@ -486,7 +582,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 
                // after flushing try again to add the current payload
                if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
-                       msg.ReplicationClusters, deliverAt); !ok {
+                       msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
                        p.publishSemaphore.Release()
                        request.callback(nil, request.msg, errFailAddToBatch)
                        p.log.WithField("size", len(payload)).
@@ -501,6 +597,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
                p.internalFlushCurrentBatch()
 
        }
+
 }
 
 type pendingItem struct {
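
Condensed, the new send path resolves schema and version as sketched below.
This is a restatement of internalSend above for readability, not additional
API; payload, err, and schemaVersion are the local variables from that
function:

    // 1. A per-message schema wins over the producer-level schema.
    schema := msg.Schema
    if schema == nil {
        schema = p.options.Schema
    }
    // 2. Value is encoded through the chosen schema when no raw payload
    //    is set (setting both Value and Payload is an error).
    if payload == nil && schema != nil {
        payload, err = schema.Encode(msg.Value)
    }
    // 3. The version is resolved through a read-through schemaCache keyed
    //    by a hash of the schema definition; unseen schemas are registered
    //    with the broker via GET_OR_CREATE_SCHEMA.
    if schema != nil {
        if schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo()); schemaVersion == nil {
            schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
            p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
        }
    }
    // 4. The version travels with the batch metadata; a version mismatch
    //    forces a flush before the message is added (see batch_builder.go).
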
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 6b2b5d9..dc13f50 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1380,3 +1380,191 @@ func TestExactlyOnceWithProducerNameSpecified(t *testing.T) {
        assert.NotNil(t, err)
        assert.Nil(t, producer3)
 }
+
+func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t *testing.T) {
+       const MsgBatchCount = 10
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       schema1 := NewAvroSchema(`{"fields":
+               [       
+                       {"name":"id","type":"int"},
+                       {"default":null,"name":"name","type":["null","string"]}
+               ],
+               "name":"MyAvro3","namespace":"PulsarTestCase","type":"record"}`, nil)
+       schema2 := NewAvroSchema(`{"fields":
+               [
+                               {"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]},
+                          {"default":null,"name":"age","type":["null","int"]}
+               ],
+               "name":"MyAvro3","namespace":"PulsarTestCase","type":"record"}`, nil)
+       v1 := map[string]interface{}{
+               "id": 1,
+               "name": map[string]interface{}{
+                       "string": "aac",
+               },
+       }
+       v2 := map[string]interface{}{
+               "id": 1,
+               "name": map[string]interface{}{
+                       "string": "test",
+               },
+               "age": map[string]interface{}{
+                       "int": 10,
+               },
+       }
+       topic := newTopicName()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:              topic,
+               Schema:             schema1,
+               BatcherBuilderType: KeyBasedBatchBuilder,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+
+       keys := []string{"key1", "key2", "key3"}
+
+       for i := 0; i < MsgBatchCount; i++ {
+               var messageContent []byte
+               var schema Schema
+               for _, key := range keys {
+                       if i%2 == 0 {
+                               messageContent, err = schema1.Encode(v1)
+                               schema = schema1
+                               assert.NoError(t, err)
+                       } else {
+                               messageContent, err = schema2.Encode(v2)
+                               schema = schema2
+                               assert.NoError(t, err)
+                       }
+                       producer.SendAsync(context.Background(), &ProducerMessage{
+                               Payload: messageContent,
+                               Key:     key,
+                               Schema:  schema,
+                       }, func(id MessageID, producerMessage *ProducerMessage, err error) {
+                               assert.NoError(t, err)
+                               assert.NotNil(t, id)
+                       })
+               }
+
+       }
+       producer.Flush()
+
+       // create consumer
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       topic,
+               SubscriptionName:            "my-sub2",
+               Type:                        Failover,
+               Schema:                      schema1,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       for i := 0; i < MsgBatchCount*len(keys); i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       t.Fatal(err)
+               }
+               var v interface{}
+               err = msg.GetSchemaValue(&v)
+               t.Logf(`schemaVersion: %x received %s:%v`, msg.SchemaVersion(), msg.Key(), v)
+               assert.Nil(t, err)
+       }
+}
+
+func TestMultipleSchemaProducerConsumer(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       schema1 := NewAvroSchema(`{"fields":
+               [
+                       {"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]}
+               ],
+               "name":"MyAvro3","namespace":"PulsarTestCase","type":"record"}`, nil)
+       schema2 := NewAvroSchema(`{"fields":
+               [
+                       {"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]},
+                       {"default":null,"name":"age","type":["null","int"]}
+               ],"name":"MyAvro3","namespace":"PulsarTestCase","type":"record"}`, nil)
+       v1 := map[string]interface{}{
+               "id": 1,
+               "name": map[string]interface{}{
+                       "string": "aac",
+               },
+       }
+       v2 := map[string]interface{}{
+               "id": 1,
+               "name": map[string]interface{}{
+                       "string": "test",
+               },
+               "age": map[string]interface{}{
+                       "int": 10,
+               },
+       }
+       topic := newTopicName()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:  topic,
+               Schema: schema1,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+
+       for i := 0; i < 10; i++ {
+               var messageContent []byte
+               var key string
+               var schema Schema
+               if i%2 == 0 {
+                       messageContent, err = schema1.Encode(v1)
+                       key = "v1"
+                       schema = schema1
+                       assert.NoError(t, err)
+               } else {
+                       messageContent, err = schema2.Encode(v2)
+                       key = "v2"
+                       schema = schema2
+                       assert.NoError(t, err)
+               }
+               producer.SendAsync(context.Background(), &ProducerMessage{
+                       Payload: messageContent,
+                       Key:     key,
+                       Schema:  schema,
+               }, func(id MessageID, producerMessage *ProducerMessage, err error) {
+                       assert.NoError(t, err)
+                       assert.NotNil(t, id)
+               })
+       }
+       producer.Flush()
+
+       // create consumer
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       topic,
+               SubscriptionName:            "my-sub2",
+               Type:                        Failover,
+               Schema:                      schema1,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       t.Fatal(err)
+               }
+               var v interface{}
+               err = msg.GetSchemaValue(&v)
+               t.Logf(`schemaVersion: %x received %s:%v`, msg.SchemaVersion(), msg.Key(), v)
+               assert.Nil(t, err)
+       }
+}
diff --git a/pulsar/schema.go b/pulsar/schema.go
index 42499ad..cd6656f 100644
--- a/pulsar/schema.go
+++ b/pulsar/schema.go
@@ -19,7 +19,10 @@ package pulsar
 
 import (
        "bytes"
+       "crypto/sha256"
+       "encoding/hex"
        "encoding/json"
+       "fmt"
        "reflect"
        "unsafe"
 
@@ -62,6 +65,12 @@ type SchemaInfo struct {
        Properties map[string]string
 }
 
+func (s SchemaInfo) hash() string {
+       h := sha256.New()
+       h.Write([]byte(s.Schema))
+       return hex.EncodeToString(h.Sum(nil))
+}
+
 type Schema interface {
        Encode(v interface{}) ([]byte, error)
        Decode(data []byte, v interface{}) error
@@ -69,6 +78,37 @@ type Schema interface {
        GetSchemaInfo() *SchemaInfo
 }
 
+func NewSchema(schemaType SchemaType, schemaData []byte, properties map[string]string) (schema Schema, err error) {
+       var schemaDef = string(schemaData)
+       var s Schema
+       switch schemaType {
+       case STRING:
+               s = NewStringSchema(properties)
+       case JSON:
+               s = NewJSONSchema(schemaDef, properties)
+       case PROTOBUF:
+               s = NewProtoSchema(schemaDef, properties)
+       case AVRO:
+               s = NewAvroSchema(schemaDef, properties)
+       case INT8:
+               s = NewInt8Schema(properties)
+       case INT16:
+               s = NewInt16Schema(properties)
+       case INT32:
+               s = NewInt32Schema(properties)
+       case INT64:
+               s = NewInt64Schema(properties)
+       case FLOAT:
+               s = NewFloatSchema(properties)
+       case DOUBLE:
+               s = NewDoubleSchema(properties)
+       default:
+               err = fmt.Errorf("unsupported schema type: %v", schemaType)
+       }
+       schema = s
+       return
+}
+
 type AvroCodec struct {
        Codec *goavro.Codec
 }
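
The exported NewSchema factory is what lets a consumer rebuild a decoder
from schema data fetched by version. A fragment, where schemaData and
payload are assumed to come from a GetSchema lookup and a received message
(BYTES and NONE types are not covered by the switch and yield an error):

    s, err := pulsar.NewSchema(pulsar.AVRO, schemaData, nil)
    if err != nil {
        log.Fatal(err) // e.g. a schema type the switch does not cover
    }
    var v interface{}
    if err := s.Decode(payload, &v); err != nil {
        log.Fatal(err)
    }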
