This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new d821b203 golang: fast to the ok state (#295)
d821b203 is described below

commit d821b203d357f93e0f7fe07521ccb0afcfd1ba29
Author: guyinyou <[email protected]>
AuthorDate: Mon Dec 19 15:32:34 2022 +0800

    golang: fast to the ok state (#295)
    
    * golang: fast to the ok state
    
    * fix utest
    
    Co-authored-by: guyinyou <[email protected]>
---
 golang/client.go              |  12 ++++-
 golang/client_manager_test.go |  57 +++++++++++++++++++++++
 golang/client_test.go         |  35 ++++++++++++++
 golang/producer_test.go       | 104 ++++++++++--------------------------------
 golang/simple_consumer.go     |   2 +-
 5 files changed, 128 insertions(+), 82 deletions(-)

diff --git a/golang/client.go b/golang/client.go
index c094f1ea..c9d3132f 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -299,6 +299,14 @@ func (cli *defaultClient) getQueryRouteRequest(topic 
string) *v2.QueryRouteReque
 func (cli *defaultClient) getTotalTargets() []string {
        endpoints := make([]string, 0)
        endpointsSet := make(map[string]bool)
+       for _, address := range cli.accessPoint.GetAddresses() {
+               target := utils.ParseAddress(address)
+               if _, ok := endpointsSet[target]; ok {
+                       continue
+               }
+               endpointsSet[target] = true
+               endpoints = append(endpoints, target)
+       }
        cli.router.Range(func(_, v interface{}) bool {
                messageQueues := v.([]*v2.MessageQueue)
                for _, messageQueue := range messageQueues {
@@ -372,7 +380,7 @@ func (cli *defaultClient) Heartbeat() {
 }
 
 func (cli *defaultClient) trySyncSettings() {
-       cli.log.Info("start syncSetting")
+       cli.log.Info("start trySyncSettings")
        command := cli.getSettingsCommand()
        targets := cli.getTotalTargets()
        for _, target := range targets {
@@ -381,7 +389,7 @@ func (cli *defaultClient) trySyncSettings() {
 }
 
 func (cli *defaultClient) mustSyncSettings() error {
-       cli.log.Info("start syncSetting")
+       cli.log.Info("start mustSyncSettings")
        command := cli.getSettingsCommand()
        targets := cli.getTotalTargets()
        for _, target := range targets {
diff --git a/golang/client_manager_test.go b/golang/client_manager_test.go
index 2529abc7..ec8bf79d 100644
--- a/golang/client_manager_test.go
+++ b/golang/client_manager_test.go
@@ -20,6 +20,7 @@ package golang
 import (
        "context"
        "fmt"
+       "io"
        "os"
        "testing"
        "time"
@@ -27,6 +28,7 @@ import (
        v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
        gomock "github.com/golang/mock/gomock"
        "github.com/prashantv/gostub"
+       "google.golang.org/grpc/metadata"
 )
 
 var MOCK_CLIENT_ID = "mock_client_id"
@@ -35,6 +37,60 @@ var MOCK_GROUP = "mock_group"
 var MOCK_CLIENT *MockClient
 var MOCK_RPC_CLIENT *MockRpcClient
 
+type MOCK_MessagingService_TelemetryClient struct {
+       trace []string
+}
+
+// CloseSend implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) CloseSend() error {
+       mt.trace = append(mt.trace, "closesend")
+       return nil
+}
+
+// Context implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) Context() context.Context {
+       mt.trace = append(mt.trace, "context")
+       return nil
+}
+
+// Header implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) Header() (metadata.MD, error) 
{
+       mt.trace = append(mt.trace, "header")
+       return nil, nil
+}
+
+// RecvMsg implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) RecvMsg(m interface{}) error {
+       mt.trace = append(mt.trace, "recvmsg")
+       return nil
+}
+
+// SendMsg implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) SendMsg(m interface{}) error {
+       mt.trace = append(mt.trace, "sendmsg")
+       return nil
+}
+
+// Trailer implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) Trailer() metadata.MD {
+       mt.trace = append(mt.trace, "trailer")
+       return nil
+}
+
+// Recv implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) Recv() (*v2.TelemetryCommand, 
error) {
+       mt.trace = append(mt.trace, "recv")
+       return nil, io.EOF
+}
+
+// Send implements v2.MessagingService_TelemetryClient
+func (mt *MOCK_MessagingService_TelemetryClient) Send(*v2.TelemetryCommand) 
error {
+       mt.trace = append(mt.trace, "send")
+       return nil
+}
+
+var _ = 
v2.MessagingService_TelemetryClient(&MOCK_MessagingService_TelemetryClient{})
+
 func TestMain(m *testing.M) {
        os.Setenv("mq.consoleAppender.enabled", "true")
        ResetLogger()
@@ -50,6 +106,7 @@ func TestMain(m *testing.M) {
                        Code: v2.Code_OK,
                },
        }, nil).AnyTimes()
+
        MOCK_RPC_CLIENT.EXPECT().GracefulStop().Return(nil).AnyTimes()
        MOCK_RPC_CLIENT.EXPECT().GetTarget().Return(fakeAddresss).AnyTimes()
        stubs := gostub.Stub(&NewRpcClient, func(target string, opts 
...RpcClientOption) (RpcClient, error) {
diff --git a/golang/client_test.go b/golang/client_test.go
index 25b43afe..2d339990 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -20,11 +20,46 @@ package golang
 import (
        "fmt"
        "testing"
+       "time"
 
        "github.com/apache/rocketmq-clients/golang/credentials"
+       gomock "github.com/golang/mock/gomock"
+       "github.com/prashantv/gostub"
 )
 
 func TestCLINewClient(t *testing.T) {
+       stubs := gostub.Stub(&defaultClientManagerOptions, clientManagerOptions{
+               RPC_CLIENT_MAX_IDLE_DURATION: time.Second,
+
+               RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY: time.Hour,
+               RPC_CLIENT_IDLE_CHECK_PERIOD:        time.Hour,
+
+               HEART_BEAT_INITIAL_DELAY: time.Hour,
+               HEART_BEAT_PERIOD:        time.Hour,
+
+               LOG_STATS_INITIAL_DELAY: time.Hour,
+               LOG_STATS_PERIOD:        time.Hour,
+
+               SYNC_SETTINGS_DELAY:  time.Hour,
+               SYNC_SETTINGS_PERIOD: time.Hour,
+       })
+
+       stubs2 := gostub.Stub(&NewRpcClient, func(target string, opts 
...RpcClientOption) (RpcClient, error) {
+               if target == fakeAddresss {
+                       return MOCK_RPC_CLIENT, nil
+               }
+               return nil, fmt.Errorf("invalid target=%s", target)
+       })
+
+       defer func() {
+               stubs.Reset()
+               stubs2.Reset()
+       }()
+
+       
MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(&MOCK_MessagingService_TelemetryClient{
+               trace: make([]string, 0),
+       }, nil)
+
        endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort)
        cli, err := NewClient(&Config{
                Endpoint:    endpoints,
diff --git a/golang/producer_test.go b/golang/producer_test.go
index 1313e696..cc6679ea 100644
--- a/golang/producer_test.go
+++ b/golang/producer_test.go
@@ -20,7 +20,6 @@ package golang
 import (
        "context"
        "fmt"
-       "io"
        "testing"
        "time"
 
@@ -28,7 +27,6 @@ import (
        v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
        gomock "github.com/golang/mock/gomock"
        "github.com/prashantv/gostub"
-       "google.golang.org/grpc/metadata"
 )
 
 func TestProducer(t *testing.T) {
@@ -47,7 +45,22 @@ func TestProducer(t *testing.T) {
                SYNC_SETTINGS_DELAY:  time.Hour,
                SYNC_SETTINGS_PERIOD: time.Hour,
        })
-       defer stubs.Reset()
+
+       stubs2 := gostub.Stub(&NewRpcClient, func(target string, opts 
...RpcClientOption) (RpcClient, error) {
+               if target == fakeAddresss {
+                       return MOCK_RPC_CLIENT, nil
+               }
+               return nil, fmt.Errorf("invalid target=%s", target)
+       })
+
+       defer func() {
+               stubs.Reset()
+               stubs2.Reset()
+       }()
+
+       
MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(&MOCK_MessagingService_TelemetryClient{
+               trace: make([]string, 0),
+       }, nil).AnyTimes()
 
        endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort)
        p, err := NewProducer(&Config{
@@ -72,7 +85,7 @@ func TestProducer(t *testing.T) {
                                v2.MessageType_TRANSACTION,
                        },
                }},
-       }, nil)
+       }, nil).AnyTimes()
        err = p.Start()
        if err != nil {
                t.Error(err)
@@ -88,7 +101,7 @@ func TestProducer(t *testing.T) {
                                Code: v2.Code_OK,
                        },
                        Entries: []*v2.SendResultEntry{{}},
-               }, nil)
+               }, nil).AnyTimes()
 
                _, err := p.Send(context.TODO(), msg)
                if err != nil {
@@ -101,7 +114,7 @@ func TestProducer(t *testing.T) {
                                Code: v2.Code_OK,
                        },
                        Entries: []*v2.SendResultEntry{{}},
-               }, nil)
+               }, nil).AnyTimes()
 
                done := make(chan bool)
                p.SendAsync(context.TODO(), msg, func(ctx context.Context, sr 
[]*SendReceipt, err error) {
@@ -118,12 +131,12 @@ func TestProducer(t *testing.T) {
                                Code: v2.Code_OK,
                        },
                        Entries: []*v2.SendResultEntry{{}},
-               }, nil)
+               }, nil).AnyTimes()
                MOCK_RPC_CLIENT.EXPECT().EndTransaction(gomock.Any(), 
gomock.Any()).Return(&v2.EndTransactionResponse{
                        Status: &v2.Status{
                                Code: v2.Code_OK,
                        },
-               }, nil)
+               }, nil).AnyTimes()
 
                transaction := p.BeginTransaction()
                _, err := p.SendWithTransaction(context.TODO(), msg, 
transaction)
@@ -141,12 +154,12 @@ func TestProducer(t *testing.T) {
                                Code: v2.Code_OK,
                        },
                        Entries: []*v2.SendResultEntry{{}},
