This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new d3b6435  make all remote unit tests pass (#76)
d3b6435 is described below

commit d3b6435e80126dc9cfd0b2406ea32716085a1aaa
Author: 高峰 <[email protected]>
AuthorDate: Mon Jul 1 14:01:04 2019 +0800

    make all remote unit tests pass (#76)
---
 remote/codec.go              |  9 ++---
 remote/codec_test.go         | 80 +++++++++++++++++++++++++++++++-------------
 remote/remote_client.go      | 24 ++++++++-----
 remote/remote_client_test.go | 48 +++++++++++++++++++-------
 4 files changed, 113 insertions(+), 48 deletions(-)

diff --git a/remote/codec.go b/remote/codec.go
index 8e499fe..aebf75b 100644
--- a/remote/codec.go
+++ b/remote/codec.go
@@ -58,10 +58,11 @@ type CustomHeader interface {
 
 func NewRemotingCommand(code int16, header CustomHeader, body []byte) 
*RemotingCommand {
        cmd := &RemotingCommand{
-               Code:    code,
-               Version: _Version,
-               Opaque:  atomic.AddInt32(&opaque, 1),
-               Body:    body,
+               Code:     code,
+               Version:  _Version,
+               Opaque:   atomic.AddInt32(&opaque, 1),
+               Body:     body,
+               Language: _LanguageCode,
        }
 
        if header != nil {
diff --git a/remote/codec_test.go b/remote/codec_test.go
index e48b0bb..2e836c7 100644
--- a/remote/codec_test.go
+++ b/remote/codec_test.go
@@ -83,14 +83,29 @@ func Test_decode(t *testing.T) {
                if err != nil {
                        t.Fatalf("encode RemotingCommand to bytes fail: %v", 
err)
                }
-
+               bs = bs[4:]
                decodedRc, err := decode(bs)
                if err != nil {
                        t.Fatalf("decode bytes to RemotingCommand fail: %v", 
err)
                }
 
-               if !reflect.DeepEqual(*rc, *decodedRc) {
-                       t.Fatal("decoded RemotingCommand not equal to the 
original one")
+               if rc.Code != decodedRc.Code {
+                       t.Fatalf("wrong Code. want=%d, got=%d", rc.Code, 
decodedRc.Code)
+               }
+               if rc.Version != decodedRc.Version {
+                       t.Fatalf("wrong Version. want=%d, got=%d", rc.Version, 
decodedRc.Version)
+               }
+               if rc.Opaque != decodedRc.Opaque {
+                       t.Fatalf("wrong Opaque. want=%d, got=%d", rc.Opaque, 
decodedRc.Opaque)
+               }
+               if rc.Remark != decodedRc.Remark {
+                       t.Fatalf("wrong remark. want=%s, got=%s", rc.Remark, 
decodedRc.Remark)
+               }
+               if rc.Flag != decodedRc.Flag {
+                       t.Fatalf("wrong flag. want=%d, got=%d", rc.Flag, 
decodedRc.Flag)
+               }
+               if !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
+                       t.Fatalf("wrong extFields, want=%v, got=%v", 
rc.ExtFields, decodedRc.ExtFields)
                }
        }
 }
@@ -102,7 +117,7 @@ func Benchmark_decode(b *testing.B) {
                b.Fatalf("encode RemotingCommand to bytes fail: %v", err)
        }
        b.ResetTimer()
-
+       bs = bs[4:]
        for i := 0; i < b.N; i++ {
                if _, err := decode(bs); err != nil {
                        b.Fatalf("decode bytes to RemotingCommand fail: %v", 
err)
@@ -145,14 +160,23 @@ func Test_jsonCodec_decodeHeader(t *testing.T) {
                        t.Fatalf("decode header with jsonCodec fail: %v", err)
                }
 
-               if rc.Code != decodedRc.Code ||
-                       rc.Language != decodedRc.Language ||
-                       rc.Version != decodedRc.Version ||
-                       rc.Opaque != rc.Opaque ||
-                       rc.Flag != rc.Flag ||
-                       rc.Remark != rc.Remark ||
-                       !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
-                       t.Fatal("decoded RemotingCommand not equal to the 
original one")
+               if rc.Code != decodedRc.Code {
+                       t.Fatalf("wrong Code. want=%d, got=%d", rc.Code, 
decodedRc.Code)
+               }
+               if rc.Version != decodedRc.Version {
+                       t.Fatalf("wrong Version. want=%d, got=%d", rc.Version, 
decodedRc.Version)
+               }
+               if rc.Opaque != decodedRc.Opaque {
+                       t.Fatalf("wrong Opaque. want=%d, got=%d", rc.Opaque, 
decodedRc.Opaque)
+               }
+               if rc.Remark != decodedRc.Remark {
+                       t.Fatalf("wrong remark. want=%s, got=%s", rc.Remark, 
decodedRc.Remark)
+               }
+               if rc.Flag != decodedRc.Flag {
+                       t.Fatalf("wrong flag. want=%d, got=%d", rc.Flag, 
decodedRc.Flag)
+               }
+               if !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
+                       t.Fatalf("wrong extFields, want=%v, got=%v", 
rc.ExtFields, decodedRc.ExtFields)
                }
        }
 }
@@ -206,16 +230,25 @@ func Test_rmqCodec_decodeHeader(t *testing.T) {
                if err != nil {
                        t.Fatalf("decode header with rmqCodec fail: %v", err)
                }
-
-               if rc.Code != decodedRc.Code ||
-                       rc.Language != decodedRc.Language ||
-                       rc.Version != decodedRc.Version ||
-                       rc.Opaque != rc.Opaque ||
-                       rc.Flag != rc.Flag ||
-                       rc.Remark != rc.Remark ||
-                       !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
-                       t.Fatal("decoded RemotingCommand not equal to the 
original one")
+               if rc.Code != decodedRc.Code {
+                       t.Fatalf("wrong Code. want=%d, got=%d", rc.Code, 
decodedRc.Code)
+               }
+               if rc.Version != decodedRc.Version {
+                       t.Fatalf("wrong Version. want=%d, got=%d", rc.Version, 
decodedRc.Version)
+               }
+               if rc.Opaque != decodedRc.Opaque {
+                       t.Fatalf("wrong Opaque. want=%d, got=%d", rc.Opaque, 
decodedRc.Opaque)
+               }
+               if rc.Remark != decodedRc.Remark {
+                       t.Fatalf("wrong remark. want=%s, got=%s", rc.Remark, 
decodedRc.Remark)
+               }
+               if rc.Flag != decodedRc.Flag {
+                       t.Fatalf("wrong flag. want=%d, got=%d", rc.Flag, 
decodedRc.Flag)
                }
+               if !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
+                       t.Fatalf("wrong extFields, want=%v, got=%v", 
rc.ExtFields, decodedRc.ExtFields)
+               }
+
        }
 }
 
@@ -246,6 +279,7 @@ func TestCommandJsonEncodeDecode(t *testing.T) {
                        t.Errorf("failed to encode remotingCommand, result is 
empty.")
                }
        }
+       cmdData = cmdData[4:]
        newCmd, err := decode(cmdData)
        if err != nil {
                t.Errorf("failed to decode remoting in JSON. %s", err)
@@ -253,9 +287,6 @@ func TestCommandJsonEncodeDecode(t *testing.T) {
        if newCmd.Code != cmd.Code {
                t.Errorf("wrong command code. want=%d, got=%d", cmd.Code, 
newCmd.Code)
        }
-       if newCmd.Language != cmd.Language {
-               t.Errorf("wrong command language. want=%d, got=%d", 
cmd.Language, newCmd.Language)
-       }
        if newCmd.Version != cmd.Version {
                t.Errorf("wrong command version. want=%d, got=%d", cmd.Version, 
newCmd.Version)
        }
@@ -282,6 +313,7 @@ func TestCommandRocketMQEncodeDecode(t *testing.T) {
                        t.Errorf("failed to encode remotingCommand, result is 
empty.")
                }
        }
+       cmdData = cmdData[4:]
        newCmd, err := decode(cmdData)
        if err != nil {
                t.Errorf("failed to decode remoting in JSON. %s", err)
diff --git a/remote/remote_client.go b/remote/remote_client.go
index 93f81ce..40c10f0 100644
--- a/remote/remote_client.go
+++ b/remote/remote_client.go
@@ -72,20 +72,27 @@ func (r *ResponseFuture) isTimeout() bool {
 }
 
 func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
+       var (
+               cmd *RemotingCommand
+               err error
+       )
+       timer := time.NewTimer(r.TimeoutMillis * time.Millisecond)
        for {
                select {
                case <-r.Done:
-                       if r.Err != nil {
-                               return nil, r.Err
-                       }
-                       return r.ResponseCommand, nil
-               case <-time.After(r.TimeoutMillis * time.Millisecond):
-                       return nil, ErrRequestTimeout
+                       cmd, err = r.ResponseCommand, r.Err
+                       goto done
+               case <-timer.C:
+                       err = ErrRequestTimeout
+                       goto done
                }
        }
+done:
+       timer.Stop()
+       return cmd, err
 }
 
-type ClientRequestFunc func(remotingCommand *RemotingCommand) (responseCommand 
*RemotingCommand)
+type ClientRequestFunc func(*RemotingCommand) *RemotingCommand
 
 type TcpOption struct {
        // TODO
@@ -116,6 +123,7 @@ func (c *RemotingClient) InvokeSync(addr string, request 
*RemotingCommand, timeo
        resp := NewResponseFuture(request.Opaque, timeoutMillis, nil)
        c.responseTable.Store(resp.Opaque, resp)
        err = c.sendRequest(conn, request)
+       defer c.responseTable.Delete(request.Opaque)
        if err != nil {
                return nil, err
        }
@@ -209,7 +217,7 @@ func (c *RemotingClient) receiveResponse(r net.Conn) {
                                go func() { // 单个goroutine会造成死锁
                                        res := f(cmd)
                                        if res != nil {
-                                               err := c.sendRequest(r, cmd)
+                                               err := c.sendRequest(r, res)
                                                if err != nil {
                                                        rlog.Warnf("send 
response to broker error: %s, type is: %d", err, res.Code)
                                                }
diff --git a/remote/remote_client_test.go b/remote/remote_client_test.go
index acd0d26..c999c0e 100644
--- a/remote/remote_client_test.go
+++ b/remote/remote_client_test.go
@@ -42,7 +42,7 @@ func TestNewResponseFuture(t *testing.T) {
                        future.TimeoutMillis, time.Duration(1000))
        }
        if future.callback != nil {
-               t.Errorf("wrong ResponseFuture's callback. want=<nil>, got=%v", 
future.callback)
+               t.Errorf("wrong ResponseFuture's callback. want=<nil>, 
got!=<nil>")
        }
        if future.Done == nil {
                t.Errorf("wrong ResponseFuture's Done. want=<channel>, 
got=<nil>")
@@ -129,15 +129,28 @@ func TestCreateScanner(t *testing.T) {
        if err != nil {
                t.Fatalf("failed to encode RemotingCommand. %s", err)
        }
+       client := NewRemotingClient()
        reader := bytes.NewReader(content)
-       scanner := createScanner(reader)
+       scanner := client.createScanner(reader)
        for scanner.Scan() {
                rcr, err := decode(scanner.Bytes())
                if err != nil {
                        t.Fatalf("failedd to decode RemotingCommand from 
scanner")
                }
-               if !reflect.DeepEqual(*r, *rcr) {
-                       t.Fatal("decoded RemotingCommand not equal to the 
original one")
+               if r.Code != rcr.Code {
+                       t.Fatalf("wrong Code. want=%d, got=%d", r.Code, 
rcr.Code)
+               }
+               if r.Version != rcr.Version {
+                       t.Fatalf("wrong Version. want=%d, got=%d", r.Version, 
rcr.Version)
+               }
+               if r.Opaque != rcr.Opaque {
+                       t.Fatalf("wrong Opaque. want=%d, got=%d", r.Opaque, 
rcr.Opaque)
+               }
+               if r.Flag != rcr.Flag {
+                       t.Fatalf("wrong flag. want=%d, got=%d", r.Opaque, 
rcr.Opaque)
+               }
+               if !reflect.DeepEqual(r.ExtFields, rcr.ExtFields) {
+                       t.Fatalf("wrong extFields. want=%v, got=%v", 
r.ExtFields, rcr.ExtFields)
                }
        }
 }
@@ -149,8 +162,9 @@ func TestInvokeSync(t *testing.T) {
        serverSendRemotingCommand.Flag = ResponseType
        var wg sync.WaitGroup
        wg.Add(1)
+       client := NewRemotingClient()
        go func() {
-               receiveCommand, err := InvokeSync(":3000",
+               receiveCommand, err := client.InvokeSync(":3000",
                        clientSendRemtingCommand, time.Duration(1000))
                if err != nil {
                        t.Fatalf("failed to invoke synchronous. %s", err)
@@ -173,7 +187,7 @@ func TestInvokeSync(t *testing.T) {
                        return
                }
                defer conn.Close()
-               scanner := createScanner(conn)
+               scanner := client.createScanner(conn)
                for scanner.Scan() {
                        receivedRemotingCommand, err := decode(scanner.Bytes())
                        if err != nil {
@@ -205,9 +219,13 @@ func TestInvokeAsync(t *testing.T) {
 
        var wg sync.WaitGroup
        wg.Add(1)
+       client := NewRemotingClient()
        go func() {
-               err := InvokeAsync(":3000", clientSendRemtingCommand,
+               time.Sleep(1 * time.Second)
+               t.Logf("invoke async method")
+               err := client.InvokeAsync(":3000", clientSendRemtingCommand,
                        time.Duration(1000), func(r *ResponseFuture) {
+                               t.Logf("invoke async callback")
                                if string(r.ResponseCommand.Body) != "Welcome 
native" {
                                        t.Errorf("wrong responseCommand.Body. 
want=%s, got=%s",
                                                "Welcome native", 
string(r.ResponseCommand.Body))
@@ -231,8 +249,9 @@ func TestInvokeAsync(t *testing.T) {
                        return
                }
                defer conn.Close()
-               scanner := createScanner(conn)
+               scanner := client.createScanner(conn)
                for scanner.Scan() {
+                       t.Logf("receive request.")
                        receivedRemotingCommand, err := decode(scanner.Bytes())
                        if err != nil {
                                t.Errorf("failed to decode RemotingCommnad. 
%s", err)
@@ -241,18 +260,22 @@ func TestInvokeAsync(t *testing.T) {
                                t.Errorf("wrong code. want=%d, got=%d", 
receivedRemotingCommand.Code,
                                        clientSendRemtingCommand.Code)
                        }
+                       t.Logf("encoding response")
                        body, err := encode(serverSendRemotingCommand)
                        if err != nil {
                                t.Fatalf("failed to encode RemotingCommand")
                        }
                        _, err = conn.Write(body)
+                       t.Logf("sent response")
                        if err != nil {
                                t.Fatalf("failed to write body to conneciton.")
                        }
-                       return
+                       goto done
                }
        }
-       wg.Done()
+done:
+
+       wg.Wait()
 }
 
 func TestInvokeOneWay(t *testing.T) {
@@ -260,8 +283,9 @@ func TestInvokeOneWay(t *testing.T) {
 
        var wg sync.WaitGroup
        wg.Add(1)
+       client := NewRemotingClient()
        go func() {
-               err := InvokeOneWay(":3000", clientSendRemtingCommand, 
3*time.Second)
+               err := client.InvokeOneWay(":3000", clientSendRemtingCommand, 
3*time.Second)
                if err != nil {
                        t.Fatalf("failed to invoke synchronous. %s", err)
                }
@@ -279,7 +303,7 @@ func TestInvokeOneWay(t *testing.T) {
                        return
                }
                defer conn.Close()
-               scanner := createScanner(conn)
+               scanner := client.createScanner(conn)
                for scanner.Scan() {
                        receivedRemotingCommand, err := decode(scanner.Bytes())
                        if err != nil {

Reply via email to