This is an automated email from the ASF dual-hosted git repository.
RongtongJin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 7448aa1c [Go] Fix data race in receiveMessage for push and simple
consumers (#1275)
7448aa1c is described below
commit 7448aa1cb7ab6cc3f19a65a2942ac28cb871113b
Author: guyinyou <[email protected]>
AuthorDate: Mon Jun 15 11:02:03 2026 +0800
[Go] Fix data race in receiveMessage for push and simple consumers (#1275)
The err and resps variables were shared between the main goroutine
and a spawned goroutine without synchronization, causing a data race
detectable by Go's race detector. The goroutine wrote to both
variables while the select handler read them concurrently.
Move resps and err into the goroutine as local variables, and pass
results back through a typed channel (receiveResult struct) instead.
This also fixes push_consumer.go not signaling the done channel on
non-EOF errors, which previously caused it to wait for context
timeout instead of returning the actual error immediately.
Affected:
- golang/push_consumer.go: receiveMessage()
- golang/simple_consumer.go: receiveMessage()
Co-authored-by: guyinyou <[email protected]>
---
golang/push_consumer.go | 31 +++++++++++++++++--------------
golang/simple_consumer.go | 33 +++++++++++++++++----------------
2 files changed, 34 insertions(+), 30 deletions(-)
diff --git a/golang/push_consumer.go b/golang/push_consumer.go
index 84dd548a..111722c9 100644
--- a/golang/push_consumer.go
+++ b/golang/push_consumer.go
@@ -206,7 +206,6 @@ func (pc *defaultPushConsumer) GetGroupName() string {
}
func (pc *defaultPushConsumer) receiveMessage(ctx context.Context, request
*v2.ReceiveMessageRequest, messageQueue *v2.MessageQueue, timeout
time.Duration) ([]*MessageView, error) {
- var err error
ctx = pc.cli.Sign(ctx)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
@@ -215,34 +214,38 @@ func (pc *defaultPushConsumer) receiveMessage(ctx
context.Context, request *v2.R
if err != nil {
return nil, err
}
- done := make(chan bool, 1)
+ type receiveResult struct {
+ responses []*v2.ReceiveMessageResponse
+ err error
+ }
+ done := make(chan receiveResult, 1)
- resps := make([]*v2.ReceiveMessageResponse, 0)
go func() {
+ var resps []*v2.ReceiveMessageResponse
+ var recvErr error
for {
var resp *v2.ReceiveMessageResponse
- resp, err = receiveMessageClient.Recv()
- if err == io.EOF {
- done <- true
- defer close(done)
+ resp, recvErr = receiveMessageClient.Recv()
+ if recvErr == io.EOF {
+ done <- receiveResult{responses: resps}
break
}
- if err != nil {
- pc.cli.log.Errorf("pushConsumer recv msg
err=%v, requestId=%s", err, utils.GetRequestID(ctx))
+ if recvErr != nil {
+ pc.cli.log.Errorf("pushConsumer recv msg
err=%v, requestId=%s", recvErr, utils.GetRequestID(ctx))
+ done <- receiveResult{err: recvErr}
break
}
sugarBaseLogger.Debugf("receiveMessage response: %v",
resp)
resps = append(resps, resp)
}
- cancel()
}()
select {
case <-ctx.Done():
// timeout
return nil, fmt.Errorf("[error] CODE=DEADLINE_EXCEEDED")
- case <-done:
- if err != nil && err != io.EOF {
- return nil, err
+ case result := <-done:
+ if result.err != nil {
+ return nil, result.err
}
messageViewList := make([]*MessageView, 0)
status := &v2.Status{
@@ -251,7 +254,7 @@ func (pc *defaultPushConsumer) receiveMessage(ctx
context.Context, request *v2.R
}
var deliveryTimestamp *timestamppb.Timestamp
messageList := make([]*v2.Message, 0)
- for _, resp := range resps {
+ for _, resp := range result.responses {
switch r := resp.GetContent().(type) {
case *v2.ReceiveMessageResponse_Status:
status = r.Status
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index a159afea..4818e05c 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -210,7 +210,6 @@ func (sc *defaultSimpleConsumer) GetGroupName() string {
}
func (sc *defaultSimpleConsumer) receiveMessage(ctx context.Context, request
*v2.ReceiveMessageRequest, messageQueue *v2.MessageQueue, timeout
time.Duration) ([]*MessageView, error) {
- var err error
ctx = sc.cli.Sign(ctx)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
@@ -219,36 +218,38 @@ func (sc *defaultSimpleConsumer) receiveMessage(ctx
context.Context, request *v2
if err != nil {
return nil, err
}
- done := make(chan bool, 1)
+ type receiveResult struct {
+ responses []*v2.ReceiveMessageResponse
+ err error
+ }
+ done := make(chan receiveResult, 1)
- resps := make([]*v2.ReceiveMessageResponse, 0)
go func() {
+ var resps []*v2.ReceiveMessageResponse
+ var recvErr error
for {
var resp *v2.ReceiveMessageResponse
- resp, err = receiveMessageClient.Recv()
- if err == io.EOF {
- done <- true
- defer close(done)
+ resp, recvErr = receiveMessageClient.Recv()
+ if recvErr == io.EOF {
+ done <- receiveResult{responses: resps}
break
}
- if err != nil {
- sc.cli.log.Errorf("simpleConsumer recv msg
err=%v, requestId=%s", err, utils.GetRequestID(ctx))
- done <- true
- defer close(done)
+ if recvErr != nil {
+ sc.cli.log.Errorf("simpleConsumer recv msg
err=%v, requestId=%s", recvErr, utils.GetRequestID(ctx))
+ done <- receiveResult{err: recvErr}
break
}
sugarBaseLogger.Debugf("receiveMessage response: %v",
resp)
resps = append(resps, resp)
}
- cancel()
}()
select {
case <-ctx.Done():
// timeout
return nil, fmt.Errorf("[error] CODE=DEADLINE_EXCEEDED")
- case <-done:
- if err != nil && err != io.EOF {
- return nil, err
+ case result := <-done:
+ if result.err != nil {
+ return nil, result.err
}
messageViewList := make([]*MessageView, 0)
status := &v2.Status{
@@ -257,7 +258,7 @@ func (sc *defaultSimpleConsumer) receiveMessage(ctx
context.Context, request *v2
}
var deliveryTimestamp *timestamppb.Timestamp
messageList := make([]*v2.Message, 0)
- for _, resp := range resps {
+ for _, resp := range result.responses {
switch r := resp.GetContent().(type) {
case *v2.ReceiveMessageResponse_Status:
status = r.Status