This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new af56e603 refactor: factor out validateMsg (#1117)
af56e603 is described below

commit af56e6031d8021ddcff188f94c4d744eba3b3211
Author: tison <[email protected]>
AuthorDate: Tue Oct 24 21:27:02 2023 +0800

    refactor: factor out validateMsg (#1117)
    
    Signed-off-by: tison <[email protected]>
---
 pulsar/producer_partition.go | 40 +++++++++++++++++++++++++---------------
 1 file changed, 25 insertions(+), 15 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 82123f98..62c180ca 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -495,14 +495,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
                return
        }
 
-       if p.options.DisableMultiSchema {
-               if msg.Schema != nil && p.options.Schema != nil &&
-                       msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
-                       runCallback(request.callback, nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
-                       p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
-                       return
-               }
-       }
        var schema Schema
        var schemaVersion []byte
        if msg.Schema != nil {
@@ -1121,17 +1113,35 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
        p.internalSendAsync(ctx, msg, callback, false)
 }
 
-func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
-       callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
+func (p *partitionProducer) validateMsg(msg *ProducerMessage) error {
        if msg == nil {
-               p.log.Error("Message is nil")
-               runCallback(callback, nil, msg, newError(InvalidMessage, "Message is nil"))
-               return
+               return newError(InvalidMessage, "Message is nil")
        }
 
        if msg.Value != nil && msg.Payload != nil {
-               p.log.Error("Can not set Value and Payload both")
-               runCallback(callback, nil, msg, newError(InvalidMessage, "Can not set Value and Payload both"))
+               return newError(InvalidMessage, "Can not set Value and Payload both")
+       }
+
+       if p.options.DisableMultiSchema {
+               if msg.Schema != nil && p.options.Schema != nil &&
+                       msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
+                       p.log.Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
+                       return fmt.Errorf("msg schema can not match with producer schema")
+               }
+       }
+
+       return nil
+}
+
+func (p *partitionProducer) internalSendAsync(
+       ctx context.Context,
+       msg *ProducerMessage,
+       callback func(MessageID, *ProducerMessage, error),
+       flushImmediately bool,
+) {
+       if err := p.validateMsg(msg); err != nil {
+               p.log.Error(err)
+               runCallback(callback, nil, msg, err)
                return
        }
 

Reply via email to