-               }, nil)
+               }, nil).AnyTimes()
                MOCK_RPC_CLIENT.EXPECT().EndTransaction(gomock.Any(), 
gomock.Any()).Return(&v2.EndTransactionResponse{
                        Status: &v2.Status{
                                Code: v2.Code_OK,
                        },
-               }, nil)
+               }, nil).AnyTimes()
 
                transaction := p.BeginTransaction()
                _, err := p.SendWithTransaction(context.TODO(), msg, 
transaction)
@@ -164,7 +177,7 @@ func TestProducer(t *testing.T) {
                                Code: v2.Code_OK,
                        },
                        Entries: []*v2.SendResultEntry{{}},
-               }, nil)
+               }, nil).AnyTimes()
                msg.SetMessageGroup(MOCK_GROUP)
                defer func() { msg.messageGroup = nil }()
                _, err := p.Send(context.TODO(), msg)
@@ -178,7 +191,7 @@ func TestProducer(t *testing.T) {
                                Code: v2.Code_OK,
                        },
                        Entries: []*v2.SendResultEntry{{}},
-               }, nil)
+               }, nil).AnyTimes()
                msg.SetDelayTimestamp(time.Now().Add(time.Hour))
                defer func() { msg.deliveryTimestamp = nil }()
                _, err := p.Send(context.TODO(), msg)
