This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new d3b6435 make all remote unit tests pass (#76)
d3b6435 is described below
commit d3b6435e80126dc9cfd0b2406ea32716085a1aaa
Author: 高峰 <[email protected]>
AuthorDate: Mon Jul 1 14:01:04 2019 +0800
make all remote unit tests pass (#76)
---
remote/codec.go | 9 ++---
remote/codec_test.go | 80 +++++++++++++++++++++++++++++++-------------
remote/remote_client.go | 24 ++++++++-----
remote/remote_client_test.go | 48 +++++++++++++++++++-------
4 files changed, 113 insertions(+), 48 deletions(-)
diff --git a/remote/codec.go b/remote/codec.go
index 8e499fe..aebf75b 100644
--- a/remote/codec.go
+++ b/remote/codec.go
@@ -58,10 +58,11 @@ type CustomHeader interface {
func NewRemotingCommand(code int16, header CustomHeader, body []byte) *RemotingCommand {
cmd := &RemotingCommand{
- Code: code,
- Version: _Version,
- Opaque: atomic.AddInt32(&opaque, 1),
- Body: body,
+ Code: code,
+ Version: _Version,
+ Opaque: atomic.AddInt32(&opaque, 1),
+ Body: body,
+ Language: _LanguageCode,
}
if header != nil {
diff --git a/remote/codec_test.go b/remote/codec_test.go
index e48b0bb..2e836c7 100644
--- a/remote/codec_test.go
+++ b/remote/codec_test.go
@@ -83,14 +83,29 @@ func Test_decode(t *testing.T) {
if err != nil {
t.Fatalf("encode RemotingCommand to bytes fail: %v",
err)
}
-
+ bs = bs[4:]
decodedRc, err := decode(bs)
if err != nil {
t.Fatalf("decode bytes to RemotingCommand fail: %v",
err)
}
- if !reflect.DeepEqual(*rc, *decodedRc) {
- t.Fatal("decoded RemotingCommand not equal to the original one")
+ if rc.Code != decodedRc.Code {
+ t.Fatalf("wrong Code. want=%d, got=%d", rc.Code,
decodedRc.Code)
+ }
+ if rc.Version != decodedRc.Version {
+ t.Fatalf("wrong Version. want=%d, got=%d", rc.Version,
decodedRc.Version)
+ }
+ if rc.Opaque != decodedRc.Opaque {
+ t.Fatalf("wrong Opaque. want=%d, got=%d", rc.Opaque,
decodedRc.Opaque)
+ }
+ if rc.Remark != decodedRc.Remark {
+ t.Fatalf("wrong remark. want=%s, got=%s", rc.Remark,
decodedRc.Remark)
+ }
+ if rc.Flag != decodedRc.Flag {
+ t.Fatalf("wrong flag. want=%d, got=%d", rc.Flag,
decodedRc.Flag)
+ }
+ if !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
+ t.Fatalf("wrong extFields, want=%v, got=%v",
rc.ExtFields, decodedRc.ExtFields)
}
}
}
@@ -102,7 +117,7 @@ func Benchmark_decode(b *testing.B) {
b.Fatalf("encode RemotingCommand to bytes fail: %v", err)
}
b.ResetTimer()
-
+ bs = bs[4:]
for i := 0; i < b.N; i++ {
if _, err := decode(bs); err != nil {
b.Fatalf("decode bytes to RemotingCommand fail: %v",
err)
@@ -145,14 +160,23 @@ func Test_jsonCodec_decodeHeader(t *testing.T) {
t.Fatalf("decode header with jsonCodec fail: %v", err)
}
- if rc.Code != decodedRc.Code ||
- rc.Language != decodedRc.Language ||
- rc.Version != decodedRc.Version ||
- rc.Opaque != rc.Opaque ||
- rc.Flag != rc.Flag ||
- rc.Remark != rc.Remark ||
- !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
- t.Fatal("decoded RemotingCommand not equal to the original one")
+ if rc.Code != decodedRc.Code {
+ t.Fatalf("wrong Code. want=%d, got=%d", rc.Code,
decodedRc.Code)
+ }
+ if rc.Version != decodedRc.Version {
+ t.Fatalf("wrong Version. want=%d, got=%d", rc.Version,
decodedRc.Version)
+ }
+ if rc.Opaque != decodedRc.Opaque {
+ t.Fatalf("wrong Opaque. want=%d, got=%d", rc.Opaque,
decodedRc.Opaque)
+ }
+ if rc.Remark != decodedRc.Remark {
+ t.Fatalf("wrong remark. want=%s, got=%s", rc.Remark,
decodedRc.Remark)
+ }
+ if rc.Flag != decodedRc.Flag {
+ t.Fatalf("wrong flag. want=%d, got=%d", rc.Flag,
decodedRc.Flag)
+ }
+ if !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
+ t.Fatalf("wrong extFields, want=%v, got=%v",
rc.ExtFields, decodedRc.ExtFields)
}
}
}
@@ -206,16 +230,25 @@ func Test_rmqCodec_decodeHeader(t *testing.T) {
if err != nil {
t.Fatalf("decode header with rmqCodec fail: %v", err)
}
-
- if rc.Code != decodedRc.Code ||
- rc.Language != decodedRc.Language ||
- rc.Version != decodedRc.Version ||
- rc.Opaque != rc.Opaque ||
- rc.Flag != rc.Flag ||
- rc.Remark != rc.Remark ||
- !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
- t.Fatal("decoded RemotingCommand not equal to the original one")
+ if rc.Code != decodedRc.Code {
+ t.Fatalf("wrong Code. want=%d, got=%d", rc.Code,
decodedRc.Code)
+ }
+ if rc.Version != decodedRc.Version {
+ t.Fatalf("wrong Version. want=%d, got=%d", rc.Version,
decodedRc.Version)
+ }
+ if rc.Opaque != decodedRc.Opaque {
+ t.Fatalf("wrong Opaque. want=%d, got=%d", rc.Opaque,
decodedRc.Opaque)
+ }
+ if rc.Remark != decodedRc.Remark {
+ t.Fatalf("wrong remark. want=%s, got=%s", rc.Remark,
decodedRc.Remark)
+ }
+ if rc.Flag != decodedRc.Flag {
+ t.Fatalf("wrong flag. want=%d, got=%d", rc.Flag,
decodedRc.Flag)
}
+ if !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
+ t.Fatalf("wrong extFields, want=%v, got=%v",
rc.ExtFields, decodedRc.ExtFields)
+ }
+
}
}
@@ -246,6 +279,7 @@ func TestCommandJsonEncodeDecode(t *testing.T) {
t.Errorf("failed to encode remotingCommand, result is empty.")
}
}
+ cmdData = cmdData[4:]
newCmd, err := decode(cmdData)
if err != nil {
t.Errorf("failed to decode remoting in JSON. %s", err)
@@ -253,9 +287,6 @@ func TestCommandJsonEncodeDecode(t *testing.T) {
if newCmd.Code != cmd.Code {
t.Errorf("wrong command code. want=%d, got=%d", cmd.Code,
newCmd.Code)
}
- if newCmd.Language != cmd.Language {
- t.Errorf("wrong command language. want=%d, got=%d",
cmd.Language, newCmd.Language)
- }
if newCmd.Version != cmd.Version {
t.Errorf("wrong command version. want=%d, got=%d", cmd.Version,
newCmd.Version)
}
@@ -282,6 +313,7 @@ func TestCommandRocketMQEncodeDecode(t *testing.T) {
t.Errorf("failed to encode remotingCommand, result is empty.")
}
}
+ cmdData = cmdData[4:]
newCmd, err := decode(cmdData)
if err != nil {
t.Errorf("failed to decode remoting in JSON. %s", err)
diff --git a/remote/remote_client.go b/remote/remote_client.go
index 93f81ce..40c10f0 100644
--- a/remote/remote_client.go
+++ b/remote/remote_client.go
@@ -72,20 +72,27 @@ func (r *ResponseFuture) isTimeout() bool {
}
func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
+ var (
+ cmd *RemotingCommand
+ err error
+ )
+ timer := time.NewTimer(r.TimeoutMillis * time.Millisecond)
for {
select {
case <-r.Done:
- if r.Err != nil {
- return nil, r.Err
- }
- return r.ResponseCommand, nil
- case <-time.After(r.TimeoutMillis * time.Millisecond):
- return nil, ErrRequestTimeout
+ cmd, err = r.ResponseCommand, r.Err
+ goto done
+ case <-timer.C:
+ err = ErrRequestTimeout
+ goto done
}
}
+done:
+ timer.Stop()
+ return cmd, err
}
-type ClientRequestFunc func(remotingCommand *RemotingCommand) (responseCommand
*RemotingCommand)
+type ClientRequestFunc func(*RemotingCommand) *RemotingCommand
type TcpOption struct {
// TODO
@@ -116,6 +123,7 @@ func (c *RemotingClient) InvokeSync(addr string, request
*RemotingCommand, timeo
resp := NewResponseFuture(request.Opaque, timeoutMillis, nil)
c.responseTable.Store(resp.Opaque, resp)
err = c.sendRequest(conn, request)
+ defer c.responseTable.Delete(request.Opaque)
if err != nil {
return nil, err
}
@@ -209,7 +217,7 @@ func (c *RemotingClient) receiveResponse(r net.Conn) {
go func() { // 单个goroutine会造成死锁
res := f(cmd)
if res != nil {
- err := c.sendRequest(r, cmd)
+ err := c.sendRequest(r, res)
if err != nil {
rlog.Warnf("send response to broker error: %s, type is: %d", err, res.Code)
}
diff --git a/remote/remote_client_test.go b/remote/remote_client_test.go
index acd0d26..c999c0e 100644
--- a/remote/remote_client_test.go
+++ b/remote/remote_client_test.go
@@ -42,7 +42,7 @@ func TestNewResponseFuture(t *testing.T) {
future.TimeoutMillis, time.Duration(1000))
}
if future.callback != nil {
- t.Errorf("wrong ResponseFuture's callback. want=<nil>, got=%v",
future.callback)
+ t.Errorf("wrong ResponseFuture's callback. want=<nil>, got!=<nil>")
}
if future.Done == nil {
t.Errorf("wrong ResponseFuture's Done. want=<channel>, got=<nil>")
@@ -129,15 +129,28 @@ func TestCreateScanner(t *testing.T) {
if err != nil {
t.Fatalf("failed to encode RemotingCommand. %s", err)
}
+ client := NewRemotingClient()
reader := bytes.NewReader(content)
- scanner := createScanner(reader)
+ scanner := client.createScanner(reader)
for scanner.Scan() {
rcr, err := decode(scanner.Bytes())
if err != nil {
t.Fatalf("failedd to decode RemotingCommand from scanner")
}
- if !reflect.DeepEqual(*r, *rcr) {
- t.Fatal("decoded RemotingCommand not equal to the original one")
+ if r.Code != rcr.Code {
+ t.Fatalf("wrong Code. want=%d, got=%d", r.Code,
rcr.Code)
+ }
+ if r.Version != rcr.Version {
+ t.Fatalf("wrong Version. want=%d, got=%d", r.Version,
rcr.Version)
+ }
+ if r.Opaque != rcr.Opaque {
+ t.Fatalf("wrong Opaque. want=%d, got=%d", r.Opaque,
rcr.Opaque)
+ }
+ if r.Flag != rcr.Flag {
+ t.Fatalf("wrong flag. want=%d, got=%d", r.Opaque,
rcr.Opaque)
+ }
+ if !reflect.DeepEqual(r.ExtFields, rcr.ExtFields) {
+ t.Fatalf("wrong extFields. want=%v, got=%v",
r.ExtFields, rcr.ExtFields)
}
}
}
@@ -149,8 +162,9 @@ func TestInvokeSync(t *testing.T) {
serverSendRemotingCommand.Flag = ResponseType
var wg sync.WaitGroup
wg.Add(1)
+ client := NewRemotingClient()
go func() {
- receiveCommand, err := InvokeSync(":3000",
+ receiveCommand, err := client.InvokeSync(":3000",
clientSendRemtingCommand, time.Duration(1000))
if err != nil {
t.Fatalf("failed to invoke synchronous. %s", err)
@@ -173,7 +187,7 @@ func TestInvokeSync(t *testing.T) {
return
}
defer conn.Close()
- scanner := createScanner(conn)
+ scanner := client.createScanner(conn)
for scanner.Scan() {
receivedRemotingCommand, err := decode(scanner.Bytes())
if err != nil {
@@ -205,9 +219,13 @@ func TestInvokeAsync(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
+ client := NewRemotingClient()
go func() {
- err := InvokeAsync(":3000", clientSendRemtingCommand,
+ time.Sleep(1 * time.Second)
+ t.Logf("invoke async method")
+ err := client.InvokeAsync(":3000", clientSendRemtingCommand,
time.Duration(1000), func(r *ResponseFuture) {
+ t.Logf("invoke async callback")
if string(r.ResponseCommand.Body) != "Welcome native" {
t.Errorf("wrong responseCommand.Body. want=%s, got=%s",
"Welcome native",
string(r.ResponseCommand.Body))
@@ -231,8 +249,9 @@ func TestInvokeAsync(t *testing.T) {
return
}
defer conn.Close()
- scanner := createScanner(conn)
+ scanner := client.createScanner(conn)
for scanner.Scan() {
+ t.Logf("receive request.")
receivedRemotingCommand, err := decode(scanner.Bytes())
if err != nil {
t.Errorf("failed to decode RemotingCommnad. %s", err)
@@ -241,18 +260,22 @@ func TestInvokeAsync(t *testing.T) {
t.Errorf("wrong code. want=%d, got=%d",
receivedRemotingCommand.Code,
clientSendRemtingCommand.Code)
}
+ t.Logf("encoding response")
body, err := encode(serverSendRemotingCommand)
if err != nil {
t.Fatalf("failed to encode RemotingCommand")
}
_, err = conn.Write(body)
+ t.Logf("sent response")
if err != nil {
t.Fatalf("failed to write body to conneciton.")
}
- return
+ goto done
}
}
- wg.Done()
+done:
+
+ wg.Wait()
}
func TestInvokeOneWay(t *testing.T) {
@@ -260,8 +283,9 @@ func TestInvokeOneWay(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
+ client := NewRemotingClient()
go func() {
- err := InvokeOneWay(":3000", clientSendRemtingCommand,
3*time.Second)
+ err := client.InvokeOneWay(":3000", clientSendRemtingCommand,
3*time.Second)
if err != nil {
t.Fatalf("failed to invoke synchronous. %s", err)
}
@@ -279,7 +303,7 @@ func TestInvokeOneWay(t *testing.T) {
return
}
defer conn.Close()
- scanner := createScanner(conn)
+ scanner := client.createScanner(conn)
for scanner.Scan() {
receivedRemotingCommand, err := decode(scanner.Bytes())
if err != nil {