This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 73e6b2a039 [INLONG-11601][SDK] Fix data race in Golang SDK (#11602)
73e6b2a039 is described below
commit 73e6b2a03994984447df822eb3faed04458ef46b
Author: gunli <[email protected]>
AuthorDate: Mon Dec 16 17:36:42 2024 +0800
[INLONG-11601][SDK] Fix data race in Golang SDK (#11602)
Co-authored-by: gunli <[email protected]>
---
.../dataproxy-sdk-golang/dataproxy/request.go | 6 ++++--
.../dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go | 9 ++++++---
2 files changed, 10 insertions(+), 5 deletions(-)
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
index 03bca16a6e..82100ef25d 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
@@ -139,8 +139,9 @@ func (b *batchReq) done(err error) {
}
if b.pool != nil {
- b.pool.Put(b)
+ pool := b.pool
b.pool = nil
+ pool.Put(b)
}
}
@@ -368,8 +369,9 @@ func (s *sendDataReq) done(err error, errCode string) {
}
if s.pool != nil {
- s.pool.Put(s)
+ pool := s.pool
s.pool = nil
+ pool.Put(s)
}
}
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index 0352052fd4..efe115d6ee 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -130,7 +130,7 @@ type worker struct {
metrics *metrics // metrics
bufferPool bufferpool.BufferPool // buffer pool
bytePool bufferpool.BytePool // byte pool
- stop bool // stop the worker
+ stop chan struct{} // stop the worker
}
func newWorker(cli *client, index int, opts *Options) (*worker, error) {
@@ -162,6 +162,7 @@ func newWorker(cli *client, index int, opts *Options) (*worker, error) {
bufferPool: opts.BufferPool,
bytePool: opts.BytePool,
log: opts.Logger,
+ stop: make(chan struct{}),
}
// set to init state
@@ -197,8 +198,10 @@ func (w *worker) start() {
}
}()
- for !w.stop {
+ for {
select {
+ case <-w.stop:
+ return
case req, ok := <-w.cmdChan:
if !ok {
continue
@@ -640,7 +643,7 @@ func (w *worker) close() {
// wait for the close request done
<-req.doneCh
- w.stop = true
+ close(w.stop)
}
func (w *worker) handleClose(req *closeReq) {