This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 973c9580 golang: optimize the logic of obtaining routing tasks (#580)
973c9580 is described below
commit 973c9580d4ed1b4cf31f9f7df1b290d22b50c6a0
Author: guyinyou <[email protected]>
AuthorDate: Tue Sep 5 10:56:49 2023 +0800
golang: optimize the logic of obtaining routing tasks (#580)
* golang: optimize the logic of obtaining routing tasks
* add log when queryRoute return nil
---------
Co-authored-by: guyinyou <[email protected]>
---
golang/client.go | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
diff --git a/golang/client.go b/golang/client.go
index fbd3ef8a..0a7defaa 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -363,6 +363,7 @@ func (cli *defaultClient) queryRoute(ctx context.Context, topic string, duration
}
if len(response.GetMessageQueues()) == 0 {
+ cli.log.Errorf("queryRoute result has no messageQueue, requestId=%s", utils.GetRequestID(ctx))
return nil, errors.New("rocketmq: no available brokers")
}
return response.GetMessageQueues(), nil
@@ -499,7 +500,14 @@ func (cli *defaultClient) startUp() error {
cli.router.Range(func(k, v interface{}) bool {
topic := k.(string)
oldRoute := v
- newRoute, _ := cli.queryRoute(context.TODO(), topic, cli.opts.timeout)
+ newRoute, err := cli.queryRoute(context.TODO(), topic, cli.opts.timeout)
+ if err != nil {
+ cli.log.Errorf("scheduled queryRoute err=%v", err)
+ }
+ if newRoute == nil && oldRoute != nil {
+ cli.log.Info("newRoute is nil, but oldRoute is not. do not update")
+ return true
+ }
if !reflect.DeepEqual(newRoute, oldRoute) {
cli.router.Store(k, newRoute)
switch impl := cli.clientImpl.(type) {