This is an automated email from the ASF dual-hosted git repository.

RongtongJin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new bf4462c2 [Go] Fix missing return after retry scheduling in 
process_queue (#1265)
bf4462c2 is described below

commit bf4462c24b50a1f8f1f5b0180d2256d2b8cadedb
Author: guyinyou <[email protected]>
AuthorDate: Thu Jun 11 14:53:08 2026 +0800

    [Go] Fix missing return after retry scheduling in process_queue (#1265)
    
    In ackMessage0, changeInvisibleDuration, and forwardToDeadLetterQueue0,
    when the response code is not OK, the *Later() retry function is called
    but execution falls through to callback(nil), causing the message to be
    prematurely evicted from cache before the operation actually succeeds.
    This can lead to message loss.
    
    Add return statements after each *Later() call to prevent fallthrough,
    matching the Java implementation's behavior where the future is only
    resolved on terminal states (success or abandoned).
    
    Co-authored-by: guyinyou <[email protected]>
---
 golang/process_queue.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/golang/process_queue.go b/golang/process_queue.go
index 1109a9c7..4fb45853 100644
--- a/golang/process_queue.go
+++ b/golang/process_queue.go
@@ -154,6 +154,7 @@ func (dpq *defaultProcessQueue) 
forwardToDeadLetterQueue0(mv *MessageView, attem
                        " clientId=%s, consumerGroup=%s, messageId=%s, 
attempt=%d, mq=%s, endpoints=%v, requestId=%s, status message=[%s]", clientId, 
consumerGroup, messageId, attempt, dpq.mqstr,
                        endpoints, requestId, status.GetMessage())
                dpq.forwardToDeadLetterQueueLater(mv, 1+attempt, callback)
+               return
        }
        // Set result if succeed in changing invisible time.
        callback(nil)
@@ -237,6 +238,7 @@ func (dpq *defaultProcessQueue) changeInvisibleDuration(mv 
*MessageView, duratio
                        " clientId=%s, consumerGroup=%s, messageId=%s, 
attempt=%d, mq=%s, endpoints=%v, requestId=%s, status message=[%s]", clientId, 
consumerGroup, messageId, attempt, dpq.mqstr,
                        endpoints, requestId, status.GetMessage())
                dpq.changeInvisibleDurationLater(mv, duration, 1+attempt, 
callback)
+               return
        }
        // Set result if succeed in changing invisible time.
        callback(nil)
@@ -303,6 +305,7 @@ func (dpq *defaultProcessQueue) ackMessage0(mv 
*MessageView, attempt int, callba
                        " clientId=%s, consumerGroup=%s, messageId=%s, 
attempt=%d, mq=%s, endpoints=%v, requestId=%s, status message=[%s]", clientId, 
consumerGroup, messageId, attempt, dpq.mqstr,
                        endpoints, requestId, status.GetMessage())
                dpq.ackMessageLater(mv, 1+attempt, callback)
+               return
        }
        // Set result if succeed in changing invisible time.
        callback(nil)

Reply via email to