@@ -204,19 +217,6 @@ func TestProducer(t *testing.T) {
                        t.Error(err)
                }
        })
-       t.Run("syncsettings", func(t *testing.T) {
-               mt := &MOCK_MessagingService_TelemetryClient{
-                       trace: make([]string, 0),
-               }
-               MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(mt, nil)
-               
p.(*defaultProducer).cli.clientManager.(*defaultClientManager).syncSettings()
-               for {
-                       time.Sleep(time.Duration(100))
-                       if len(mt.trace) >= 3 && mt.trace[0] == "send" && 
mt.trace[1] == "recv" && mt.trace[2] == "closesend" {
-                               break
-                       }
-               }
-       })
        t.Run("do heartbeat", func(t *testing.T) {
                err := p.(*defaultProducer).cli.doHeartbeat(endpoints, nil)
                if err != nil {
@@ -224,57 +224,3 @@ func TestProducer(t *testing.T) {
                }
        })
 }
-
-type MOCK_MessagingService_TelemetryClient struct {
-       trace []string
-}
-
-// CloseSend implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) CloseSend() error {
-       mt.trace = append(mt.trace, "closesend")
-       return nil
-}
-
-// Context implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) Context() context.Context {
-       mt.trace = append(mt.trace, "context")
-       return nil
-}
-
-// Header implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) Header() (metadata.MD, error) 
{
-       mt.trace = append(mt.trace, "header")
-       return nil, nil
-}
-
-// RecvMsg implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) RecvMsg(m interface{}) error {
-       mt.trace = append(mt.trace, "recvmsg")
-       return nil
-}
-
-// SendMsg implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) SendMsg(m interface{}) error {
-       mt.trace = append(mt.trace, "sendmsg")
-       return nil
-}
-
-// Trailer implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) Trailer() metadata.MD {
-       mt.trace = append(mt.trace, "trailer")
-       return nil
-}
-
-// Recv implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) Recv() (*v2.TelemetryCommand, 
error) {
-       mt.trace = append(mt.trace, "recv")
-       return nil, io.EOF
-}
-
-// Send implements v2.MessagingService_TelemetryClient
-func (mt *MOCK_MessagingService_TelemetryClient) Send(*v2.TelemetryCommand) 
error {
-       mt.trace = append(mt.trace, "send")
-       return nil
-}
-
-var _ = 
v2.MessagingService_TelemetryClient(&MOCK_MessagingService_TelemetryClient{})
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index fe8c7734..7b6342f5 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -207,7 +207,7 @@ func (sc *defaultSimpleConsumer) receiveMessage(ctx 
context.Context, request *v2
                                break
                        }
                        if err != nil {
-                               sc.cli.log.Errorf("simpleConsumer recv msg 
err=%w, requestId=%s", err, utils.GetRequestID(ctx))
+                               sc.cli.log.Errorf("simpleConsumer recv msg 
err=%v, requestId=%s", err, utils.GetRequestID(ctx))
                                break
                        }
                        sugarBaseLogger.Debugf("receiveMessage response: %v", 
resp)

Reply via email to