ShannonDing commented on a change in pull request #625:
URL: https://github.com/apache/rocketmq-client-go/pull/625#discussion_r595127806



##########
File path: consumer/push_consumer.go
##########
@@ -1132,15 +1139,15 @@ func (pc *pushConsumer) consumeMessageOrderly(pq 
*processQueue, mq *primitive.Me
        }
 }
 
-func (pc *pushConsumer) checkReconsumeTimes(msgs []*primitive.MessageExt) bool 
{
+func (pc *pushConsumer) checkReconsumeTimes(msgs []*primitive.MessageExt, mq 
*primitive.MessageQueue) bool {
        suspend := false
        if len(msgs) != 0 {
                maxReconsumeTimes := pc.getOrderlyMaxReconsumeTimes()
                for _, msg := range msgs {
                        if msg.ReconsumeTimes > maxReconsumeTimes {
                                rlog.Warning(fmt.Sprintf("msg will be send to 
retry topic due to ReconsumeTimes > %d, \n", maxReconsumeTimes), nil)
                                msg.WithProperty("RECONSUME_TIME", 
strconv.Itoa(int(msg.ReconsumeTimes)))
-                               if !pc.sendMessageBack("", msg, -1) {
+                               if !pc.sendMessageBack(mq.BrokerName, msg, -1) {

Review comment:
       need to discuss. IMO, for orderly messages in stream,  it should be deal 
with one by one.  the next one will be not consumed until the current message 
is consumed success. and the message that consumed failed more than 
[maxReconsumeTimes] times should be dropped rather than sending back again.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to