This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 70d43828 fix telemeter logic (#348)
70d43828 is described below
commit 70d438283aa7d87cc38fdb1de64842761d700858
Author: guyinyou <[email protected]>
AuthorDate: Fri Feb 3 17:09:33 2023 +0800
fix telemeter logic (#348)
Co-authored-by: guyinyou <[email protected]>
---
golang/client.go | 58 +++++++++++++++++++++-----------------------------------
1 file changed, 22 insertions(+), 36 deletions(-)
diff --git a/golang/client.go b/golang/client.go
index 41a95796..2b95f2bc 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -264,6 +264,22 @@ func (cli *defaultClient) getMessageQueues(ctx
context.Context, topic string) ([
if err != nil {
return nil, err
}
+
+ // telemeter to all messageQueues
+ endpointsSet := make(map[string]bool)
+ for _, messageQueue := range route {
+ for _, address := range
messageQueue.GetBroker().GetEndpoints().GetAddresses() {
+ target := utils.ParseAddress(address)
+ if _, ok := endpointsSet[target]; ok {
+ continue
+ }
+ endpointsSet[target] = true
+ if err = cli.mustSyncSettingsToTargert(target); err !=
nil {
+ return nil, err
+ }
+ }
+ }
+
cli.router.Store(topic, route)
return route, nil
}
@@ -299,14 +315,7 @@ func (cli *defaultClient) getQueryRouteRequest(topic
string) *v2.QueryRouteReque
func (cli *defaultClient) getTotalTargets() []string {
endpoints := make([]string, 0)
endpointsSet := make(map[string]bool)
- for _, address := range cli.accessPoint.GetAddresses() {
- target := utils.ParseAddress(address)
- if _, ok := endpointsSet[target]; ok {
- continue
- }
- endpointsSet[target] = true
- endpoints = append(endpoints, target)
- }
+
cli.router.Range(func(_, v interface{}) bool {
messageQueues := v.([]*v2.MessageQueue)
for _, messageQueue := range messageQueues {
@@ -388,16 +397,9 @@ func (cli *defaultClient) trySyncSettings() {
}
}
-func (cli *defaultClient) mustSyncSettings() error {
- cli.log.Info("start mustSyncSettings")
+func (cli *defaultClient) mustSyncSettingsToTargert(target string) error {
command := cli.getSettingsCommand()
- targets := cli.getTotalTargets()
- for _, target := range targets {
- if err := cli.telemeter(target, command); err != nil {
- return err
- }
- }
- return nil
+ return cli.telemeter(target, command)
}
func (cli *defaultClient) telemeter(target string, command
*v2.TelemetryCommand) error {
@@ -423,27 +425,11 @@ func (cli *defaultClient) startUp() error {
cm.RegisterClient(cli)
cli.clientManager = cm
for _, topic := range cli.initTopics {
- maxAttempts :=
int(cli.settings.GetRetryPolicy().GetMaxAttempts())
- for i := 0; i < maxAttempts; i++ {
- _, err := cli.getMessageQueues(context.Background(),
topic)
- if err != nil {
- if i == maxAttempts-1 {
- return fmt.Errorf("failed to get topic
route data result from remote during client startup, clientId=%s, topics=%v,
err=%v", cli.clientID, cli.initTopics, err)
- } else {
- cli.log.Errorf("failed to get topic
route data result from remote during client startup, topics=%v, err=%v. retry
attempt=%d", cli.initTopics, err, i)
- time.Sleep(time.Second * 3)
- }
- } else {
- if i > 0 {
- cli.log.Infof("retry to get topic route
data success, attempts=%d\n", i)
- }
- break
- }
+ _, err := cli.getMessageQueues(context.Background(), topic)
+ if err != nil {
+ return fmt.Errorf("failed to get topic route data
result from remote during client startup, clientId=%s, topics=%v, err=%v",
cli.clientID, cli.initTopics, err)
}
}
- if err := cli.mustSyncSettings(); err != nil {
- return err
- }
f := func() {
cli.router.Range(func(k, v interface{}) bool {
topic := k.(string)