This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 70d43828 fix telemeter logic (#348)
70d43828 is described below

commit 70d438283aa7d87cc38fdb1de64842761d700858
Author: guyinyou <[email protected]>
AuthorDate: Fri Feb 3 17:09:33 2023 +0800

    fix telemeter logic (#348)
    
    Co-authored-by: guyinyou <[email protected]>
---
 golang/client.go | 58 +++++++++++++++++++++-----------------------------------
 1 file changed, 22 insertions(+), 36 deletions(-)

diff --git a/golang/client.go b/golang/client.go
index 41a95796..2b95f2bc 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -264,6 +264,22 @@ func (cli *defaultClient) getMessageQueues(ctx context.Context, topic string) ([
        if err != nil {
                return nil, err
        }
+
+       // telemeter to all messageQueues
+       endpointsSet := make(map[string]bool)
+       for _, messageQueue := range route {
+               for _, address := range messageQueue.GetBroker().GetEndpoints().GetAddresses() {
+                       target := utils.ParseAddress(address)
+                       if _, ok := endpointsSet[target]; ok {
+                               continue
+                       }
+                       endpointsSet[target] = true
+                       if err = cli.mustSyncSettingsToTargert(target); err != nil {
+                               return nil, err
+                       }
+               }
+       }
+
        cli.router.Store(topic, route)
        return route, nil
 }
@@ -299,14 +315,7 @@ func (cli *defaultClient) getQueryRouteRequest(topic string) *v2.QueryRouteReque
 func (cli *defaultClient) getTotalTargets() []string {
        endpoints := make([]string, 0)
        endpointsSet := make(map[string]bool)
-       for _, address := range cli.accessPoint.GetAddresses() {
-               target := utils.ParseAddress(address)
-               if _, ok := endpointsSet[target]; ok {
-                       continue
-               }
-               endpointsSet[target] = true
-               endpoints = append(endpoints, target)
-       }
+
        cli.router.Range(func(_, v interface{}) bool {
                messageQueues := v.([]*v2.MessageQueue)
                for _, messageQueue := range messageQueues {
@@ -388,16 +397,9 @@ func (cli *defaultClient) trySyncSettings() {
        }
 }
 
-func (cli *defaultClient) mustSyncSettings() error {
-       cli.log.Info("start mustSyncSettings")
+func (cli *defaultClient) mustSyncSettingsToTargert(target string) error {
        command := cli.getSettingsCommand()
-       targets := cli.getTotalTargets()
-       for _, target := range targets {
-               if err := cli.telemeter(target, command); err != nil {
-                       return err
-               }
-       }
-       return nil
+       return cli.telemeter(target, command)
 }
 
 func (cli *defaultClient) telemeter(target string, command *v2.TelemetryCommand) error {
@@ -423,27 +425,11 @@ func (cli *defaultClient) startUp() error {
        cm.RegisterClient(cli)
        cli.clientManager = cm
        for _, topic := range cli.initTopics {
-               maxAttempts := int(cli.settings.GetRetryPolicy().GetMaxAttempts())
-               for i := 0; i < maxAttempts; i++ {
-                       _, err := cli.getMessageQueues(context.Background(), topic)
-                       if err != nil {
-                               if i == maxAttempts-1 {
-                                       return fmt.Errorf("failed to get topic route data result from remote during client startup, clientId=%s, topics=%v, err=%v", cli.clientID, cli.initTopics, err)
-                               } else {
-                                       cli.log.Errorf("failed to get topic route data result from remote during client startup, topics=%v, err=%v. retry attempt=%d", cli.initTopics, err, i)
-                                       time.Sleep(time.Second * 3)
-                               }
-                       } else {
-                               if i > 0 {
-                                       cli.log.Infof("retry to get topic route data success, attempts=%d\n", i)
-                               }
-                               break
-                       }
+               _, err := cli.getMessageQueues(context.Background(), topic)
+               if err != nil {
+                       return fmt.Errorf("failed to get topic route data result from remote during client startup, clientId=%s, topics=%v, err=%v", cli.clientID, cli.initTopics, err)
                }
        }
-       if err := cli.mustSyncSettings(); err != nil {
-               return err
-       }
        f := func() {
                cli.router.Range(func(k, v interface{}) bool {
                        topic := k.(string)

Reply via email to