GitHub user loringgit added a comment to the discussion: FIFO topic: consumption is not strictly ordered on the consumer side

1. The topic was created with the FIFO type specified. The creation command was:

```
./bin/mqadmin updateTopic -n nameserver(ip:port) -b mybroker(ip:port) -t test_fifo_topic -p 6 -r 2 -w 2 -a +message.type=FIFO
```

I am using github.com/apache/rocketmq-clients/golang/v5. NewSimpleConsumer uses v2.ClientType_SIMPLE_CONSUMER by default. I tried changing that default to ClientType_PUSH_CONSUMER, but the test results did not change. Do you mean that I need to implement an OrderedMessage consumer myself, using the Java PushConsumer as a reference?

2. The "select MessageQueueId" entry in the log is a log line that I added to the ReceiveMessage method. See the comment in the code below:

```
func (sc *defaultSimpleConsumer) Receive(ctx context.Context, maxMessageNum int32, invisibleDuration time.Duration) ([]*MessageView, error) {
	if !sc.isOn() {
		return nil, fmt.Errorf("simple consumer is not running")
	}
	if maxMessageNum <= 0 {
		return nil, fmt.Errorf("maxMessageNum must be greater than 0")
	}
	sc.subscriptionExpressionsLock.RLock()
	topics := make([]string, 0, len(sc.subscriptionExpressions))
	for k := range sc.subscriptionExpressions {
		topics = append(topics, k)
	}
	sc.subscriptionExpressionsLock.RUnlock()
	// All topic is subscribed.
	if len(topics) == 0 {
		return nil, fmt.Errorf("there is no topic to receive message")
	}
	next := atomic.AddInt32(&sc.topicIndex, 1)
	idx := utils.Mod(next+1, len(topics))
	topic := topics[idx]

	sc.subscriptionExpressionsLock.RLock()
	filterExpression, ok := sc.subscriptionExpressions[topic]
	sc.subscriptionExpressionsLock.RUnlock()
	if !ok {
		return nil, fmt.Errorf("no found filterExpression about topic: %s", topic)
	}
	subLoadBalancer, err := sc.getSubscriptionTopicRouteResult(ctx, topic)
	if err != nil {
		return nil, err
	}
	selectMessageQueue, err := subLoadBalancer.TakeMessageQueue()
	if err != nil {
		return nil, err
	}
	fmt.Printf("selectMessageQueueId: %v\n", selectMessageQueue.GetId()) // added: print the queue ID
	request := sc.wrapReceiveMessageRequest(int(maxMessageNum), selectMessageQueue, filterExpression, invisibleDuration)
	timeout := sc.scOpts.awaitDuration + sc.cli.opts.timeout
	return sc.receiveMessage(ctx, request, selectMessageQueue, timeout)
}
```

@francisoliverlee

GitHub link:
https://github.com/apache/rocketmq/discussions/9306#discussioncomment-12696340

----
This is an automatically sent email for dev@rocketmq.apache.org.
To unsubscribe, please send an email to: dev-unsubscr...@rocketmq.apache.org