GongZhengMe opened a new issue #242: Timeout param maybe influence the batch consumer Message result
URL: https://github.com/apache/rocketmq-spring/issues/242
 
 
   I wrote a demo for batch messages and I think I may have found a bug.
   Please see the two demos below; I will show what happens.
   There is one consumer demo and two producer demos. The only difference
   between the producer demos is the timeout parameter.
   ```
   @Service
   @RocketMQMessageListener(topic = "msgBatchTopic", consumerGroup = "message-batch-consumer")
   public class BatchMsgConsumer implements RocketMQListener<MessageExt> {
       @Override
       public void onMessage(MessageExt message) {
           System.out.printf("------- MessageBatchConsumer received message, msgId: %s, body:%s \n", message.getMsgId(), new String(message.getBody()));
       }
   }
   ```
   The first producer demo calls syncSend without the timeout parameter:
   ```
       /**
        * Send a batch of messages with RocketMQ, without the timeout parameter
        *
        * @author gongzheng
        * @date 2020/3/16
        */
       @RequestMapping("BatchSend")
       public void batchSend() {
           List<Message> msgs = new ArrayList<Message>();
           for (int i = 0; i < 10; i++) {
               msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i)
                       .setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
           }
   
           SendResult sr = rocketMQTemplate.syncSend("msgBatchTopic", msgs);
   
           System.out.printf("--- Batch messages send result :" + sr + "\n");
       }
   ```
   Console output:
   ```
   --- Batch messages send result :SendResult [sendStatus=SEND_OK, msgId=FE80000000000000431374F820C47CCC000018B4AAC27F0F62FF000A, offsetMsgId=C0A82B6900002A9F000000000001BCC3, messageQueue=MessageQueue [topic=msgBatchTopic, brokerName=localhost, queueId=0], queueOffset=2]
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F0F62FF000A, body:[{"payload":"Hello RocketMQ Batch Msg#0","headers":{"KEYS":"KEY_0","id":"a09d42c5-48fb-c0a2-59e2-1255944037b7","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#1","headers":{"KEYS":"KEY_1","id":"90430362-ecbb-32da-4276-a2badd75c3de","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#2","headers":{"KEYS":"KEY_2","id":"4d3ed9d1-2d5b-73a5-11cf-ec12ccbd7693","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#3","headers":{"KEYS":"KEY_3","id":"5eaeca40-887e-36c8-f60a-7256344b9885","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#4","headers":{"KEYS":"KEY_4","id":"284dd59e-2ca9-69f3-5d9f-306e534a6cd5","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#5","headers":{"KEYS":"KEY_5","id":"8115e99f-01fb-d2b6-ab45-8a437c6146d2","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#6","headers":{"KEYS":"KEY_6","id":"82618c85-ec5b-2e26-267f-3e1df517b1e0","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#7","headers":{"KEYS":"KEY_7","id":"a1914316-5197-dbae-34e4-f4dc22476a0d","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#8","headers":{"KEYS":"KEY_8","id":"fe90e385-d92b-54fb-c7b2-c031e2f952ac","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#9","headers":{"KEYS":"KEY_9","id":"fcca974c-74d3-c797-1750-aa2c385e73cb","timestamp":1585123714814}}]
 
   ```
   The consumer receives only one message, and its body contains all ten payloads.
   
   The second producer demo calls syncSend with the timeout parameter:
   ```
     @RequestMapping("BatchSend")
       public void batchSend() {
           List<Message> msgs = new ArrayList<Message>();
           for (int i = 0; i < 10; i++) {
               msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i)
                       .setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
           }
   
           SendResult sr = rocketMQTemplate.syncSend("msgBatchTopic", msgs, 60000);
   
           System.out.printf("--- Batch messages send result :" + sr + "\n");
       }
   ```
   Console output:
   ```
   --- Batch messages send result :SendResult [sendStatus=SEND_OK, msgId=FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0000,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0001,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0002,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0003,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0004,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0005,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0006,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0007,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0008,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0009, offsetMsgId=C0A82B6900002A9F000000000001C520,C0A82B6900002A9F000000000001C65F,C0A82B6900002A9F000000000001C79E,C0A82B6900002A9F000000000001C8DD,C0A82B6900002A9F000000000001CA1C,C0A82B6900002A9F000000000001CB5B,C0A82B6900002A9F000000000001CC9A,C0A82B6900002A9F000000000001CDD9,C0A82B6900002A9F000000000001CF18,C0A82B6900002A9F000000000001D057, messageQueue=MessageQueue [topic=msgBatchTopic, brokerName=localhost, queueId=1], queueOffset=12]
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0000, body:Hello RocketMQ Batch Msg#0 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0001, body:Hello RocketMQ Batch Msg#1 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0002, body:Hello RocketMQ Batch Msg#2 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0003, body:Hello RocketMQ Batch Msg#3 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0004, body:Hello RocketMQ Batch Msg#4 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0006, body:Hello RocketMQ Batch Msg#6 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0005, body:Hello RocketMQ Batch Msg#5 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0007, body:Hello RocketMQ Batch Msg#7 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0008, body:Hello RocketMQ Batch Msg#8 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0009, body:Hello RocketMQ Batch Msg#9 
   ```
   The consumer receives ten separate messages.
   I think the timeout parameter should not influence how the consumer
   receives the messages, so I believe this is a bug.
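The report does not identify the cause. One possible explanation, assumed here and not verified against the rocketmq-spring source, is Java overload resolution: if the two-argument call only matches a method that accepts a single `Object` payload, the whole list would be serialized as one message, while the three-argument call matches a collection-based batch method. The class and method bodies below are hypothetical stand-ins, not the real `RocketMQTemplate`; they only illustrate how the number of arguments can select a different overload.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a template class; NOT the real RocketMQTemplate.
class OverloadSketch {

    // Assumed overload A: any object is treated as ONE payload.
    static String syncSend(String destination, Object payload) {
        return "1 message to " + destination;
    }

    // Assumed overload B: a collection plus a timeout is treated as a batch.
    static String syncSend(String destination, Collection<?> messages, long timeout) {
        return messages.size() + " messages to " + destination;
    }

    public static void main(String[] args) {
        List<String> msgs = Arrays.asList("a", "b", "c");
        // Two arguments: only overload A is applicable, so the list is one payload.
        System.out.println(syncSend("msgBatchTopic", msgs));
        // Three arguments: only overload B is applicable, so each element counts.
        System.out.println(syncSend("msgBatchTopic", msgs, 60000));
    }
}
```

If this assumption holds, the behavior would match the two console outputs above: one message carrying a JSON array in the first case, ten individual messages in the second.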
   Another thing: I see that RocketMQ provides a way to split a batch of
   messages, but I don't know how to use it with rocketmq-spring.
   This is the code:
   ```
   public class ListSplitter implements Iterator<List<Message>> {
       private final int SIZE_LIMIT = 1000 * 1000;
       private final List<Message> messages;
       private int currIndex;
       public ListSplitter(List<Message> messages) {
               this.messages = messages;
       }
       @Override public boolean hasNext() {
           return currIndex < messages.size();
       }
       @Override public List<Message> next() {
           int nextIndex = currIndex;
           int totalSize = 0;
           for (; nextIndex < messages.size(); nextIndex++) {
               Message message = messages.get(nextIndex);
               int tmpSize = message.getTopic().length() + message.getBody().length;
               Map<String, String> properties = message.getProperties();
               for (Map.Entry<String, String> entry : properties.entrySet()) {
                   tmpSize += entry.getKey().length() + entry.getValue().length();
               }
               tmpSize = tmpSize + 20; //for log overhead
               if (tmpSize > SIZE_LIMIT) {
                   //it is unexpected that single message exceeds the SIZE_LIMIT
                   //here just let it go, otherwise it will block the splitting process
                   if (nextIndex - currIndex == 0) {
                      //if the next sublist has no element, add this one and then break, otherwise just break
                      nextIndex++;  
                   }
                   break;
               }
               if (tmpSize + totalSize > SIZE_LIMIT) {
                   break;
               } else {
                   totalSize += tmpSize;
               }
       
           }
           List<Message> subList = messages.subList(currIndex, nextIndex);
           currIndex = nextIndex;
           return subList;
       }
   }
   //then you could split the large list into small ones:
   ListSplitter splitter = new ListSplitter(messages);
   while (splitter.hasNext()) {
      try {
          List<Message>  listItem = splitter.next();
          producer.send(listItem);
      } catch (Exception e) {
          e.printStackTrace();
          //handle the error
      }
   }
   ```
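As a rough idea of how the same splitting approach could be applied on the rocketmq-spring side, the sketch below splits plain string payloads into size-bounded chunks and hands each chunk to a caller-supplied sender. `BatchChunker`, `split`, and `sendInChunks` are hypothetical names and not part of RocketMQ or rocketmq-spring. The size estimate counts only the UTF-8 payload bytes and ignores the topic, property, and log overhead that `ListSplitter` accounts for.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper; not part of RocketMQ or rocketmq-spring.
class BatchChunker {

    // Splits payloads into consecutive chunks whose total UTF-8 size stays
    // within limitBytes. A single payload larger than the limit is placed
    // alone in its own chunk so that splitting never stalls.
    static List<List<String>> split(List<String> payloads, int limitBytes) {
        List<List<String>> chunks = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentSize = 0;
        for (String payload : payloads) {
            int size = payload.getBytes(StandardCharsets.UTF_8).length;
            if (!current.isEmpty() && currentSize + size > limitBytes) {
                chunks.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(payload);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            chunks.add(current);
        }
        return chunks;
    }

    // Passes each chunk to the sender supplied by the caller.
    static void sendInChunks(List<String> payloads, int limitBytes,
                             Consumer<List<String>> sender) {
        for (List<String> chunk : split(payloads, limitBytes)) {
            sender.accept(chunk);
        }
    }
}
```

In a rocketmq-spring application, the sender could build Spring messages from each chunk with `MessageBuilder` and call `rocketMQTemplate.syncSend("msgBatchTopic", msgs, 60000)` as in the second producer demo; that wiring is left out here so the sketch stays self-contained.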
   
