This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 0412f28  fix: fix the internalSend() return without 
sendRequest.callback() (#880)
0412f28 is described below

commit 0412f28624f3679501f41f831cdcc196a3d6a2e4
Author: Jiaqi Shen <[email protected]>
AuthorDate: Wed Nov 2 18:47:46 2022 +0800

    fix: fix the internalSend() return without sendRequest.callback() (#880)
---
 pulsar/producer_partition.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 881451c..fc564cb 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -480,6 +480,7 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
        var err error
        if msg.Value != nil && msg.Payload != nil {
                p.log.Error("Can not set Value and Payload both")
+               request.callback(nil, request.msg, errors.New("can not set 
Value and Payload both"))
                return
        }
 
@@ -493,6 +494,7 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
                if msg.Schema != nil && p.options.Schema != nil &&
                        msg.Schema.GetSchemaInfo().hash() != 
p.options.Schema.GetSchemaInfo().hash() {
                        p.publishSemaphore.Release()
+                       request.callback(nil, request.msg, fmt.Errorf("msg 
schema can not match with producer schema"))
                        p.log.WithError(err).Errorf("The producer %s of the 
topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
                        return
                }
@@ -528,6 +530,7 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
                        if err != nil {
                                p.publishSemaphore.Release()
                                p.log.WithError(err).Error("get schema version 
fail")
+                               request.callback(nil, request.msg, 
fmt.Errorf("get schema version fail, err: %w", err))
                                return
                        }
                        p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)

Reply via email to