This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 73e6b2a039 [INLONG-11601][SDK] Fix data race in Golang SDK (#11602)
73e6b2a039 is described below

commit 73e6b2a03994984447df822eb3faed04458ef46b
Author: gunli <[email protected]>
AuthorDate: Mon Dec 16 17:36:42 2024 +0800

    [INLONG-11601][SDK] Fix data race in Golang SDK (#11602)
    
    Co-authored-by: gunli <[email protected]>
---
 .../dataproxy-sdk-golang/dataproxy/request.go                    | 6 ++++--
 .../dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go | 9 ++++++---
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
index 03bca16a6e..82100ef25d 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
@@ -139,8 +139,9 @@ func (b *batchReq) done(err error) {
        }
 
        if b.pool != nil {
-               b.pool.Put(b)
+               pool := b.pool
                b.pool = nil
+               pool.Put(b)
        }
 }
 
@@ -368,8 +369,9 @@ func (s *sendDataReq) done(err error, errCode string) {
        }
 
        if s.pool != nil {
-               s.pool.Put(s)
+               pool := s.pool
                s.pool = nil
+               pool.Put(s)
        }
 }
 
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index 0352052fd4..efe115d6ee 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -130,7 +130,7 @@ type worker struct {
        metrics            *metrics                 // metrics
        bufferPool         bufferpool.BufferPool    // buffer pool
        bytePool           bufferpool.BytePool      // byte pool
-       stop               bool                     // stop the worker
+       stop               chan struct{}            // stop the worker
 }
 
 func newWorker(cli *client, index int, opts *Options) (*worker, error) {
@@ -162,6 +162,7 @@ func newWorker(cli *client, index int, opts *Options) 
(*worker, error) {
                bufferPool:         opts.BufferPool,
                bytePool:           opts.BytePool,
                log:                opts.Logger,
+               stop:               make(chan struct{}),
        }
 
        // set to init state
@@ -197,8 +198,10 @@ func (w *worker) start() {
                        }
                }()
 
-               for !w.stop {
+               for {
                        select {
+                       case <-w.stop:
+                               return
                        case req, ok := <-w.cmdChan:
                                if !ok {
                                        continue
@@ -640,7 +643,7 @@ func (w *worker) close() {
 
        // wait for the close request done
        <-req.doneCh
-       w.stop = true
+       close(w.stop)
 }
 
 func (w *worker) handleClose(req *closeReq) {

Reply via email to