kaybinwong opened a new issue, #1016:
URL: https://github.com/apache/rocketmq-client-go/issues/1016

   **BUG REPORT**  
   1. Please describe the issue you observed:
   
   pulling msg with 1.2.4, but got PullBrokerTimeout error, it did't hanppend 
with spring client.
   ```
   ……
   
                                err := p.pullMQ(market)
                                if err != nil {
                                        log.Warnf("StartConsume pullMQ fail, 
symbol: %s, err: %v, market:%+v", symbol, err, *market)
                                        time.Sleep(ERRSLEEP)
                                }
   ……
   
   ```
   
   ```golang
   // 通过pull用offset的方式来避免重复消费
   func (p *rocketMQProvider) pullMQ(market *Market) error {
        symbol := market.symbol
        topic := TopicNameOrderProvider(symbol)
        //----start---------------
        mqs := p.consumer.FetchSubscriptionMessageQueues(topic)
        //mqs := p.consumer.MessageQueues(topic)
        //----end---------------
        log.Debugf("StartConsume fetch subscription topic:%s, mqs:%v", topic, 
mqs)
        if len(mqs) != 1 {
                //为了确保顺序性,不能用多个queue
                return errors.New(fmt.Sprintf("topic queue num error, topic:%s, 
num: %d", topic, len(mqs)))
        }
   
        mq := mqs[0]
        offset := market.offset
   
        //---------------start-----------------
        pr := p.consumer.Pull(mq, "*", offset, 800)
        //pr, _ := p.consumer.PullFrom(context.TODO(), mq, offset, 800)
        //---------------end-----------------
   
        log.Debugf("Leader pull after, offset: %d, mq: %s, result: %+v", 
offset, mq.String(), pr)
   
        switch pr.Status {
        case rocketmq.PullNoNewMsg:
                //case primitive.PullNoNewMsg:
                log.Debugf("rocketmq pull nonewmasg, market:%+v", market)
                time.Sleep(time.Millisecond * time.Duration(p.noMsgSleep))
                return nil
        case rocketmq.PullFound:
                //case primitive.PullFound:
                var offsetOrders *OffsetOrders
                isFirst := true
                //for _, msg := range pr.GetMessageExts() {
                for _, msg := range pr.Messages {
                        order, err := PbToOrder(msg)
                        if err != nil {
                                log.Warnf("rocketmq formatOrder failed, 
offset:%d, err: %v", msg.QueueOffset, err)
                                continue
                        }
                        if isFirst {
                                offsetOrders = &OffsetOrders{}
                                offsetOrders.BeginOffset = order.Offset
                                isFirst = false
                        }
                        //凑够10个(OffsetOrdersSize)
                        if len(offsetOrders.Orders) >= OffsetOrdersSize {
                                //凑够10个就发出去
                                market.orderCh <- offsetOrders
                                //crete new group
                                offsetOrders = &OffsetOrders{}
                                offsetOrders.BeginOffset = order.Offset
                        }
                        offsetOrders.Orders = append(offsetOrders.Orders, order)
                        offsetOrders.EndOffset = order.Offset
                        log.Debugf("provider send orderCh item 
offsetOrders:%+v, order:%+v", *offsetOrders, *order)
                }
   
                if offsetOrders != nil && len(offsetOrders.Orders) > 0 {
                        //这里来获取的.
                        market.orderCh <- offsetOrders
                }
        case rocketmq.PullNoMatchedMsg:
                //case primitive.PullNoMsgMatched:
                market.offset = pr.NextBeginOffset
                log.Errorf("broker PullNoMatchedMsg, offset:%d, pr:%+v", 
offset, pr)
                return errors.New("broker PullNoMatchedMsg")
        case rocketmq.PullOffsetIllegal:
                //case primitive.PullOffsetIllegal:
                market.offset = pr.NextBeginOffset
                log.Errorf("broker PullOffsetIllegal, offset:%d, pr:%+v", 
offset, pr)
                return errors.New("broker PullOffsetIllegal")
        case rocketmq.PullBrokerTimeout:
                //case primitive.PullBrokerTimeout:
                log.Errorf("broker timeout occur")
                return errors.New("broker timeout occur")
        }
        market.offset = pr.NextBeginOffset
        maxOffset := pr.MaxOffset
        if (maxOffset - market.offset) > 1000 {
                log.Warnf("MatchMessageBlocked, symbol:%s, blockNum:%d, 
maxOffset:%d, offset:%d", market.symbol, maxOffset-offset, maxOffset, offset)
        }
   
        return nil
   }
   ```
   
   2. Please tell us about your environment:
   
        - What is your OS?
        - kubernetes 1.24
   
        - What is your client version?
        - 1.2.4
   
        - What is your RocketMQ version?
        - 4.7.1
   
   3. Other information (e.g. detailed explanation, logs, related issues, 
suggestions on how to fix, etc):
   
   ```
   2023-03-20 22:48:24  
   time="2023-03-20T22:48:24+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:48:17  
   time="2023-03-20T22:48:17+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:48:09  
   time="2023-03-20T22:48:09+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:48:04  
   time="2023-03-20T22:48:04+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:46:38  
   time="2023-03-20T22:46:38+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:45:29  
   time="2023-03-20T22:45:29+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:45:06  
   time="2023-03-20T22:45:06+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:43:57  
   time="2023-03-20T22:43:57+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:43:41  
   time="2023-03-20T22:43:41+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:43:23  
   time="2023-03-20T22:43:23+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:40:56  
   time="2023-03-20T22:40:56+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:39:34  
   time="2023-03-20T22:39:34+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:38:18  
   time="2023-03-20T22:38:18+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:36:48  
   time="2023-03-20T22:36:48+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:35:01  
   time="2023-03-20T22:35:01+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:33:19  
   time="2023-03-20T22:33:19+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:32:56  
   time="2023-03-20T22:32:56+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:31:15  
   time="2023-03-20T22:31:15+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:31:01  
   time="2023-03-20T22:31:01+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:30:34  
   time="2023-03-20T22:30:34+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:30:13  
   time="2023-03-20T22:30:13+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:29:41  
   time="2023-03-20T22:29:41+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:29:34  
   time="2023-03-20T22:29:34+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:29:16  
   time="2023-03-20T22:29:16+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:28:03  
   time="2023-03-20T22:28:03+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:27:25  
   time="2023-03-20T22:27:25+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:26:58  
   time="2023-03-20T22:26:58+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:26:06  
   time="2023-03-20T22:26:06+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:25:12  
   time="2023-03-20T22:25:12+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:23:47  
   time="2023-03-20T22:23:47+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:23:03  
   time="2023-03-20T22:23:03+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:22:53  
   time="2023-03-20T22:22:53+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:21:53  
   time="2023-03-20T22:21:53+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   
   
   2023-03-20 22:21:29  
   time="2023-03-20T22:21:29+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   2023-03-20 22:20:13  
   time="2023-03-20T22:20:13+08:00" level=warning msg="StartConsume pullMQ 
fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 
symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true 
offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} 
consumeCmdChan:0xc00091aa80 a:map[]}"
   
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to