This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new ca0b2a8  fix tcp scanner bug (#41)
ca0b2a8 is described below

commit ca0b2a802015681f9ec3c00d6b6e6f6add9d87ed
Author: 高峰 <[email protected]>
AuthorDate: Tue Mar 12 20:50:58 2019 +0800

    fix tcp scanner bug (#41)
    
    * complete remote/client.go interface and unit tests
    
    * [bug-fix]
    - fix remote/client.go `createScanner` method
    - add `InvokeSync`, `InvokeAsync` and `InvokeOneWay` unit test cases
---
 remote/client.go      |   4 +-
 remote/client_test.go | 165 ++++++++++++++++++++++++++++++++++++++++++++++++++
 remote/codec_test.go  |  18 ------
 3 files changed, 167 insertions(+), 20 deletions(-)

diff --git a/remote/client.go b/remote/client.go
index 94f5ed2..095c304 100644
--- a/remote/client.go
+++ b/remote/client.go
@@ -263,8 +263,8 @@ func createScanner(r io.Reader) *bufio.Scanner {
                        if len(data) >= 4 {
                                var length int32
                                binary.Read(bytes.NewReader(data[0:4]), 
binary.BigEndian, &length)
-                               if int(length)+4 <= len(data) {
-                                       return int(length) + 4, 
data[:int(length)+4], nil
+                               if int(length) <= len(data) {
+                                       return int(length), data[:length], nil
                                }
                        }
                }
diff --git a/remote/client_test.go b/remote/client_test.go
index 55a26fc..40f6e50 100644
--- a/remote/client_test.go
+++ b/remote/client_test.go
@@ -19,6 +19,7 @@ package remote
 import (
        "bytes"
        "errors"
+       "net"
        "reflect"
        "sync"
        "testing"
@@ -176,4 +177,168 @@ func TestCreateScanner(t *testing.T) {
                        t.Fatal("decoded RemotingCommand not equal to the 
original one")
                }
        }
+}
+
+func TestDefaultRemotingClient_InvokeSync(t *testing.T) {
+       clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello 
RocketMQ"))
+       serverSendRemotingCommand := NewRemotingCommand(20, nil, 
[]byte("Welcome native"))
+       serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
+       serverSendRemotingCommand.Flag = ResponseType
+
+       client := NewDefaultRemotingClient()
+       client.Start()
+       defer client.Shutdown()
+       var wg sync.WaitGroup
+       wg.Add(1)
+       go func() {
+               receiveCommand, err := client.InvokeSync(":3000",
+                       clientSendRemtingCommand, time.Duration(1000))
+               if err != nil {
+                       t.Fatalf("failed to invoke synchronous. %s", err)
+               } else {
+                       if !reflect.DeepEqual(&receiveCommand, 
&serverSendRemotingCommand) {
+                               t.Errorf("remotingCommand prased in client is 
different from server. ")
+                       }
+               }
+               wg.Done()
+       }()
+
+       l, err := net.Listen("tcp", ":3000")
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer l.Close()
+       for {
+               conn, err := l.Accept()
+               if err != nil {
+                       return
+               }
+               defer conn.Close()
+               scanner := createScanner(conn)
+               for scanner.Scan() {
+                       receivedRemotingCommand, err := decode(scanner.Bytes())
+                       if err != nil {
+                               t.Errorf("failed to decode RemotingCommnad. 
%s", err)
+                       }
+                       if clientSendRemtingCommand.Code != 
receivedRemotingCommand.Code {
+                               t.Errorf("wrong code. want=%d, 
got=%d",receivedRemotingCommand.Code,
+                                       clientSendRemtingCommand.Code)
+                       }
+                       body, err := encode(serverSendRemotingCommand)
+                       if err != nil {
+                               t.Fatalf("failed to encode RemotingCommand")
+                       }
+                       _, err = conn.Write(body)
+                       if err != nil {
+                               t.Fatalf("failed to write body to conneciton.")
+                       }
+                       return
+               }
+       }
+       wg.Done()
+}
+
+func TestDefaultRemotingClient_InvokeAsync(t *testing.T) {
+       clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello 
RocketMQ"))
+       serverSendRemotingCommand := NewRemotingCommand(20, nil, 
[]byte("Welcome native"))
+       serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
+       serverSendRemotingCommand.Flag = ResponseType
+
+       client := NewDefaultRemotingClient()
+       client.Start()
+       defer client.Shutdown()
+       var wg sync.WaitGroup
+       wg.Add(1)
+       go func() {
+               err := client.InvokeAsync(":3000", clientSendRemtingCommand,
+                       time.Duration(1000), func(r *ResponseFuture) {
+                       if string(r.ResponseCommand.Body) != "Welcome native" {
+                               t.Errorf("wrong responseCommand.Body. want=%s, 
got=%s",
+                                       "Welcome native",  
string(r.ResponseCommand.Body))
+                       }
+                       wg.Done()
+               })
+               if err != nil {
+                       t.Errorf("failed to invokeSync. %s", err)
+               }
+
+       }()
+
+       l, err := net.Listen("tcp", ":3000")
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer l.Close()
+       for {
+               conn, err := l.Accept()
+               if err != nil {
+                       return
+               }
+               defer conn.Close()
+               scanner := createScanner(conn)
+               for scanner.Scan() {
+                       receivedRemotingCommand, err := decode(scanner.Bytes())
+                       if err != nil {
+                               t.Errorf("failed to decode RemotingCommnad. 
%s", err)
+                       }
+                       if clientSendRemtingCommand.Code != 
receivedRemotingCommand.Code {
+                               t.Errorf("wrong code. want=%d, 
got=%d",receivedRemotingCommand.Code,
+                                       clientSendRemtingCommand.Code)
+                       }
+                       body, err := encode(serverSendRemotingCommand)
+                       if err != nil {
+                               t.Fatalf("failed to encode RemotingCommand")
+                       }
+                       _, err = conn.Write(body)
+                       if err != nil {
+                               t.Fatalf("failed to write body to conneciton.")
+                       }
+                       return
+               }
+       }
+       wg.Done()
+}
+
+
+func TestDefaultRemotingClient_InvokeOneWay(t *testing.T) {
+       clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello 
RocketMQ"))
+
+       client := NewDefaultRemotingClient()
+       client.Start()
+       defer client.Shutdown()
+       var wg sync.WaitGroup
+       wg.Add(1)
+       go func() {
+               err := client.InvokeOneWay(":3000", clientSendRemtingCommand)
+               if err != nil {
+                       t.Fatalf("failed to invoke synchronous. %s", err)
+               }
+               wg.Done()
+       }()
+
+       l, err := net.Listen("tcp", ":3000")
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer l.Close()
+       for {
+               conn, err := l.Accept()
+               if err != nil {
+                       return
+               }
+               defer conn.Close()
+               scanner := createScanner(conn)
+               for scanner.Scan() {
+                       receivedRemotingCommand, err := decode(scanner.Bytes())
+                       if err != nil {
+                               t.Errorf("failed to decode RemotingCommnad. 
%s", err)
+                       }
+                       if clientSendRemtingCommand.Code != 
receivedRemotingCommand.Code {
+                               t.Errorf("wrong code. want=%d, 
got=%d",receivedRemotingCommand.Code,
+                                       clientSendRemtingCommand.Code)
+                       }
+                       return
+               }
+       }
+       wg.Done()
 }
\ No newline at end of file
diff --git a/remote/codec_test.go b/remote/codec_test.go
index 5e63e4a..ab1a61e 100644
--- a/remote/codec_test.go
+++ b/remote/codec_test.go
@@ -269,15 +269,6 @@ func TestCommandJsonEncodeDecode(t *testing.T) {
        if newCmd.Remark != cmd.Remark {
                t.Errorf("wrong command remakr. want=%s, got=%s", cmd.Remark, 
newCmd.Remark)
        }
-       //for k, v := range cmd.ExtFields {
-       //      if vv, ok := newCmd.ExtFields[k]; !ok {
-       //              t.Errorf("key: %s not exists in newCommand.", k)
-       //      } else {
-       //              if v != vv {
-       //                      t.Errorf("wrong value. want=%s, got=%s", v, vv)
-       //              }
-       //      }
-       //}
 }
 
 func TestCommandRocketMQEncodeDecode(t *testing.T) {
@@ -314,13 +305,4 @@ func TestCommandRocketMQEncodeDecode(t *testing.T) {
        if newCmd.Remark != cmd.Remark {
                t.Errorf("wrong command remakr. want=%s, got=%s", cmd.Remark, 
newCmd.Remark)
        }
-       //for k, v := range cmd.ExtFields {
-       //      if vv, ok := newCmd.ExtFields[k]; !ok {
-       //              t.Errorf("key: %s not exists in newCommand.", k)
-       //      } else {
-       //              if v != vv {
-       //                      t.Errorf("wrong value. want=%s, got=%s", v, vv)
-       //              }
-       //      }
-       //}
 }

Reply via email to