This is an automated email from the ASF dual-hosted git repository.
cserwen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c8d06a6 [ISSUE #996] invoke user callback and return error info when
async process send resp error (#997)
c8d06a6 is described below
commit c8d06a661a022097445cc7979290733a2cc86804
Author: wangfan <[email protected]>
AuthorDate: Mon Feb 13 14:07:43 2023 +0800
[ISSUE #996] invoke user callback and return error info when async process
send resp error (#997)
---
internal/client.go | 3 +--
producer/producer.go | 11 ++++++++---
2 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/internal/client.go b/internal/client.go
index 408e942..d3c65a9 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -702,8 +702,7 @@ func (c *rmqClient) ProcessSendResponse(brokerName string,
cmd *remote.RemotingC
case ResSuccess:
status = primitive.SendOK
default:
- status = primitive.SendUnknownError
- return errors.New(cmd.Remark)
+ return errors.New(fmt.Sprintf("CODE: %d, DESC: %s", cmd.Code,
cmd.Remark))
}
msgIDs := make([]string, 0)
diff --git a/producer/producer.go b/producer/producer.go
index fa2a832..c8415e9 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -373,13 +373,18 @@ func (p *defaultProducer) sendAsync(ctx context.Context,
msg *primitive.Message,
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
err := p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg),
func(command *remote.RemotingCommand, err error) {
cancel()
+ if err != nil {
+ h(ctx, nil, err)
+ }
+
resp := primitive.NewSendResult()
+ err = p.client.ProcessSendResponse(mq.BrokerName, command,
resp, msg)
if err != nil {
h(ctx, nil, err)
- } else {
- p.client.ProcessSendResponse(mq.BrokerName, command,
resp, msg)
- h(ctx, resp, nil)
+ return
}
+
+ h(ctx, resp, nil)
})
if err != nil {