GitHub user loringgit edited a discussion: FIFO topic 消费端非严格有序
背景: 1.rockermq 5.3 版本,客户端版本:rocketmq-clients/golang/v5 2.创建FIFO topic,2 个queue 3.Consumer group 下有两个consumer实例 4.producer 发送消息的message group 都是fifo001,也就是只有一个queue有消息 Queue | minOffset | maxOffset | lastUpdateTimeStamp -- | -- | -- | -- MessageQueue [topic=test_fifo_topic, brokerName=cf8605034927, queueId=1] | 0 | 0 | 1970-01-01 08:00:00 MessageQueue [topic=test_fifo_topic, brokerName=cf8605034927, queueId=0] | 0 | 1000 | 2025-04-01 09:28:13 5.consumer 端代码参考的是example中SimpleConsumer,流程:收到消息后打印log=>模拟消息处理sleep一秒=> 打印log=> ACK,具体代码如下: ``` func main() { // log to console os.Setenv("mq.consoleAppender.enabled", "true") rmq_client.ResetLogger() // In most case, you don't need to create many consumers, singleton pattern is more recommended. simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{ Endpoint: Endpoint, ConsumerGroup: ConsumerGroup, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, }, }, rmq_client.WithAwaitDuration(awaitDuration), rmq_client.WithSubscriptionExpressions(map[string]*rmq_client.FilterExpression{ Topic: rmq_client.SUB_ALL, }), ) if err != nil { log.Fatal(err) } // start simpleConsumer err = simpleConsumer.Start() if err != nil { log.Fatal(err) } // graceful stop simpleConsumer defer simpleConsumer.GracefulStop() go func() { fmt.Println("start receive message") for { mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration) if err != nil { fmt.Println(err) } // ack message for _, mv := range mvs { fmt.Printf("message receive group:%s, timestamp:%d, offset:%d,body:%s\n", *mv.GetMessageGroup(), time.Now().UnixMilli(), mv.GetOffset(), mv.GetBody()) if strings.EqualFold(*mv.GetMessageGroup(), "fifo001") { time.Sleep(time.Second * 1) } fmt.Printf("message receive group:%s, timestamp:%d, offset:%d,body:%s,key=%v TO_ACK\n", *mv.GetMessageGroup(), time.Now().UnixMilli(), mv.GetOffset(), mv.GetBody(), mv.GetKeys()) err = simpleConsumer.Ack(context.TODO(), mv) if err != nil { fmt.Println("simpleConsumer.Ack, err:", err) } } fmt.Println() } }() ``` 测试打印log如下: ``` consumer 1 log如下: selectMessageQueueId: 1 message receive group:fifo001, timestamp:1743471018946, offset:5,body:message_5 message receive group:fifo001, timestamp:1743471019947, offset:5,body:message_5,key=[5] TO_ACK selectMessageQueueId: 0 message receive group:fifo001, timestamp:1743471019984, offset:7,body:message_7 message receive group:fifo001, timestamp:1743471020985, offset:7,body:message_7,key=[7] TO_ACK consumer2 log如下: selectMessageQueueId: 0 message receive group:fifo001, timestamp:1743471019768, offset:6,body:message_6,key=[6] message receive group:fifo001, timestamp:1743471020793, offset:6,body:message_6,key=[6] TO_ACK selectMessageQueueId: 1 message receive group:fifo001, timestamp:1743471020806, offset:8,body:message_8,key=[8] message receive group:fifo001, timestamp:1743471021819, offset:8,body:message_8,key=[8] TO_ACK ``` 问题1 :顺序消息顺序性 从文档看,[顺序消息负载均衡文档说明](https://rocketmq.apache.org/zh/docs/featureBehavior/08consumerloadbalance/),“在消费过程中,前面的消息M1、M2被消费者Consumer A1处理时,只要消费状态没有提交,消费者A2是无法并行消费后续的M3、M4消息的,必须等前面的消息提交消费状态后才能消费后面的消息”。但是从日志看,M3,M4 在M2 未ACK前已经收到。这是由broker端处理由问题,还是我的用法不对? ``` consumer 2收到消息:timestamp:1743471019768, offset:6 consumer 1 收到消息:timestamp:1743471018946, offset:5, consumer 1 准备ack 时刻: timestamp:1743471019947, offset:5,body:message_5,key=[5] TO_ACK ``` 从这里看到consumer1 的offset5消息尚未ack,consumer2 已经收到了offset 6 的消息 问题2 :queue 0 没有消息,client 轮询选择了queue0的时,也有消息返回,broker是怎么处理的? GitHub link: https://github.com/apache/rocketmq/discussions/9306 ---- This is an automatically sent email for dev@rocketmq.apache.org. To unsubscribe, please send an email to: dev-unsubscr...@rocketmq.apache.org