This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 3a34ed8  complete remote/client.go interface  and unit test (#38)
3a34ed8 is described below

commit 3a34ed85cad756b2c538f37e8654435235f29202
Author: 高峰 <[email protected]>
AuthorDate: Mon Mar 11 21:20:58 2019 +0800

    complete remote/client.go interface  and unit test (#38)
---
 common/init.go                  |  26 +++
 common/manager.go               |   4 +-
 common/route.go                 |   2 +-
 remote/client.go                | 391 ++++++++++++++++++++++------------------
 remote/client_test.go           | 162 +++++++++++++++++
 remote/codec_test.go            |  61 ++++---
 remote/{request.go => codes.go} |  36 ++++
 remote/response.go              |  66 -------
 8 files changed, 474 insertions(+), 274 deletions(-)

diff --git a/common/init.go b/common/init.go
new file mode 100644
index 0000000..1285295
--- /dev/null
+++ b/common/init.go
@@ -0,0 +1,26 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package common
+
+import "github.com/apache/rocketmq-client-go/remote"
+
+var client remote.RemotingClient
+
+
+func init(){
+       client = remote.NewDefaultRemotingClient()
+}
\ No newline at end of file
diff --git a/common/manager.go b/common/manager.go
index 63c1896..83e2934 100644
--- a/common/manager.go
+++ b/common/manager.go
@@ -51,7 +51,7 @@ type InnerConsumer interface {
 func SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, 
request *SendMessageRequest,
        msgs []*Message) (*SendResult, error) {
        cmd := remote.NewRemotingCommand(SendBatchMessage, request, 
encodeMessages(msgs))
-       response, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
+       response, err := client.InvokeSync(brokerAddrs, cmd, 3*time.Second)
        if err != nil {
                return nil, err
        }
@@ -68,7 +68,7 @@ func SendMessageAsync(ctx context.Context, brokerAddrs, 
brokerName string, reque
 func SendMessageOneWay(ctx context.Context, brokerAddrs string, request 
*SendMessageRequest,
        msgs []*Message) (*SendResult, error) {
        cmd := remote.NewRemotingCommand(SendBatchMessage, request, 
encodeMessages(msgs))
-       err := remote.InvokeOneWay(brokerAddrs, cmd)
+       err := client.InvokeOneWay(brokerAddrs, cmd)
        return nil, err
 }
 
diff --git a/common/route.go b/common/route.go
index 33f25e1..4ad3218 100644
--- a/common/route.go
+++ b/common/route.go
@@ -157,7 +157,7 @@ func queryTopicRouteInfoFromServer(topic string, timeout 
time.Duration) (*topicR
        }
        rc := remote.NewRemotingCommand(GetRouteInfoByTopic, request, nil)
 
-       response, err := remote.InvokeSync(getNameServerAddress(), rc, timeout)
+       response, err := client.InvokeSync(getNameServerAddress(), rc, timeout)
 
        if err != nil {
                return nil, err
diff --git a/remote/client.go b/remote/client.go
index 2c6805c..7b1c528 100644
--- a/remote/client.go
+++ b/remote/client.go
@@ -17,244 +17,275 @@
 package remote
 
 import (
+       "bufio"
+       "bytes"
+       "context"
        "encoding/binary"
        "errors"
+       "io"
        "net"
        "sync"
        "time"
-
-       "github.com/apache/rocketmq-client-go/utils"
-       log "github.com/sirupsen/logrus"
 )
 
 var (
+       //ErrRequestTimeout for request timeout error
        ErrRequestTimeout = errors.New("request timeout")
 )
 
-func InvokeSync(addr string, request *RemotingCommand, timeout time.Duration) 
(*RemotingCommand, error) {
-       return nil, nil
+//ResponseFuture for
+type ResponseFuture struct {
+       ResponseCommand *RemotingCommand
+       SendRequestOK   bool
+       Err             error
+       Opaque          int32
+       TimeoutMillis   time.Duration
+       callback        func(*ResponseFuture)
+       BeginTimestamp  int64
+       Done            chan bool
+       callbackOnce    sync.Once
 }
 
-func InvokeAsync(addr string, request *RemotingCommand, timeout time.Duration, 
f func(*RemotingCommand)) error {
-       return nil
+//NewResponseFuture create ResponseFuture with opaque, timeout and callback
+func NewResponseFuture(opaque int32, timeoutMillis time.Duration, callback 
func(*ResponseFuture)) *ResponseFuture {
+       return &ResponseFuture{
+               Opaque:         opaque,
+               Done:           make(chan bool),
+               TimeoutMillis:  timeoutMillis,
+               callback:       callback,
+               BeginTimestamp: time.Now().Unix() * 1000,
+       }
 }
 
-func InvokeOneWay(addr string, request *RemotingCommand) error {
-       return nil
+func (r *ResponseFuture) executeInvokeCallback() {
+       r.callbackOnce.Do(func() {
+               if r.callback != nil {
+                       r.callback(r)
+               }
+       })
 }
 
-// ClientConfig common config
-type ClientConfig struct {
-       // NameServer or Broker address
-       RemotingAddress string
-
-       ClientIP     string
-       InstanceName string
-
-       // Heartbeat interval in microseconds with message broker, default is 30
-       HeartbeatBrokerInterval time.Duration
-
-       // request timeout time
-       RequestTimeout time.Duration
-       CType          byte
+func (r *ResponseFuture) isTimeout() bool {
+       diff := time.Now().Unix()*1000 - r.BeginTimestamp
+       return diff > int64(r.TimeoutMillis)
+}
 
-       UnitMode          bool
-       UnitName          string
-       VipChannelEnabled bool
+func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
+       for {
+               select {
+               case <-r.Done:
+                       if r.Err != nil {
+                               return nil, r.Err
+                       }
+                       return r.ResponseCommand, nil
+               case <-time.After(r.TimeoutMillis * time.Millisecond):
+                       return nil, ErrRequestTimeout
+               }
+       }
 }
 
-type defaultClient struct {
-       //clientId     string
-       config ClientConfig
-       conn   net.Conn
-       // requestId
-       opaque int32
+//RemotingClient includes basic operations for remote
+type RemotingClient interface {
+       Start()
+       Shutdown()
+       InvokeSync(string, *RemotingCommand, time.Duration) (*RemotingCommand, 
error)
+       InvokeAsync(string, *RemotingCommand, time.Duration, 
func(*ResponseFuture)) error
+       InvokeOneWay(string, *RemotingCommand) error
+}
 
-       // int32 -> ResponseFuture
-       responseTable sync.Map
-       codec         serializer
-       exitCh        chan interface{}
+//defaultRemotingClient for default RemotingClient implementation
+type defaultRemotingClient struct {
+       responseTable    map[int32]*ResponseFuture
+       responseLock     sync.RWMutex
+       connectionsTable map[string]net.Conn
+       connectionLock   sync.RWMutex
+       ctx              context.Context
+       cancel           context.CancelFunc
 }
 
-//func newRemotingClient(config ClientConfig) error {
-//     client := &defaultClient{
-//             config: config,
-//     }
-//
-//     switch config.CType {
-//     case Json:
-//             client.codec = &jsonCodec{}
-//     case RocketMQ:
-//             client.codec = &rmqCodec{}
-//     default:
-//             return errors.New("unknow codec")
-//     }
-//
-//     conn, err := net.Dial("tcp", config.RemotingAddress)
-//     if err != nil {
-//             log.Error(err)
-//             return nil, err
-//     }
-//     client.conn = conn
-//     go client.listen()
-//     go client.clearExpiredRequest()
-//     return client, nil
-//}
+//NewDefaultRemotingClient for
+func NewDefaultRemotingClient() RemotingClient {
+       client := &defaultRemotingClient{
+               responseTable:    make(map[int32]*ResponseFuture, 0),
+               connectionsTable: make(map[string]net.Conn, 0),
+       }
+       ctx, cancel := context.WithCancel(context.Background())
+       client.ctx = ctx
+       client.cancel = cancel
+       return client
+}
 
-func (client *defaultClient) invokeSync(request *RemotingCommand) 
(*RemotingCommand, error) {
+//Start begin sca
+func (client *defaultRemotingClient) Start() {
+       ticker := time.NewTicker(1 * time.Second)
+       go func() {
+               for {
+                       select {
+                       case <-ticker.C:
+                               client.scanResponseTable()
+                       case <-client.ctx.Done():
+                               ticker.Stop()
+                               return
+                       }
+               }
+       }()
+}
 
-       response := &ResponseFuture{
-               SendRequestOK:  false,
-               Opaque:         request.Opaque,
-               TimeoutMillis:  client.config.RequestTimeout,
-               BeginTimestamp: time.Now().Unix(),
-               Done:           make(chan bool),
+// Shutdown for call client.cancel
+func (client *defaultRemotingClient) Shutdown() {
+       client.cancel()
+       client.connectionLock.Lock()
+       for addr, conn := range client.connectionsTable {
+               conn.Close()
+               delete(client.connectionsTable, addr)
        }
-       header, err := encode(request)
-       body := request.Body
-       client.responseTable.Store(request.Opaque, response)
-       err = client.doRequest(header, body)
+       client.connectionLock.Unlock()
+}
 
+// InvokeSync sends request synchronously
+func (client *defaultRemotingClient) InvokeSync(addr string, request 
*RemotingCommand, timeoutMillis time.Duration) (*RemotingCommand, error) {
+       conn, err := client.connect(addr)
        if err != nil {
-               log.Error(err)
                return nil, err
        }
-       select {
-       case <-response.Done:
-               rmd := response.ResponseCommand
-               return rmd, nil
-       case <-time.After(client.config.RequestTimeout):
-               return nil, ErrRequestTimeout
+       resp := NewResponseFuture(request.Opaque, timeoutMillis, nil)
+       client.responseLock.Lock()
+       client.responseTable[resp.Opaque] = resp
+       client.responseLock.Unlock()
+       err = client.sendRequest(conn, request)
+       if err != nil {
+               return nil, err
        }
+       resp.SendRequestOK = true
+       return resp.waitResponse()
 }
 
-func (client *defaultClient) invokeAsync(request *RemotingCommand, f 
func(*RemotingCommand)) error {
-
-       response := &ResponseFuture{
-               SendRequestOK:  false,
-               Opaque:         request.Opaque,
-               TimeoutMillis:  client.config.RequestTimeout,
-               BeginTimestamp: time.Now().Unix(),
-               callback:       f,
+//InvokeAsync send request asynchronously
+func (client *defaultRemotingClient) InvokeAsync(addr string, request 
*RemotingCommand, timeoutMillis time.Duration, callback func(*ResponseFuture)) 
error {
+       conn, err := client.connect(addr)
+       if err != nil {
+               return err
        }
-       client.responseTable.Store(request.Opaque, response)
-       header, err := encode(request)
+       resp := NewResponseFuture(request.Opaque, timeoutMillis, callback)
+       client.responseLock.Lock()
+       client.responseTable[resp.Opaque] = resp
+       client.responseLock.Unlock()
+       err = client.sendRequest(conn, request)
        if err != nil {
                return err
        }
-
-       body := request.Body
-       return client.doRequest(header, body)
+       resp.SendRequestOK = true
+       return nil
 }
 
-func (client *defaultClient) invokeOneWay(request *RemotingCommand) error {
-       header, err := encode(request)
+//InvokeOneWay send one-way request
+func (client *defaultRemotingClient) InvokeOneWay(addr string, request 
*RemotingCommand) error {
+       conn, err := client.connect(addr)
        if err != nil {
                return err
        }
-
-       body := request.Body
-       return client.doRequest(header, body)
+       return client.sendRequest(conn, request)
 }
 
-func (client *defaultClient) doRequest(header, body []byte) error {
-       var requestBytes = make([]byte, len(header)+len(body))
-       copy(requestBytes, header)
-       if len(body) > 0 {
-               copy(requestBytes[len(header):], body)
+func (client *defaultRemotingClient) scanResponseTable() {
+       rfs := make([]*ResponseFuture, 0)
+       client.responseLock.Lock()
+       for opaque, resp := range client.responseTable {
+               if (resp.BeginTimestamp + int64(resp.TimeoutMillis) + 1000) <= 
time.Now().Unix()*1000 {
+                       delete(client.responseTable, opaque)
+                       rfs = append(rfs, resp)
+               }
+       }
+       client.responseLock.Unlock()
+       for _, rf := range rfs {
+               rf.Err = ErrRequestTimeout
+               rf.executeInvokeCallback()
        }
-
-       _, err := client.conn.Write(requestBytes)
-       return err
 }
 
-func (client *defaultClient) close() {
-       // TODO process response
-       client.conn.Close()
+func (client *defaultRemotingClient) connect(addr string) (net.Conn, error) {
+       client.connectionLock.Lock()
+       defer client.connectionLock.Unlock()
+       conn, ok := client.connectionsTable[addr]
+       if ok {
+               return conn.(net.Conn), nil
+       }
+       tcpConn, err := net.Dial("tcp", addr)
+       if err != nil {
+               return nil, err
+       }
+       client.connectionsTable[addr] = tcpConn
+       go client.receiveResponse(tcpConn)
+       return tcpConn, nil
 }
 
-func (client *defaultClient) listen() {
-       rb := utils.NewRingBuffer(4096)
-
-       var frameSize int32
-       go func() {
-               for {
-                       err := binary.Read(rb, binary.BigEndian, &frameSize)
-                       if err != nil {
-                               // TODO
-                       }
-                       data := make([]byte, frameSize)
-
-                       _, err = rb.Read(data)
-
-                       if err != nil {
-                               // TODO
-                       }
-
-                       cmd, err := decode(data)
-                       if cmd.isResponseType() {
-                               client.handleResponse(cmd)
-                       } else {
-                               client.handleRequestFromServer(cmd)
-                       }
-               }
-       }()
-
-       buf := make([]byte, 4096)
-       for {
-               n, err := client.conn.Read(buf)
+func (client *defaultRemotingClient) receiveResponse(conn net.Conn) {
+       scanner := createScanner(conn)
+       for scanner.Scan() {
+               receivedRemotingCommand, err := decode(scanner.Bytes())
                if err != nil {
-                       log.Errorf("read data from connection errors: %v", err)
-                       return
+                       client.closeConnection(conn)
+                       break
                }
-               err = rb.Write(buf[:n])
-               if err != nil {
-                       // just log
-                       log.Errorf("write data to buffer errors: %v", err)
+               if receivedRemotingCommand.isResponseType() {
+                       client.responseLock.Lock()
+                       if resp, ok := 
client.responseTable[receivedRemotingCommand.Opaque]; ok {
+                               delete(client.responseTable, 
receivedRemotingCommand.Opaque)
+                               resp.ResponseCommand = receivedRemotingCommand
+                               resp.executeInvokeCallback()
+                               if resp.Done != nil {
+                                       resp.Done <- true
+                               }
+                       }
+                       client.responseLock.Unlock()
+               } else {
+                       // todo handler request from peer
                }
-
        }
 }
 
-func (client *defaultClient) handleRequestFromServer(cmd *RemotingCommand) {
-       //responseCommand := client.clientRequestProcessor(cmd)
-       //if responseCommand == nil {
-       //      return
-       //}
-       //responseCommand.Opaque = cmd.Opaque
-       //responseCommand.markResponseType()
-       //header, err := encode(responseCommand)
-       //body := responseCommand.Body
-       //err = client.doRequest(header, body)
-       //if err != nil {
-       //      log.Error(err)
-       //}
+func createScanner(r io.Reader) *bufio.Scanner {
+       scanner := bufio.NewScanner(r)
+       scanner.Split(func(data []byte, atEOF bool) (int, []byte, error) {
+               if !atEOF {
+                       if len(data) >= 4 {
+                               var length int32
+                               binary.Read(bytes.NewReader(data[0:4]), 
binary.BigEndian, &length)
+                               if int(length)+4 <= len(data) {
+                                       return int(length) + 4, 
data[:int(length)+4], nil
+                               }
+                       }
+               }
+               return 0, nil, nil
+       })
+       return scanner
 }
 
-func (client *defaultClient) handleResponse(cmd *RemotingCommand) error {
-       //response, err := client.getResponse(cmd.Opaque)
-       ////client.removeResponse(cmd.Opaque)
-       //if err != nil {
-       //      return err
-       //}
-       //
-       //response.ResponseCommand = cmd
-       //response.callback(cmd)
-       //
-       //if response.Done != nil {
-       //      response.Done <- true
-       //}
+func (client *defaultRemotingClient) sendRequest(conn net.Conn, request 
*RemotingCommand) error {
+       content, err := encode(request)
+       if err != nil {
+               return err
+       }
+       _, err = conn.Write(content)
+       if err != nil {
+               client.closeConnection(conn)
+               return err
+       }
        return nil
 }
 
-func (client *defaultClient) clearExpiredRequest() {
-       //for seq, responseObj := range client.responseTable.Items() {
-       //      response := responseObj.(*ResponseFuture)
-       //      if (response.BeginTimestamp + 30) <= time.Now().Unix() {
-       //              //30 minutes expired
-       //              client.responseTable.Remove(seq)
-       //              response.callback(nil)
-       //              log.Warningf("remove time out request %v", response)
-       //      }
-       //}
+func (client *defaultRemotingClient) closeConnection(toCloseConn net.Conn) {
+       client.connectionLock.Lock()
+       var toCloseAddr string
+       for addr, con := range client.connectionsTable {
+               if con == toCloseConn {
+                       toCloseAddr = addr
+                       break
+               }
+       }
+       if conn, ok := client.connectionsTable[toCloseAddr]; ok {
+               delete(client.connectionsTable, toCloseAddr)
+               conn.Close()
+       }
+       client.connectionLock.Unlock()
 }
diff --git a/remote/client_test.go b/remote/client_test.go
index 7909612..55a26fc 100644
--- a/remote/client_test.go
+++ b/remote/client_test.go
@@ -15,3 +15,165 @@
  *  limitations under the License.
  */
 package remote
+
+import (
+       "bytes"
+       "errors"
+       "reflect"
+       "sync"
+       "testing"
+       "time"
+)
+
+func TestNewResponseFuture(t *testing.T) {
+       future := NewResponseFuture(10, time.Duration(1000), nil)
+       if future.Opaque != 10 {
+               t.Errorf("wrong ResponseFuture's Opaque. want=%d, got=%d", 10, 
future.Opaque)
+       }
+       if future.SendRequestOK != false {
+               t.Errorf("wrong ResposneFutrue's SendRequestOK. want=%t, 
got=%t", false, future.SendRequestOK)
+       }
+       if future.Err != nil {
+               t.Errorf("wrong RespnseFuture's Err. want=<nil>, got=%v", 
future.Err)
+       }
+       if future.TimeoutMillis != time.Duration(1000) {
+               t.Errorf("wrong ResponseFuture's TimeoutMills. want=%d, got=%d",
+                       future.TimeoutMillis, time.Duration(1000))
+       }
+       if future.callback != nil {
+               t.Errorf("wrong ResponseFuture's callback. want=<nil>, got=%v", 
future.callback)
+       }
+       if future.Done == nil {
+               t.Errorf("wrong ResponseFuture's Done. want=<channel>, 
got=<nil>")
+       }
+}
+
+func TestResponseFutureTimeout(t *testing.T) {
+       callback := func(r *ResponseFuture) {
+               if r.ResponseCommand.Remark == "" {
+                       r.ResponseCommand.Remark = "Hello RocketMQ."
+               } else {
+                       r.ResponseCommand.Remark = r.ResponseCommand.Remark + 
"Go Client"
+               }
+       }
+       future := NewResponseFuture(10, time.Duration(1000), callback)
+       future.ResponseCommand = NewRemotingCommand(200,
+               nil, nil)
+
+       var wg sync.WaitGroup
+       wg.Add(10)
+       for i := 0; i < 10; i++ {
+               go func() {
+                       future.executeInvokeCallback()
+                       wg.Done()
+               }()
+       }
+       wg.Wait()
+       if future.ResponseCommand.Remark != "Hello RocketMQ." {
+               t.Errorf("wrong ResponseFuture.ResponseCommand.Remark. want=%s, 
got=%s",
+                       "Hello RocketMQ.", future.ResponseCommand.Remark)
+       }
+
+}
+
+func TestResponseFutureIsTimeout(t *testing.T) {
+       future := NewResponseFuture(10, time.Duration(500), nil)
+       if future.isTimeout() != false {
+               t.Errorf("wrong ResponseFuture's istimeout. want=%t, got=%t", 
false, future.isTimeout())
+       }
+       time.Sleep(time.Duration(2000) * time.Millisecond)
+       if future.isTimeout() != true {
+               t.Errorf("wrong ResponseFuture's istimeout. want=%t, got=%t", 
true, future.isTimeout())
+       }
+
+}
+
+func TestResponseFutureWaitResponse(t *testing.T) {
+       future := NewResponseFuture(10, time.Duration(500), nil)
+       if _, err := future.waitResponse(); err != ErrRequestTimeout {
+               t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v",
+                       ErrRequestTimeout, err)
+       }
+       future = NewResponseFuture(10, time.Duration(500), nil)
+       responseError := errors.New("response error")
+       go func() {
+               time.Sleep(100 * time.Millisecond)
+               future.Err = responseError
+               future.Done <- true
+       }()
+       if _, err := future.waitResponse(); err != responseError {
+               t.Errorf("wrong ResponseFuture waitResponse. want=%v. got=%v",
+                       responseError, err)
+       }
+       future = NewResponseFuture(10, time.Duration(500), nil)
+       responseRemotingCommand := NewRemotingCommand(202, nil, nil)
+       go func() {
+               time.Sleep(100 * time.Millisecond)
+               future.ResponseCommand = responseRemotingCommand
+               future.Done <- true
+       }()
+       if r, err := future.waitResponse(); err != nil {
+               t.Errorf("wrong ResponseFuture waitResponse error: %v", err)
+       } else {
+               if r != responseRemotingCommand {
+                       t.Errorf("wrong ResponseFuture waitResposne result. 
want=%v, got=%v",
+                               responseRemotingCommand, r)
+               }
+       }
+}
+
+func TestNewDefaultRemotingClient(t *testing.T) {
+       r := NewDefaultRemotingClient()
+       d, ok := r.(*defaultRemotingClient)
+       if !ok {
+               t.Errorf("defaultRemotingClient does not implement 
RemotingClient interface")
+       }
+       if len(d.responseTable) != 0 {
+               t.Errorf("wrong responseTable size. want=%d, got=%d",
+                       0, len(d.responseTable))
+       }
+       if len(d.connectionsTable) != 0 {
+               t.Errorf("wrong connectionsTable size. want=%d, got=%d",
+                       0, len(d.connectionsTable))
+       }
+       if d.ctx == nil {
+               t.Errorf("wrong ctx. want=%v, got=<nil>", d.ctx)
+       }
+       if d.cancel == nil {
+               t.Errorf("wrong cancel. want=%v, got=<nil>", d.cancel)
+       }
+}
+
+func TestDefaultRemotingClient_Start_ShutDown(t *testing.T) {
+       r := NewDefaultRemotingClient()
+       d, ok := r.(*defaultRemotingClient)
+       if !ok {
+               t.Errorf("defaultRemotingClient does not implement 
RemotingClient interface")
+       }
+       d.Start()
+       time.Sleep(2 * time.Second)
+       d.Shutdown()
+       if len(d.connectionsTable) != 0 {
+               t.Errorf("wrong connectionTable. want=%d, got=%d",
+                       0, len(d.connectionsTable))
+       }
+}
+
+func TestCreateScanner(t *testing.T) {
+       r := randomNewRemotingCommand()
+       content, err := encode(r)
+       if err != nil {
+               t.Fatalf("failed to encode RemotingCommand. %s", err)
+       }
+       reader := bytes.NewReader(content)
+       scanner := createScanner(reader)
+       for scanner.Scan() {
+               rcr, err := decode(scanner.Bytes())
+               if err != nil {
+                       t.Fatalf("failedd to decode RemotingCommand from 
scanner")
+               }
+               if !reflect.DeepEqual(*r, *rcr) {
+                       t.Fatal("decoded RemotingCommand not equal to the 
original one")
+               }
+       }
+}
\ No newline at end of file
diff --git a/remote/codec_test.go b/remote/codec_test.go
index 62dd528..5e63e4a 100644
--- a/remote/codec_test.go
+++ b/remote/codec_test.go
@@ -22,6 +22,18 @@ import (
        "testing"
 )
 
+type testHeader struct {
+
+}
+
+func (t testHeader) Encode() map[string]string {
+       properties := make(map[string]string)
+       for i := 0; i < 10; i++ {
+               properties[randomString(rand.Intn(20))] = 
randomString(rand.Intn(20))
+       }
+       return properties
+}
+
 func randomBytes(length int) []byte {
        bs := make([]byte, length)
        if _, err := rand.Read(bs); err != nil {
@@ -39,12 +51,9 @@ func randomString(length int) string {
 }
 
 func randomNewRemotingCommand() *RemotingCommand {
-       properties := make(map[string]string)
-       for i := 0; i < 10; i++ {
-               properties[randomString(rand.Intn(20))] = 
randomString(rand.Intn(20))
-       }
+       var h testHeader
        body := randomBytes(rand.Intn(100))
-       return NewRemotingCommand(int16(rand.Intn(1000)), properties, body)
+       return NewRemotingCommand(int16(rand.Intn(1000)), h, body)
 }
 
 func Test_encode(t *testing.T) {
@@ -227,7 +236,8 @@ func Benchmark_rmqCodec_decodeHeader(b *testing.B) {
 }
 
 func TestCommandJsonEncodeDecode(t *testing.T) {
-       cmd := NewRemotingCommand(192, map[string]string{"brokers": 
"127.0.0.1"}, []byte("Hello RocketMQCodecs"))
+       var h testHeader
+       cmd := NewRemotingCommand(192, h, []byte("Hello RocketMQCodecs"))
        codecType = JsonCodecs
        cmdData, err := encode(cmd)
        if err != nil {
@@ -259,19 +269,20 @@ func TestCommandJsonEncodeDecode(t *testing.T) {
        if newCmd.Remark != cmd.Remark {
                t.Errorf("wrong command remakr. want=%s, got=%s", cmd.Remark, 
newCmd.Remark)
        }
-       for k, v := range cmd.ExtFields {
-               if vv, ok := newCmd.ExtFields[k]; !ok {
-                       t.Errorf("key: %s not exists in newCommand.", k)
-               } else {
-                       if v != vv {
-                               t.Errorf("wrong value. want=%s, got=%s", v, vv)
-                       }
-               }
-       }
+       //for k, v := range cmd.ExtFields {
+       //      if vv, ok := newCmd.ExtFields[k]; !ok {
+       //              t.Errorf("key: %s not exists in newCommand.", k)
+       //      } else {
+       //              if v != vv {
+       //                      t.Errorf("wrong value. want=%s, got=%s", v, vv)
+       //              }
+       //      }
+       //}
 }
 
 func TestCommandRocketMQEncodeDecode(t *testing.T) {
-       cmd := NewRemotingCommand(192, map[string]string{"brokers": 
"127.0.0.1"}, []byte("Hello RocketMQCodecs"))
+       var h testHeader
+       cmd := NewRemotingCommand(192, h, []byte("Hello RocketMQCodecs"))
        codecType = RocketMQCodecs
        cmdData, err := encode(cmd)
        if err != nil {
@@ -303,13 +314,13 @@ func TestCommandRocketMQEncodeDecode(t *testing.T) {
        if newCmd.Remark != cmd.Remark {
                t.Errorf("wrong command remakr. want=%s, got=%s", cmd.Remark, 
newCmd.Remark)
        }
-       for k, v := range cmd.ExtFields {
-               if vv, ok := newCmd.ExtFields[k]; !ok {
-                       t.Errorf("key: %s not exists in newCommand.", k)
-               } else {
-                       if v != vv {
-                               t.Errorf("wrong value. want=%s, got=%s", v, vv)
-                       }
-               }
-       }
+       //for k, v := range cmd.ExtFields {
+       //      if vv, ok := newCmd.ExtFields[k]; !ok {
+       //              t.Errorf("key: %s not exists in newCommand.", k)
+       //      } else {
+       //              if v != vv {
+       //                      t.Errorf("wrong value. want=%s, got=%s", v, vv)
+       //              }
+       //      }
+       //}
 }
diff --git a/remote/request.go b/remote/codes.go
similarity index 78%
rename from remote/request.go
rename to remote/codes.go
index c37eac5..b94d2e2 100644
--- a/remote/request.go
+++ b/remote/codes.go
@@ -109,3 +109,39 @@ const (
 
        VIEW_BROKER_STATS_DATA = 315
 )
+
+const (
+       SUCCESS                       = 0
+       SYSTEM_ERROR                  = 1
+       SYSTEM_BUSY                   = 2
+       REQUEST_CODE_NOT_SUPPORTED    = 3
+       TRANSACTION_FAILED            = 4
+       FLUSH_DISK_TIMEOUT            = 10
+       SLAVE_NOT_AVAILABLE           = 11
+       FLUSH_SLAVE_TIMEOUT           = 12
+       MESSAGE_ILLEGAL               = 13
+       SERVICE_NOT_AVAILABLE         = 14
+       VERSION_NOT_SUPPORTED         = 15
+       NO_PERMISSION                 = 16
+       TOPIC_NOT_EXIST               = 17
+       TOPIC_EXIST_ALREADY           = 18
+       PULL_NOT_FOUND                = 19
+       PULL_RETRY_IMMEDIATELY        = 20
+       PULL_OFFSET_MOVED             = 21
+       QUERY_NOT_FOUND               = 22
+       SUBSCRIPTION_PARSE_FAILED     = 23
+       SUBSCRIPTION_NOT_EXIST        = 24
+       SUBSCRIPTION_NOT_LATEST       = 25
+       SUBSCRIPTION_GROUP_NOT_EXIST  = 26
+       TRANSACTION_SHOULD_COMMIT     = 200
+       TRANSACTION_SHOULD_ROLLBACK   = 201
+       TRANSACTION_STATE_UNKNOW      = 202
+       TRANSACTION_STATE_GROUP_WRONG = 203
+       NO_BUYER_ID                   = 204
+
+       NOT_IN_CURRENT_UNIT = 205
+
+       CONSUMER_NOT_ONLINE = 206
+
+       CONSUME_MSG_TIMEOUT = 207
+)
diff --git a/remote/response.go b/remote/response.go
deleted file mode 100644
index d7954bc..0000000
--- a/remote/response.go
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package remote
-
-import "time"
-
-const (
-       SUCCESS                       = 0
-       SYSTEM_ERROR                  = 1
-       SYSTEM_BUSY                   = 2
-       REQUEST_CODE_NOT_SUPPORTED    = 3
-       TRANSACTION_FAILED            = 4
-       FLUSH_DISK_TIMEOUT            = 10
-       SLAVE_NOT_AVAILABLE           = 11
-       FLUSH_SLAVE_TIMEOUT           = 12
-       MESSAGE_ILLEGAL               = 13
-       SERVICE_NOT_AVAILABLE         = 14
-       VERSION_NOT_SUPPORTED         = 15
-       NO_PERMISSION                 = 16
-       TOPIC_NOT_EXIST               = 17
-       TOPIC_EXIST_ALREADY           = 18
-       PULL_NOT_FOUND                = 19
-       PULL_RETRY_IMMEDIATELY        = 20
-       PULL_OFFSET_MOVED             = 21
-       QUERY_NOT_FOUND               = 22
-       SUBSCRIPTION_PARSE_FAILED     = 23
-       SUBSCRIPTION_NOT_EXIST        = 24
-       SUBSCRIPTION_NOT_LATEST       = 25
-       SUBSCRIPTION_GROUP_NOT_EXIST  = 26
-       TRANSACTION_SHOULD_COMMIT     = 200
-       TRANSACTION_SHOULD_ROLLBACK   = 201
-       TRANSACTION_STATE_UNKNOW      = 202
-       TRANSACTION_STATE_GROUP_WRONG = 203
-       NO_BUYER_ID                   = 204
-
-       NOT_IN_CURRENT_UNIT = 205
-
-       CONSUMER_NOT_ONLINE = 206
-
-       CONSUME_MSG_TIMEOUT = 207
-)
-
-type ResponseFuture struct {
-       ResponseCommand *RemotingCommand
-       SendRequestOK   bool
-       Rrr             error
-       Opaque          int32
-       TimeoutMillis   time.Duration
-       callback        func(*RemotingCommand)
-       BeginTimestamp  int64
-       Done            chan bool
-}

Reply via email to