GitHub user loringgit added a comment to the discussion: FIFO topic 消费端非严格有序

1.topic创建的时候有指定是FIFO类型,创建命令:
```
./bin/mqadmin updateTopic -n nameserver(ip:port) -b mybroker(ip:port) -t 
test_fifo_topic  -p 6 -r 2 -w 2 -a +message.type=FIFO
```
我用的是 
github.com/apache/rocketmq-clients/golang/v5,NewSimpleConsumer默认使用的是v2.ClientType_SIMPLE_CONSUMER,有试过把这个默认值修改为
 
ClientType_PUSH_CONSUMER,测试结果没变化,你的意思是需要参考Java的PushConsumer自己实现OrderedMessage的Consumer吗?

2.日志中select MessageQueueId 是在ReceiveMessage方法增加的一行日志,代码见下面的注释
```
func (sc *defaultSimpleConsumer) Receive(ctx context.Context, maxMessageNum 
int32, invisibleDuration time.Duration) ([]*MessageView, error) {
        if !sc.isOn() {
                return nil, fmt.Errorf("simple consumer is not running")
        }
        if maxMessageNum <= 0 {
                return nil, fmt.Errorf("maxMessageNum must be greater than 0")
        }
        sc.subscriptionExpressionsLock.RLock()
        topics := make([]string, 0, len(sc.subscriptionExpressions))
        for k := range sc.subscriptionExpressions {
                topics = append(topics, k)
        }
        sc.subscriptionExpressionsLock.RUnlock()
        // All topic is subscribed.
        if len(topics) == 0 {
                return nil, fmt.Errorf("there is no topic to receive message")
        }
        next := atomic.AddInt32(&sc.topicIndex, 1)
        idx := utils.Mod(next+1, len(topics))
        topic := topics[idx]

        sc.subscriptionExpressionsLock.RLock()
        filterExpression, ok := sc.subscriptionExpressions[topic]
        sc.subscriptionExpressionsLock.RUnlock()
        if !ok {
                return nil, fmt.Errorf("no found filterExpression about topic: 
%s", topic)
        }
        subLoadBalancer, err := sc.getSubscriptionTopicRouteResult(ctx, topic)
        if err != nil {
                return nil, err
        }
        selectMessageQueue, err := subLoadBalancer.TakeMessageQueue()
        if err != nil {
                return nil, err
        }
        fmt.Printf("selectMessageQueueId: %v\n", selectMessageQueue.GetId())  
// 打印QueueId 
        request := sc.wrapReceiveMessageRequest(int(maxMessageNum), 
selectMessageQueue, filterExpression, invisibleDuration)
        timeout := sc.scOpts.awaitDuration + sc.cli.opts.timeout
        return sc.receiveMessage(ctx, request, selectMessageQueue, timeout)
}
```
@francisoliverlee 

GitHub link: 
https://github.com/apache/rocketmq/discussions/9306#discussioncomment-12696340

----
This is an automatically sent email for dev@rocketmq.apache.org.
To unsubscribe, please send an email to: dev-unsubscr...@rocketmq.apache.org

Reply via email to