This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 550f9699 try to recover on server TelemetryCommand transmission error (#408)
550f9699 is described below

commit 550f9699f17f340fb7684b41bae45e1b57c324de
Author: Paweł Biegun <[email protected]>
AuthorDate: Mon Mar 20 06:43:47 2023 +0100

    try to recover on server TelemetryCommand transmission error (#408)
    
    * try to recover on server TelemetryCommand transmission error
    
    * refactor modifying the recovery state
    
    * write tests for auxiliary functions
    
    * remove binary files
    
    * add .test files to gitignore
    
    * refactor tests
    
    * write the rest of the tests
    
    * move Go .gitignore to global .gitignore
---
 .gitignore                    |   3 +
 golang/client.go              | 109 +++++++++++++++++----
 golang/client_manager.go      |   1 -
 golang/client_manager_test.go |  17 +++-
 golang/client_test.go         | 215 ++++++++++++++++++++++++++++++++++++++++++
 golang/rpc_client_mock.go     |   4 +-
 6 files changed, 323 insertions(+), 26 deletions(-)

diff --git a/.gitignore b/.gitignore
index 924f09fb..4959de17 100644
--- a/.gitignore
+++ b/.gitignore
@@ -41,3 +41,6 @@ rust/src/pb/*.rs
 composer.phar
 composer.lock
 vendor/
+
+# Go
+*.tests
diff --git a/golang/client.go b/golang/client.go
index 2b95f2bc..313092ee 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -51,11 +51,13 @@ type isClient interface {
        onVerifyMessageCommand(endpoints *v2.Endpoints, command 
*v2.VerifyMessageCommand) error
 }
 type defaultClientSession struct {
-       endpoints    *v2.Endpoints
-       observer     v2.MessagingService_TelemetryClient
-       observerLock sync.RWMutex
-       cli          *defaultClient
-       timeout      time.Duration
+       endpoints        *v2.Endpoints
+       observer         v2.MessagingService_TelemetryClient
+       observerLock     sync.RWMutex
+       cli              *defaultClient
+       timeout          time.Duration
+       recovering       bool
+       recoveryWaitTime time.Duration `default:"5s"`
 }
 
 func NewDefaultClientSession(target string, cli *defaultClient) 
(*defaultClientSession, error) {
@@ -64,36 +66,79 @@ func NewDefaultClientSession(target string, cli 
*defaultClient) (*defaultClientS
                return nil, err
        }
        cs := &defaultClientSession{
-               endpoints: endpoints,
-               cli:       cli,
-               timeout:   365 * 24 * time.Hour,
+               endpoints:  endpoints,
+               cli:        cli,
+               timeout:    365 * 24 * time.Hour,
+               recovering: false,
        }
        cs.startUp()
        return cs, nil
 }
+
+func (cs *defaultClientSession) _acquire_observer() 
(v2.MessagingService_TelemetryClient, bool) {
+       cs.observerLock.RLock()
+       observer := cs.observer
+       cs.observerLock.RUnlock()
+
+       if observer == nil {
+               time.Sleep(time.Second)
+               return nil, false
+       } else {
+               return observer, true
+       }
+
+}
+
+func (cs *defaultClientSession) _execute_server_telemetry_command(command 
*v2.TelemetryCommand) {
+       err := cs.handleTelemetryCommand(command)
+       if err != nil {
+               cs.cli.log.Errorf("telemetryCommand recv err=%w", err)
+       } else {
+               cs.cli.log.Info("Executed command successfully")
+       }
+}
+
 func (cs *defaultClientSession) startUp() {
        cs.cli.log.Infof("defaultClientSession is startUp! endpoints=%v", 
cs.endpoints)
        go func() {
                for {
-                       cs.observerLock.RLock()
-                       observer := cs.observer
-                       cs.observerLock.RUnlock()
-
-                       if observer == nil {
-                               time.Sleep(time.Second)
+                       // ensure that observer is present, if not wait for it 
to be regenerated on publish.
+                       observer, acquired_observer := cs._acquire_observer()
+                       if !acquired_observer {
                                continue
                        }
+
                        response, err := observer.Recv()
                        if err != nil {
-                               cs.release()
-
-                               cs.cli.log.Errorf("telemetryCommand recv 
err=%w", err)
+                               // we are recovering
+                               if !cs.recovering {
+                                       cs.cli.log.Info("Encountered error 
while receiving TelemetryCommand, trying to recover")
+                                       // we wait five seconds to give time 
for the transmission error to be resolved externally before we attempt to read 
the message again.
+                                       time.Sleep(cs.recoveryWaitTime)
+                                       cs.recovering = true
+                               } else {
+                                       // we are recovering but we failed to 
read the message again, resetting observer
+                                       cs.cli.log.Info("Failed to recover, 
err=%w", err)
+                                       cs.release()
+                                       cs.recovering = false
+                               }
                                continue
                        }
-                       err = cs.handleTelemetryCommand(response)
-                       if err != nil {
-                               cs.cli.log.Errorf("telemetryCommand recv 
err=%w", err)
+                       // at this point we received the message and must 
confirm that the sender is healthy
+                       if cs.recovering {
+                               // we don't know which server sent the request 
so we must check that each of the servers is healthy.
+                               // we assume that the list of the servers 
hasn't changed, so the server that sent the message is still present.
+                               hearbeat_response, err := 
cs.cli.clientManager.HeartBeat(context.TODO(), cs.endpoints, 
&v2.HeartbeatRequest{}, 10*time.Second)
+                               if err == nil && hearbeat_response.Status.Code 
== v2.Code_OK {
+                                       cs.cli.log.Info("Managed to recover, 
executing message")
+                                       
cs._execute_server_telemetry_command(response)
+                               } else {
+                                       cs.cli.log.Errorf("Failed to recover, 
Some of the servers are unhealthy, Heartbeat err=%w", err)
+                                       cs.release()
+                               }
+                               cs.recovering = false
                        }
+                       cs._execute_server_telemetry_command(response)
                }
        }()
 }
@@ -199,6 +244,30 @@ var NewClient = func(config *Config, opts ...ClientOption) 
(Client, error) {
        return cli, nil
 }
 
+var NewClientConcrete = func(config *Config, opts ...ClientOption) 
(*defaultClient, error) {
+       endpoints, err := utils.ParseTarget(config.Endpoint)
+       if err != nil {
+               return nil, err
+       }
+       cli := &defaultClient{
+               config:                        config,
+               opts:                          defaultNSOptions,
+               clientID:                      utils.GenClientID(),
+               accessPoint:                   endpoints,
+               messageInterceptors:           make([]MessageInterceptor, 0),
+               endpointsTelemetryClientTable: 
make(map[string]*defaultClientSession),
+               on:                            *atomic.NewBool(true),
+               clientManager:                 &MockClientManager{},
+       }
+       cli.log = sugarBaseLogger.With("client_id", cli.clientID)
+       for _, opt := range opts {
+               opt.apply(&cli.opts)
+       }
+       cli.done = make(chan struct{}, 1)
+       cli.clientMeterProvider = NewDefaultClientMeterProvider(cli)
+       return cli, nil
+}
+
 func (cli *defaultClient) GetClientID() string {
        return cli.clientID
 }
diff --git a/golang/client_manager.go b/golang/client_manager.go
index 44e71a14..e3e38664 100644
--- a/golang/client_manager.go
+++ b/golang/client_manager.go
@@ -137,7 +137,6 @@ func (cm *defaultClientManager) deleteRpcClient(rpcClient 
RpcClient) {
 }
 
 func (cm *defaultClientManager) clearIdleRpcClients() {
-       sugarBaseLogger.Info("clientManager start clearIdleRpcClients")
        cm.rpcClientTableLock.Lock()
        defer cm.rpcClientTableLock.Unlock()
        for target, rpcClient := range cm.rpcClientTable {
diff --git a/golang/client_manager_test.go b/golang/client_manager_test.go
index 6d794c81..d6158288 100644
--- a/golang/client_manager_test.go
+++ b/golang/client_manager_test.go
@@ -38,7 +38,9 @@ var MOCK_CLIENT *MockClient
 var MOCK_RPC_CLIENT *MockRpcClient
 
 type MOCK_MessagingService_TelemetryClient struct {
-       trace []string
+       trace            []string
+       recv_error_count int            `default:"0"`
+       cli              *defaultClient `default:"nil"`
 }
 
 // CloseSend implements v2.MessagingService_TelemetryClient
@@ -80,7 +82,18 @@ func (mt *MOCK_MessagingService_TelemetryClient) Trailer() 
metadata.MD {
 // Recv implements v2.MessagingService_TelemetryClient
 func (mt *MOCK_MessagingService_TelemetryClient) Recv() (*v2.TelemetryCommand, 
error) {
        mt.trace = append(mt.trace, "recv")
-       return nil, io.EOF
+       if mt.recv_error_count >= 1 {
+               mt.recv_error_count -= 1
+               return nil, io.EOF
+       } else {
+               if mt.cli == nil {
+                       return nil, io.EOF
+               } else {
+                       time.Sleep(time.Second)
+                       command := mt.cli.getSettingsCommand()
+                       return command, nil
+               }
+       }
 }
 
 // Send implements v2.MessagingService_TelemetryClient
diff --git a/golang/client_test.go b/golang/client_test.go
index 5265a763..41079ccd 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -18,15 +18,88 @@
 package golang
 
 import (
+       "context"
        "fmt"
        "testing"
        "time"
 
        "github.com/apache/rocketmq-clients/golang/credentials"
+       v2 "github.com/apache/rocketmq-clients/golang/protocol/v2"
        gomock "github.com/golang/mock/gomock"
        "github.com/prashantv/gostub"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "go.uber.org/zap"
+       "go.uber.org/zap/zaptest/observer"
 )
 
+func BuildCLient(t *testing.T) *defaultClient {
+       stubs := gostub.Stub(&defaultClientManagerOptions, clientManagerOptions{
+               RPC_CLIENT_MAX_IDLE_DURATION: time.Second,
+
+               RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY: time.Hour,
+               RPC_CLIENT_IDLE_CHECK_PERIOD:        time.Hour,
+
+               HEART_BEAT_INITIAL_DELAY: time.Hour,
+               HEART_BEAT_PERIOD:        time.Hour,
+
+               LOG_STATS_INITIAL_DELAY: time.Hour,
+               LOG_STATS_PERIOD:        time.Hour,
+
+               SYNC_SETTINGS_DELAY:  time.Hour,
+               SYNC_SETTINGS_PERIOD: time.Hour,
+       })
+
+       stubs2 := gostub.Stub(&NewRpcClient, func(target string, opts 
...RpcClientOption) (RpcClient, error) {
+               if target == fakeAddress {
+                       return MOCK_RPC_CLIENT, nil
+               }
+               return nil, fmt.Errorf("invalid target=%s", target)
+       })
+
+       defer func() {
+               stubs.Reset()
+               stubs2.Reset()
+       }()
+
+       
MOCK_RPC_CLIENT.EXPECT().Telemetry(gomock.Any()).Return(&MOCK_MessagingService_TelemetryClient{
+               trace: make([]string, 0),
+       }, nil)
+
+       endpoints := fmt.Sprintf("%s:%d", fakeHost, fakePort)
+       cli, err := NewClientConcrete(&Config{
+               Endpoint:    endpoints,
+               Credentials: &credentials.SessionCredentials{},
+       })
+       if err != nil {
+               t.Error(err)
+       }
+       sugarBaseLogger.Info(cli)
+       err = cli.startUp()
+       if err != nil {
+               t.Error(err)
+       }
+
+       return cli
+}
+
+func GetClientAndDefaultClientSession(t *testing.T) (*defaultClient, 
*defaultClientSession) {
+       cli := BuildCLient(t)
+       default_cli_session, err := cli.getDefaultClientSession(fakeAddress)
+       if err != nil {
+               t.Error(err)
+       }
+       return cli, default_cli_session
+}
+
+func PrepareTestLogger(cli *defaultClient) *observer.ObservedLogs {
+       observedZapCore, observedLogs := observer.New(zap.InfoLevel)
+       observedLogger := zap.New(observedZapCore)
+       cli.log = observedLogger.Sugar()
+
+       return observedLogs
+}
+
 func TestCLINewClient(t *testing.T) {
        stubs := gostub.Stub(&defaultClientManagerOptions, clientManagerOptions{
                RPC_CLIENT_MAX_IDLE_DURATION: time.Second,
@@ -74,3 +147,145 @@ func TestCLINewClient(t *testing.T) {
                t.Error(err)
        }
 }
+
+func Test_acquire_observer_uninitialized(t *testing.T) {
+       // given
+       _, default_cli_session := GetClientAndDefaultClientSession(t)
+
+       // when
+       observer, acquired_observer := default_cli_session._acquire_observer()
+
+       // then
+       if acquired_observer {
+               t.Error("Acquired observer even though it is uninitialized")
+       }
+       if observer != nil {
+               t.Error("Observer should be nil")
+       }
+}
+
+func Test_acquire_observer_initialized(t *testing.T) {
+       // given
+       _, default_cli_session := GetClientAndDefaultClientSession(t)
+       default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+
+       // when
+       observer, acquired_observer := default_cli_session._acquire_observer()
+
+       // then
+       if !acquired_observer {
+               t.Error("Failed to acquire observer even though it is 
uninitialized")
+       }
+       if observer == nil {
+               t.Error("Observer should be not nil")
+       }
+}
+
+func Test_execute_server_telemetry_command_fail(t *testing.T) {
+       // given
+       cli, default_cli_session := GetClientAndDefaultClientSession(t)
+       default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+       observedLogs := PrepareTestLogger(cli)
+
+       // when
+       
default_cli_session._execute_server_telemetry_command(&v2.TelemetryCommand{})
+
+       // then
+       require.Equal(t, 1, observedLogs.Len())
+       commandExecutionLog := observedLogs.All()[0]
+       assert.Equal(t, "telemetryCommand recv 
err=%!w(*errors.errorString=&{handleTelemetryCommand err = Command is nil})", 
commandExecutionLog.Message)
+}
+
+func Test_execute_server_telemetry_command(t *testing.T) {
+       // given
+       cli, default_cli_session := GetClientAndDefaultClientSession(t)
+       default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+       observedLogs := PrepareTestLogger(cli)
+
+       // when
+       
default_cli_session._execute_server_telemetry_command(&v2.TelemetryCommand{Command:
 &v2.TelemetryCommand_RecoverOrphanedTransactionCommand{}})
+
+       // then
+       require.Equal(t, 2, observedLogs.Len())
+       commandExecutionLog := observedLogs.All()[1]
+       assert.Equal(t, "Executed command successfully", 
commandExecutionLog.Message)
+}
+
+func TestRestoreDefaultClientSessionZeroErrors(t *testing.T) {
+       // given
+       cli := BuildCLient(t)
+       default_cli_session, err := cli.getDefaultClientSession(fakeAddress)
+       if err != nil {
+               t.Error(err)
+       }
+       default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+       observedLogs := PrepareTestLogger(cli)
+       default_cli_session.observer = &MOCK_MessagingService_TelemetryClient{
+               recv_error_count: 0,
+               cli:              cli,
+       }
+       default_cli_session.recoveryWaitTime = time.Second
+       cli.settings = &simpleConsumerSettings{}
+
+       // when
+       // we wait some time while consumer goroutine runs
+       time.Sleep(3 * time.Second)
+
+       // then
+       commandExecutionLog := observedLogs.All()[:2]
+       assert.Equal(t, "Executed command successfully", 
commandExecutionLog[0].Message)
+       assert.Equal(t, "Executed command successfully", 
commandExecutionLog[1].Message)
+}
+
+func TestRestoreDefaultClientSessionOneError(t *testing.T) {
+       // given
+       cli := BuildCLient(t)
+       default_cli_session, err := cli.getDefaultClientSession(fakeAddress)
+       if err != nil {
+               t.Error(err)
+       }
+       default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+       observedLogs := PrepareTestLogger(cli)
+       default_cli_session.observer = &MOCK_MessagingService_TelemetryClient{
+               recv_error_count: 1,
+               cli:              cli,
+       }
+       default_cli_session.recoveryWaitTime = time.Second
+       cli.settings = &simpleConsumerSettings{}
+
+       // when
+       // we wait some time while consumer goroutine runs
+       time.Sleep(3 * time.Second)
+
+       // then
+       commandExecutionLog := observedLogs.All()[:3]
+       assert.Equal(t, "Encountered error while receiving TelemetryCommand, 
trying to recover", commandExecutionLog[0].Message)
+       assert.Equal(t, "Managed to recover, executing message", 
commandExecutionLog[1].Message)
+       assert.Equal(t, "Executed command successfully", 
commandExecutionLog[2].Message)
+}
+
+func TestRestoreDefaultClientSessionTwoErrors(t *testing.T) {
+       // given
+       cli := BuildCLient(t)
+       default_cli_session, err := cli.getDefaultClientSession(fakeAddress)
+       if err != nil {
+               t.Error(err)
+       }
+       default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
+       observedLogs := PrepareTestLogger(cli)
+       default_cli_session.observer = &MOCK_MessagingService_TelemetryClient{
+               recv_error_count: 2,
+               cli:              cli,
+       }
+       default_cli_session.recoveryWaitTime = time.Second
+       cli.settings = &simpleConsumerSettings{}
+
+       // when
+       // we wait some time while consumer goroutine runs
+       time.Sleep(3 * time.Second)
+
+       // then
+       commandExecutionLog := observedLogs.All()[:2]
+       assert.Equal(t, "Encountered error while receiving TelemetryCommand, 
trying to recover", commandExecutionLog[0].Message)
+       assert.Equal(t, "Failed to recover, err=%wEOF", 
commandExecutionLog[1].Message)
+}
diff --git a/golang/rpc_client_mock.go b/golang/rpc_client_mock.go
index 0cd2372c..fb2ddf8a 100644
--- a/golang/rpc_client_mock.go
+++ b/golang/rpc_client_mock.go
@@ -215,9 +215,7 @@ func (mr *MockRpcClientMockRecorder) Telemetry(ctx 
interface{}) *gomock.Call {
 // idleDuration mocks base method.
 func (m *MockRpcClient) idleDuration() time.Duration {
        m.ctrl.T.Helper()
-       ret := m.ctrl.Call(m, "idleDuration")
-       ret0, _ := ret[0].(time.Duration)
-       return ret0
+       return time.Hour
 }
 
 // idleDuration indicates an expected call of idleDuration.

Reply via email to