GitHub user loringgit edited a discussion: FIFO topic: consumption is not strictly ordered on the consumer side

Background:
1. RocketMQ 5.3; client: rocketmq-clients/golang/v5.
2. A FIFO topic was created with 2 queues.
3. The consumer group has two consumer instances.
4. The producer sends every message with message group fifo001, so only one queue holds messages:

| Queue | minOffset | maxOffset | lastUpdateTimeStamp |
| -- | -- | -- | -- |
| MessageQueue [topic=test_fifo_topic, brokerName=cf8605034927, queueId=1] | 0 | 0 | 1970-01-01 08:00:00 |
| MessageQueue [topic=test_fifo_topic, brokerName=cf8605034927, queueId=0] | 0 | 1000 | 2025-04-01 09:28:13 |

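5. The consumer code is based on the SimpleConsumer example. The flow is: receive a message and print a log line => sleep one second to simulate processing => print a log line => ACK. The code is as follows: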
```
package main

import (
        "context"
        "fmt"
        "log"
        "os"
        "strings"
        "time"

        rmq_client "github.com/apache/rocketmq-clients/golang/v5"
        "github.com/apache/rocketmq-clients/golang/v5/credentials"
)

// Topic, ConsumerGroup, Endpoint, AccessKey, SecretKey, awaitDuration,
// maxMessageNum and invisibleDuration are assumed to be declared as in the
// SimpleConsumer example.

func main() {
        // log to console
        os.Setenv("mq.consoleAppender.enabled", "true")
        rmq_client.ResetLogger()
        // In most cases you don't need to create many consumers; a singleton is recommended.
        simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{
                Endpoint:      Endpoint,
                ConsumerGroup: ConsumerGroup,
                Credentials: &credentials.SessionCredentials{
                        AccessKey:    AccessKey,
                        AccessSecret: SecretKey,
                },
        },
                rmq_client.WithAwaitDuration(awaitDuration),
                rmq_client.WithSubscriptionExpressions(map[string]*rmq_client.FilterExpression{
                        Topic: rmq_client.SUB_ALL,
                }),
        )
        if err != nil {
                log.Fatal(err)
        }
        // start simpleConsumer
        err = simpleConsumer.Start()
        if err != nil {
                log.Fatal(err)
        }
        // graceful stop simpleConsumer
        defer simpleConsumer.GracefulStop()
        go func() {
                fmt.Println("start receive message")
                for {
                        mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
                        if err != nil {
                                fmt.Println(err)
                                continue // nothing to process on a failed receive
                        }
                        for _, mv := range mvs {
                                // log on receipt
                                fmt.Printf("message receive group:%s, timestamp:%d, offset:%d,body:%s\n",
                                        *mv.GetMessageGroup(), time.Now().UnixMilli(), mv.GetOffset(), mv.GetBody())
                                // simulate one second of processing
                                if strings.EqualFold(*mv.GetMessageGroup(), "fifo001") {
                                        time.Sleep(time.Second * 1)
                                }
                                // log just before the ACK is sent
                                fmt.Printf("message receive group:%s, timestamp:%d, offset:%d,body:%s,key=%v TO_ACK\n",
                                        *mv.GetMessageGroup(), time.Now().UnixMilli(), mv.GetOffset(), mv.GetBody(), mv.GetKeys())
                                // ack message
                                err = simpleConsumer.Ack(context.TODO(), mv)
                                if err != nil {
                                        fmt.Println("simpleConsumer.Ack, err:", err)
                                }
                        }
                        fmt.Println()
                }
        }()
        // Keep main alive while the goroutine consumes; the duration is arbitrary.
        time.Sleep(time.Minute * 60)
}
```

The test run printed the following logs:
```
Consumer 1 log:

selectMessageQueueId: 1
message receive group:fifo001, timestamp:1743471018946, offset:5,body:message_5
message receive group:fifo001, timestamp:1743471019947, offset:5,body:message_5,key=[5] TO_ACK

selectMessageQueueId: 0
message receive group:fifo001, timestamp:1743471019984, offset:7,body:message_7
message receive group:fifo001, timestamp:1743471020985, offset:7,body:message_7,key=[7] TO_ACK

Consumer 2 log:

selectMessageQueueId: 0
message receive group:fifo001, timestamp:1743471019768, offset:6,body:message_6,key=[6]
message receive group:fifo001, timestamp:1743471020793, offset:6,body:message_6,key=[6] TO_ACK

selectMessageQueueId: 1
message receive group:fifo001, timestamp:1743471020806, offset:8,body:message_8,key=[8]
message receive group:fifo001, timestamp:1743471021819, offset:8,body:message_8,key=[8] TO_ACK
```
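To compare the two consumers' logs mechanically rather than by eye, the lines can be parsed into structured events. The following is a minimal sketch using only the Go standard library; the `event` type, `linePattern`, and `parseLine` are hypothetical names introduced here, and the pattern assumes the exact `Printf` format used in the consumer code.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

// event is one parsed log line. The type and its field names are
// hypothetical; they are not part of the RocketMQ client.
type event struct {
	Group     string
	Timestamp int64 // Unix milliseconds, as printed by time.Now().UnixMilli()
	Offset    int64
	ToAck     bool // true for the line printed just before Ack is called
}

// linePattern matches the common prefix of both Printf formats in the consumer code.
var linePattern = regexp.MustCompile(`^message receive group:([^,]+), timestamp:(\d+), offset:(\d+),`)

// parseLine returns the parsed event, or false for lines that do not match
// (for example the "selectMessageQueueId" lines).
func parseLine(line string) (event, bool) {
	m := linePattern.FindStringSubmatch(line)
	if m == nil {
		return event{}, false
	}
	ts, err := strconv.ParseInt(m[2], 10, 64)
	if err != nil {
		return event{}, false
	}
	off, err := strconv.ParseInt(m[3], 10, 64)
	if err != nil {
		return event{}, false
	}
	return event{
		Group:     m[1],
		Timestamp: ts,
		Offset:    off,
		ToAck:     strings.HasSuffix(line, " TO_ACK"),
	}, true
}

func main() {
	lines := []string{
		"selectMessageQueueId: 1",
		"message receive group:fifo001, timestamp:1743471018946, offset:5,body:message_5",
		"message receive group:fifo001, timestamp:1743471019947, offset:5,body:message_5,key=[5] TO_ACK",
	}
	for _, l := range lines {
		if e, ok := parseLine(l); ok {
			fmt.Printf("%+v\n", e)
		}
	}
}
```

Lines that do not start with `message receive group:` are skipped, so the `selectMessageQueueId` lines do not need to be filtered out beforehand.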
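Question 1: ordering of FIFO messages

According to the [load-balancing documentation for ordered messages](https://rocketmq.apache.org/zh/docs/featureBehavior/08consumerloadbalance/), "during consumption, while the earlier messages M1 and M2 are being processed by consumer A1, consumer A2 cannot consume the later messages M3 and M4 in parallel as long as their consumption status has not been committed; it must wait until the earlier messages have committed their consumption status before it can consume the later ones." The logs, however, show M3 and M4 being received before M2 was ACKed. Is this a problem in how the broker handles it, or am I using the client incorrectly?

Consumer 2 received: timestamp:1743471019768, offset:6
Consumer 1 received: timestamp:1743471018946, offset:5
Consumer 1 about to ACK: timestamp:1743471019947, offset:5,body:message_5,key=[5] TO_ACK

This shows that consumer 2 had already received the message at offset 6 while consumer 1 had not yet ACKed the message at offset 5.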
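The same comparison can be applied to every adjacent pair of offsets in the logs. The sketch below is standard-library Go; `delivery`, `overlap`, and `findOverlaps` are hypothetical names, the timestamps are copied from the logs above, and it assumes the clocks of both consumers are synchronized. It uses the TO_ACK log time as a lower bound for the ACK, since the actual ACK completes later.

```go
package main

import "fmt"

// delivery records when a message was received and when its handler logged
// TO_ACK (just before calling Ack). Hypothetical type for this analysis.
type delivery struct {
	Consumer   string
	Offset     int64
	ReceivedAt int64 // Unix milliseconds
	ToAckAt    int64 // Unix milliseconds
}

// overlap describes a message that was received before its predecessor
// reached the TO_ACK log line.
type overlap struct {
	Prev      int64 // offset of the earlier message
	Next      int64 // offset of the later message
	EarlyByMs int64 // how long before the predecessor's TO_ACK it arrived
}

// findOverlaps expects deliveries sorted by offset and compares each
// message with the one immediately before it.
func findOverlaps(ds []delivery) []overlap {
	var out []overlap
	for i := 1; i < len(ds); i++ {
		prev, next := ds[i-1], ds[i]
		if next.ReceivedAt < prev.ToAckAt {
			out = append(out, overlap{
				Prev:      prev.Offset,
				Next:      next.Offset,
				EarlyByMs: prev.ToAckAt - next.ReceivedAt,
			})
		}
	}
	return out
}

func main() {
	// Values taken from the consumer 1 and consumer 2 logs.
	ds := []delivery{
		{Consumer: "consumer1", Offset: 5, ReceivedAt: 1743471018946, ToAckAt: 1743471019947},
		{Consumer: "consumer2", Offset: 6, ReceivedAt: 1743471019768, ToAckAt: 1743471020793},
		{Consumer: "consumer1", Offset: 7, ReceivedAt: 1743471019984, ToAckAt: 1743471020985},
		{Consumer: "consumer2", Offset: 8, ReceivedAt: 1743471020806, ToAckAt: 1743471021819},
	}
	for _, o := range findOverlaps(ds) {
		fmt.Printf("offset %d was received %d ms before offset %d reached TO_ACK\n",
			o.Next, o.EarlyByMs, o.Prev)
	}
}
```

With the logged values, all three adjacent pairs overlap: offset 6 arrives 179 ms before offset 5 reaches TO_ACK, offset 7 arrives 809 ms before offset 6 does, and offset 8 arrives 179 ms before offset 7 does.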

Question 2: queue 0 has no messages, yet when the client's round-robin polling selects queue 0, messages are still returned. How does the broker handle this?

GitHub link: https://github.com/apache/rocketmq/discussions/9306

----
This is an automatically sent email for dev@rocketmq.apache.org.
To unsubscribe, please send an email to: dev-unsubscr...@rocketmq.apache.org
