This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 7ffb599 [ISSUE #1112] feat: optimize producer send async (#1111)
7ffb599 is described below
commit 7ffb599169034023d9f14e38c7599dc62be1f140
Author: WeizhongTu <[email protected]>
AuthorDate: Mon Dec 4 11:12:31 2023 +0800
[ISSUE #1112] feat: optimize producer send async (#1111)
* feat: optimize producer send async
* fix: fix mq override bug
---
internal/remote/remote_client.go | 25 +++++++++++++------------
producer/producer.go | 2 +-
2 files changed, 14 insertions(+), 13 deletions(-)
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 36fbea7..c0ef6ce 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -112,23 +112,24 @@ func (c *remotingClient) InvokeSync(ctx context.Context,
addr string, request *R
// InvokeAsync send request without blocking, just return immediately.
func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request
*RemotingCommand, callback func(*ResponseFuture)) error {
- conn, err := c.connect(ctx, addr)
- if err != nil {
- return err
- }
-
resp := NewResponseFuture(ctx, request.Opaque, callback)
c.responseTable.Store(resp.Opaque, resp)
- err = c.sendRequest(ctx, conn, request)
- if err != nil {
- c.responseTable.Delete(request.Opaque)
- return err
- }
-
go primitive.WithRecover(func() {
+ defer resp.executeInvokeCallback()
+ defer c.responseTable.Delete(request.Opaque)
+
+ conn, err := c.connect(ctx, addr)
+ if err != nil {
+ resp.Err = err
+ return
+ }
+ err = c.sendRequest(ctx, conn, request)
+ if err != nil {
+ resp.Err = err
+ return
+ }
c.receiveAsync(resp)
- c.responseTable.Delete(request.Opaque)
})
return nil
diff --git a/producer/producer.go b/producer/producer.go
index 70e8d01..eb3cd2e 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -329,7 +329,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg
*primitive.Message,
if mq != nil {
lastBrokerName = mq.BrokerName
}
- mq := p.selectMessageQueue(msg, lastBrokerName)
+ mq = p.selectMessageQueue(msg, lastBrokerName)
if mq == nil {
err = fmt.Errorf("the topic=%s route info not found",
msg.Topic)
continue