RongtongJin commented on code in PR #1275:
URL: https://github.com/apache/rocketmq-clients/pull/1275#discussion_r3401411122
##########
golang/simple_consumer.go:
##########
@@ -219,22 +218,25 @@ func (sc *defaultSimpleConsumer) receiveMessage(ctx context.Context, request *v2
if err != nil {
return nil, err
}
- done := make(chan bool, 1)
+ type receiveResult struct {
+ responses []*v2.ReceiveMessageResponse
+ err error
+ }
+ done := make(chan receiveResult, 1)
- resps := make([]*v2.ReceiveMessageResponse, 0)
go func() {
+ var resps []*v2.ReceiveMessageResponse
+ var recvErr error
for {
var resp *v2.ReceiveMessageResponse
- resp, err = receiveMessageClient.Recv()
- if err == io.EOF {
- done <- true
- defer close(done)
+ resp, recvErr = receiveMessageClient.Recv()
+ if recvErr == io.EOF {
+ done <- receiveResult{responses: resps}
break
}
- if err != nil {
- sc.cli.log.Errorf("simpleConsumer recv msg err=%v, requestId=%s", err, utils.GetRequestID(ctx))
- done <- true
- defer close(done)
+ if recvErr != nil {
+ sc.cli.log.Errorf("simpleConsumer recv msg err=%v, requestId=%s", recvErr, utils.GetRequestID(ctx))
+ done <- receiveResult{err: recvErr}
Review Comment:
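As with the push consumer, after sending `receiveResult` the goroutine still
goes on to run the later `cancel()`, so the outer `select` may randomly take
the `ctx.Done()` branch. A real `Recv()` error, or even the responses already
collected by the time of EOF, can then be overwritten with deadline exceeded.
Suggest removing the `cancel()` in the goroutine and keeping only the outer
`defer cancel()`, or making sure the result channel is consumed first.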
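A minimal sketch of the first option (no `cancel()` inside the goroutine, only
the outer `defer cancel()`). This is not the PR's code: `receiveAll`,
`scriptedRecv`, `errRecv`, `demoError` and `demoEOF` are hypothetical names,
and plain strings stand in for `*v2.ReceiveMessageResponse` so the example
compiles without the gRPC client.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"
)

// errRecv is a hypothetical stand-in for a real stream error from Recv().
var errRecv = errors.New("stream broken")

// receiveResult mirrors the struct in the diff, with strings as responses.
type receiveResult struct {
	responses []string
	err       error
}

// receiveAll drains recv until io.EOF or an error. The goroutine only sends
// its result; it never cancels the context itself.
func receiveAll(parent context.Context, timeout time.Duration, recv func() (string, error)) ([]string, error) {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel() // the only cancel: runs after the select has returned

	done := make(chan receiveResult, 1) // buffered, so the send never blocks
	go func() {
		var resps []string
		for {
			resp, err := recv()
			if err == io.EOF {
				done <- receiveResult{responses: resps}
				return
			}
			if err != nil {
				done <- receiveResult{err: err}
				return
			}
			resps = append(resps, resp)
		}
	}()

	select {
	case res := <-done:
		return res.responses, res.err
	case <-ctx.Done():
		// Reached only if the deadline really expires or the parent is cancelled.
		return nil, ctx.Err()
	}
}

// scriptedRecv returns a fake Recv that yields msgs in order, then final.
func scriptedRecv(msgs []string, final error) func() (string, error) {
	i := 0
	return func() (string, error) {
		if i < len(msgs) {
			i++
			return msgs[i-1], nil
		}
		return "", final
	}
}

// demoError runs a stream that fails after one message.
func demoError() error {
	_, err := receiveAll(context.Background(), time.Minute, scriptedRecv([]string{"a"}, errRecv))
	return err
}

// demoEOF runs a stream that ends normally after two messages.
func demoEOF() ([]string, error) {
	return receiveAll(context.Background(), time.Minute, scriptedRecv([]string{"a", "b"}, io.EOF))
}

func main() {
	fmt.Println(demoError())
	fmt.Println(demoEOF())
}
```

Because the goroutine no longer cancels, `ctx.Done()` and `done` are not made
ready together by the same code path, so the real error or the collected
responses are what the caller sees.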
##########
golang/push_consumer.go:
##########
@@ -215,20 +214,25 @@ func (pc *defaultPushConsumer) receiveMessage(ctx context.Context, request *v2.R
if err != nil {
return nil, err
}
- done := make(chan bool, 1)
+ type receiveResult struct {
+ responses []*v2.ReceiveMessageResponse
+ err error
+ }
+ done := make(chan receiveResult, 1)
- resps := make([]*v2.ReceiveMessageResponse, 0)
go func() {
+ var resps []*v2.ReceiveMessageResponse
+ var recvErr error
for {
var resp *v2.ReceiveMessageResponse
- resp, err = receiveMessageClient.Recv()
- if err == io.EOF {
- done <- true
- defer close(done)
+ resp, recvErr = receiveMessageClient.Recv()
+ if recvErr == io.EOF {
+ done <- receiveResult{responses: resps}
break
}
- if err != nil {
- pc.cli.log.Errorf("pushConsumer recv msg err=%v, requestId=%s", err, utils.GetRequestID(ctx))
+ if recvErr != nil {
+ pc.cli.log.Errorf("pushConsumer recv msg err=%v, requestId=%s", recvErr, utils.GetRequestID(ctx))
+ done <- receiveResult{err: recvErr}
Review Comment:
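After the real `Recv()` error is sent to `done` here, the goroutine continues
to the later `cancel()`. The outer `select` may see both `<-done` and
`<-ctx.Done()` ready at the same time and randomly choose the ctx branch, so
the real error is misreported as `[error] CODE=DEADLINE_EXCEEDED`. Suggest not
cancelling in the receiving goroutine and relying on the outer `defer
cancel()`, or restructuring the wait logic so that the result from `done` is
always returned first.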
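A sketch of the second option, where the wait logic prefers `done` even when
the context is already cancelled. It is an illustration only: `awaitResult`,
`lostResults`, `cancelOnlyReportsCtxErr` and `errRecv` are hypothetical names,
and strings stand in for `*v2.ReceiveMessageResponse`. It relies on `done`
being buffered and on the goroutine sending before it cancels, as in the diff.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errRecv is a hypothetical stand-in for a real stream error from Recv().
var errRecv = errors.New("stream broken")

// receiveResult mirrors the struct in the diff, with strings as responses.
type receiveResult struct {
	responses []string
	err       error
}

// awaitResult waits for either channel, but when ctx.Done() wins it polls
// done once more before reporting the context error. If the goroutine sent
// its result before cancelling, that result is already in the buffer.
func awaitResult(ctx context.Context, done <-chan receiveResult) ([]string, error) {
	select {
	case res := <-done:
		return res.responses, res.err
	case <-ctx.Done():
		select {
		case res := <-done:
			return res.responses, res.err
		default:
			return nil, ctx.Err()
		}
	}
}

// lostResults reproduces the "both ready" state rounds times and counts how
// often the real error is replaced by the context error.
func lostResults(rounds int) int {
	lost := 0
	for i := 0; i < rounds; i++ {
		ctx, cancel := context.WithCancel(context.Background())
		done := make(chan receiveResult, 1)
		done <- receiveResult{err: errRecv} // result is sent first...
		cancel()                            // ...then the context is cancelled
		if _, err := awaitResult(ctx, done); err != errRecv {
			lost++
		}
	}
	return lost
}

// cancelOnlyReportsCtxErr checks that a cancellation with no pending result
// still surfaces the context error.
func cancelOnlyReportsCtxErr() bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, err := awaitResult(ctx, make(chan receiveResult, 1))
	return errors.Is(err, context.Canceled)
}

func main() {
	fmt.Println("lost results:", lostResults(1000))
	fmt.Println("cancel only reports ctx error:", cancelOnlyReportsCtxErr())
}
```

With a single flat `select`, Go picks among ready cases at random, which is
the misreporting described above; the nested non-blocking receive removes that
randomness for results that were sent before the cancellation.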