ShannonDing commented on a change in pull request #625:
URL: https://github.com/apache/rocketmq-client-go/pull/625#discussion_r595127806
##########
File path: consumer/push_consumer.go
##########
@@ -1132,15 +1139,15 @@ func (pc *pushConsumer) consumeMessageOrderly(pq
*processQueue, mq *primitive.Me
}
}
-func (pc *pushConsumer) checkReconsumeTimes(msgs []*primitive.MessageExt) bool
{
+func (pc *pushConsumer) checkReconsumeTimes(msgs []*primitive.MessageExt, mq
*primitive.MessageQueue) bool {
suspend := false
if len(msgs) != 0 {
maxReconsumeTimes := pc.getOrderlyMaxReconsumeTimes()
for _, msg := range msgs {
if msg.ReconsumeTimes > maxReconsumeTimes {
rlog.Warning(fmt.Sprintf("msg will be send to
retry topic due to ReconsumeTimes > %d, \n", maxReconsumeTimes), nil)
msg.WithProperty("RECONSUME_TIME",
strconv.Itoa(int(msg.ReconsumeTimes)))
- if !pc.sendMessageBack("", msg, -1) {
+ if !pc.sendMessageBack(mq.BrokerName, msg, -1) {
Review comment:
need to discuss. IMO, for orderly messages in stream, it should be deal
with one by one. the next one will be not consumed until the current message
is consumed success. and the message that consumed failed more than
[maxReconsumeTimes] times should be dropped rather than sending back again.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]