This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 9736ba8  feat(produce) : reduce memory copy for on-wire protocol
     new 6cd3181  Merge pull request #607 from shenhui0509/master
9736ba8 is described below

commit 9736ba88b63320baebdac0396171bcc7b51c70b8
Author: shenhui.backend <[email protected]>
AuthorDate: Thu Feb 11 11:21:21 2021 +0800

    feat(produce) : reduce memory copy for on-wire protocol
---
 internal/remote/codec.go         | 38 ++++++++++++++++++++++++++++++++++++++
 internal/remote/remote_client.go |  8 +++-----
 internal/remote/tcp_conn.go      |  2 ++
 3 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/internal/remote/codec.go b/internal/remote/codec.go
index f756c11..1c6e0a5 100644
--- a/internal/remote/codec.go
+++ b/internal/remote/codec.go
@@ -20,6 +20,7 @@ import (
        "bytes"
        "encoding/binary"
        "fmt"
+       "io"
        "sync/atomic"
 
        jsoniter "github.com/json-iterator/go"
@@ -132,6 +133,43 @@ var (
 // +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 // + len  |   4bytes   |     4bytes    | (21 + r_len + e_len) bytes | remain bytes +
 // +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+func (command *RemotingCommand) WriteTo(w io.Writer) error {
+       var (
+               header []byte
+               err    error
+       )
+
+       switch codecType {
+       case JsonCodecs:
+               header, err = jsonSerializer.encodeHeader(command)
+       case RocketMQCodecs:
+               header, err = rocketMqSerializer.encodeHeader(command)
+       }
+
+       if err != nil {
+               return err
+       }
+
+       frameSize := 4 + len(header) + len(command.Body)
+       err = binary.Write(w, binary.BigEndian, int32(frameSize))
+       if err != nil {
+               return err
+       }
+
+       err = binary.Write(w, binary.BigEndian, markProtocolType(int32(len(header))))
+       if err != nil {
+               return err
+       }
+
+       _, err = w.Write(header)
+       if err != nil {
+               return err
+       }
+
+       _, err = w.Write(command.Body)
+       return err
+}
+
 func encode(command *RemotingCommand) ([]byte, error) {
        var (
                header []byte
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index dae364e..5e2aa83 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -272,11 +272,9 @@ func (c *remotingClient) sendRequest(conn *tcpConnWrapper, request *RemotingComm
 }
 
 func (c *remotingClient) doRequest(conn *tcpConnWrapper, request *RemotingCommand) error {
-       content, err := encode(request)
-       if err != nil {
-               return err
-       }
-       _, err = conn.Write(content)
+       conn.Lock()
+       defer conn.Unlock()
+       err := request.WriteTo(conn)
        if err != nil {
                c.closeConnection(conn)
                return err
diff --git a/internal/remote/tcp_conn.go b/internal/remote/tcp_conn.go
index dda7dcf..ae340c6 100644
--- a/internal/remote/tcp_conn.go
+++ b/internal/remote/tcp_conn.go
@@ -19,6 +19,7 @@ package remote
 import (
        "context"
        "net"
+       "sync"
 
        "go.uber.org/atomic"
 )
@@ -26,6 +27,7 @@ import (
 // TODO: Adding TCP Connections Pool, https://github.com/apache/rocketmq-client-go/v2/issues/298
 type tcpConnWrapper struct {
        net.Conn
+       sync.Mutex
        closed atomic.Bool
 }
 

Reply via email to