This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 9736ba8 feat(produce) : reduce memory copy for on-wire protocol
new 6cd3181 Merge pull request #607 from shenhui0509/master
9736ba8 is described below
commit 9736ba88b63320baebdac0396171bcc7b51c70b8
Author: shenhui.backend <[email protected]>
AuthorDate: Thu Feb 11 11:21:21 2021 +0800
feat(produce) : reduce memory copy for on-wire protocol
---
internal/remote/codec.go | 38 ++++++++++++++++++++++++++++++++++++++
internal/remote/remote_client.go | 8 +++-----
internal/remote/tcp_conn.go | 2 ++
3 files changed, 43 insertions(+), 5 deletions(-)
diff --git a/internal/remote/codec.go b/internal/remote/codec.go
index f756c11..1c6e0a5 100644
--- a/internal/remote/codec.go
+++ b/internal/remote/codec.go
@@ -20,6 +20,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
+ "io"
"sync/atomic"
jsoniter "github.com/json-iterator/go"
@@ -132,6 +133,43 @@ var (
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// + len | 4bytes | 4bytes | (21 + r_len + e_len) bytes | remain bytes +
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+func (command *RemotingCommand) WriteTo(w io.Writer) error { // WriteTo writes command to w field by field: frame size, marked header length, header, body. NOTE(review): returns only error, unlike io.WriterTo's (int64, error); go vet's stdmethods check may flag this.
+ var (
+ header []byte
+ err error
+ )
+ // codecType selects the header encoder; with no default case, header stays nil if neither constant matches.
+ switch codecType {
+ case JsonCodecs:
+ header, err = jsonSerializer.encodeHeader(command)
+ case RocketMQCodecs:
+ header, err = rocketMqSerializer.encodeHeader(command)
+ }
+
+ if err != nil {
+ return err
+ }
+ // frameSize counts the bytes that follow the 4-byte size field: the 4-byte header-length field, the header, and the body.
+ frameSize := 4 + len(header) + len(command.Body)
+ err = binary.Write(w, binary.BigEndian, int32(frameSize)) // NOTE(review): the int32 conversion is unchecked; a frame larger than math.MaxInt32 would wrap
+ if err != nil {
+ return err
+ }
+ // The header length is passed through markProtocolType before being written as the second 4-byte field.
+ err = binary.Write(w, binary.BigEndian,
markProtocolType(int32(len(header))))
+ if err != nil {
+ return err
+ }
+ // Header and body go to w in separate Write calls.
+ _, err = w.Write(header)
+ if err != nil {
+ return err
+ }
+ // An error here follows earlier successful writes, so w may hold a partial frame; doRequest in this commit closes the connection on error.
+ _, err = w.Write(command.Body)
+ return err
+}
+
func encode(command *RemotingCommand) ([]byte, error) {
var (
header []byte
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index dae364e..5e2aa83 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -272,11 +272,9 @@ func (c *remotingClient) sendRequest(conn *tcpConnWrapper, request *RemotingComm
}
func (c *remotingClient) doRequest(conn *tcpConnWrapper, request
*RemotingCommand) error {
- content, err := encode(request)
- if err != nil {
- return err
- }
- _, err = conn.Write(content)
+ conn.Lock()
+ defer conn.Unlock()
+ err := request.WriteTo(conn)
if err != nil {
c.closeConnection(conn)
return err
diff --git a/internal/remote/tcp_conn.go b/internal/remote/tcp_conn.go
index dda7dcf..ae340c6 100644
--- a/internal/remote/tcp_conn.go
+++ b/internal/remote/tcp_conn.go
@@ -19,6 +19,7 @@ package remote
import (
"context"
"net"
+ "sync"
"go.uber.org/atomic"
)
@@ -26,6 +27,7 @@ import (
// TODO: Adding TCP Connections Pool, https://github.com/apache/rocketmq-client-go/v2/issues/298
type tcpConnWrapper struct { // tcpConnWrapper embeds a net.Conn together with a mutex and a closed flag.
net.Conn
+ sync.Mutex // locked by doRequest for the duration of request.WriteTo(conn)
closed atomic.Bool
}