This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ee932fa35b [INLONG-8637][SDK] Pool data request and batch request
(#8638)
ee932fa35b is described below
commit ee932fa35b675bbc8168777416cc31a1721eefa4
Author: gunli <[email protected]>
AuthorDate: Sun Aug 6 15:56:24 2023 +0800
[INLONG-8637][SDK] Pool data request and batch request (#8638)
Co-authored-by: gunli <[email protected]>
---
.../dataproxy-sdk-golang/dataproxy/request.go | 26 ++++++++++++++++++++++
.../dataproxy-sdk-golang/dataproxy/worker.go | 8 +++++--
2 files changed, 32 insertions(+), 2 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
index ac89aba142..d93b0af006 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
@@ -22,6 +22,7 @@ import (
"encoding/binary"
"strconv"
"strings"
+ "sync"
"time"
"unsafe"
@@ -35,6 +36,8 @@ var (
byteOrder = binary.BigEndian
heartbeatRsp = []byte{0x00, 0x00, 0x00, 0x01, 0x01}
heartbeatRspLen = len(heartbeatRsp)
+ reqPool *sync.Pool
+ batchPool *sync.Pool
)
const (
@@ -42,6 +45,19 @@ const (
msgTypeHeartbeat uint8 = 1
)
+func init() {
+ reqPool = &sync.Pool{
+ New: func() interface{} {
+ return &sendDataReq{}
+ },
+ }
+ batchPool = &sync.Pool{
+ New: func() interface{} {
+ return &batchReq{}
+ },
+ }
+}
+
type heartbeatReq struct {
}
@@ -68,6 +84,7 @@ func (h heartbeatReq) encode(buffer *bytes.Buffer) []byte {
type batchCallback func()
type batchReq struct {
+ pool *sync.Pool
workerID string
batchID string
groupID string
@@ -112,6 +129,10 @@ func (b *batchReq) done(err error) {
b.metrics.observeTime(errorCode,
time.Since(b.batchTime).Milliseconds())
b.metrics.observeSize(errorCode, b.dataSize)
}
+
+ if b.pool != nil {
+ b.pool.Put(b)
+ }
}
func (b *batchReq) encode() []byte {
@@ -299,6 +320,7 @@ func (b *batchRsp) decode(input []byte) {
}
type sendDataReq struct {
+ pool *sync.Pool
ctx context.Context
msg Message
callback Callback
@@ -328,6 +350,10 @@ func (s *sendDataReq) done(err error, errCode string) {
s.metrics.incMessage(errCode)
}
+
+ if s.pool != nil {
+ s.pool.Put(s)
+ }
}
type closeReq struct {
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index 7edfc0c6dc..eb4a8348ac 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -244,7 +244,9 @@ func (w *worker) start() {
}
func (w *worker) doSendAsync(ctx context.Context, msg Message, callback
Callback, flushImmediately bool) {
- req := &sendDataReq{
+ req := reqPool.Get().(*sendDataReq)
+ *req = sendDataReq{
+ pool: reqPool,
ctx: ctx,
msg: msg,
callback: callback,
@@ -319,7 +321,9 @@ func (w *worker) handleSendData(req *sendDataReq) {
batch, ok := w.pendingBatches[req.msg.StreamID]
if !ok {
streamID := req.msg.StreamID
- batch = &batchReq{
+ batch = batchPool.Get().(*batchReq)
+ *batch = batchReq{
+ pool: batchPool,
workerID: w.indexStr,
batchID: util.SnowFlakeID(),
groupID: w.options.GroupID,