complone commented on issue #5420:
URL: https://github.com/apache/rocketmq/issues/5420#issuecomment-1295790930

   To guarantee local (per-queue) ordering, refer to the following example, which routes every message for a given order ID to the same queue:
   
   Producer.java
   ```
   @RestController
   public class Product {
       private static List<ProductOrder> orderList = null;
       private static String producerGroup = "test_producer";
       /**
        * Simulation data
        */
       static {
           orderList = new ArrayList<>();
           orderList.add(new ProductOrder("XXX001", "Order Creation"));
           orderList.add(new ProductOrder("XXX001", "Order payment"));
           orderList.add(new ProductOrder("XXX001", "Order Completed"));
           orderList.add(new ProductOrder("XXX002", "Order Creation"));
           orderList.add(new ProductOrder("XXX002", "Order payment"));
           orderList.add(new ProductOrder("XXX002", "Order Completed"));
           orderList.add(new ProductOrder("XXX003", "Order Creation"));
           orderList.add(new ProductOrder("XXX003", "Order payment"));
           orderList.add(new ProductOrder("XXX003", "Order Completed"));
       }
   
       @GetMapping("message")
       public void sendMessage() throws Exception {
           // sample producer
           DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
           // Disable the VIP channel; when it is enabled, the client uses the broker port minus 2
           producer.setVipChannelEnabled(false);
           //Bind name server
           producer.setNamesrvAddr("IP:9876");
           producer.start();
           for (ProductOrder order : orderList) {
               // 1. Build the message; the order ID doubles as the message key
               Message message = new Message(JmsConfig.TOPIC, "",
                       order.getOrderId(), order.toString().getBytes());
               // 2. Send the message, selecting the target queue for each one
               SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                   @Override
                   public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                       // 3. arg is the orderId passed to send() below
                       String orderId = (String) arg;
                       // 4. Map the String order ID to an int through hashCode().
                       //    floorMod keeps the index non-negative even when the
                       //    hash is negative (Math.abs(Integer.MIN_VALUE) is
                       //    still negative, so abs followed by % is not safe).
                       // 5. The same order ID always maps to the same queue
                       int index = Math.floorMod(orderId.hashCode(), mqs.size());
                       return mqs.get(index);
                   }
               }, order.getOrderId(), 50000);
   
               System.out.printf("Product: send status=%s, storage queue=%s, orderid=%s, type=%s\n",
                       sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(),
                       order.getOrderId(), order.getType());
           }
           producer.shutdown();
       }
   }
   ```
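
   The key-to-queue mapping done inside the selector can be tried on its own, without a broker. The following is a minimal sketch, not RocketMQ code: the class name `QueueIndexSketch`, its helper methods, and the queue counts are made up for illustration. It uses `Math.floorMod` and also shows why taking `Math.abs` of a hash before `%` can still yield a negative index.

   ```java
   import java.util.List;

   final class QueueIndexSketch {

       /** Maps a sharding key to an index in [0, queueCount). */
       static int queueIndex(String shardingKey, int queueCount) {
           return indexForHash(shardingKey.hashCode(), queueCount);
       }

       /** floorMod always returns a non-negative result for a positive divisor. */
       static int indexForHash(int hash, int queueCount) {
           return Math.floorMod(hash, queueCount);
       }

       /** The abs-then-modulo variant, kept only to demonstrate its edge case. */
       static int absThenMod(int hash, int queueCount) {
           return Math.abs(hash) % queueCount;
       }

       public static void main(String[] args) {
           int queueCount = 4; // hypothetical number of queues in the topic
           for (String orderId : List.of("XXX001", "XXX002", "XXX003")) {
               // The same order ID always yields the same index
               System.out.printf("orderId=%s -> queue %d%n",
                       orderId, queueIndex(orderId, queueCount));
           }

           // Edge case: Math.abs(Integer.MIN_VALUE) overflows and stays negative
           System.out.println(absThenMod(Integer.MIN_VALUE, 3));   // -2
           System.out.println(indexForHash(Integer.MIN_VALUE, 3)); // 1
       }
   }
   ```

   A negative index would make `mqs.get(...)` throw `IndexOutOfBoundsException`, which is why the non-negative variant is the safer choice for a selector.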
   
   RocketMQ uses a segmented lock: it locks a single queue rather than the entire broker, because locking one queue is enough to guarantee local sequential consumption of that queue.
   Therefore, when ordering matters and the consumer runs multiple threads, it is recommended to use MessageListenerOrderly rather than MessageListenerConcurrently.
   Here is an example of a consumer:
   
   ```
   @Slf4j
   @Component
   public class Consumer {
       
       /**
        * Consumer entity object
        */
       private DefaultMQPushConsumer consumer;
       /**
        * Consumer group
        */
       public static final String CONSUMER_GROUP = "consumer_group";
       /**
        * instantiate the object through the constructor
        */
       public Consumer() throws MQClientException {
           consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
           consumer.setNamesrvAddr("IP:9876");
           // Enabling vipChannel is not recommended. Once it is enabled, when the
           // RocketMQ consumer starts to rebalance, the normal port (9876) carries
           // all the messages and transfers them to the VIP port, but the consumer
           // does not use the VIP port for pulling messages.
           consumer.setVipChannelEnabled(false);
           //Subscribe information under topics and tags (* represents all tags)
           consumer.subscribe(JmsConfig.TOPIC, "*");
           // Register the message listener. Note that ordered consumption uses
           // MessageListenerOrderly and concurrent consumption uses
           // MessageListenerConcurrently
           consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
               // This sample only logs each message; no further business logic runs here
               for (MessageExt msg : msgs) {
                   log.info("Consumer-Thread Name={}, Message={}",
                           Thread.currentThread().getName(), new String(msg.getBody()));
               }
               return ConsumeOrderlyStatus.SUCCESS;
           });
           consumer.start();
       }
   }
   ```
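
   The idea of locking one queue at a time, mentioned before the consumer example, can be illustrated with plain JDK classes. This is only a simulation under stated assumptions, not how RocketMQ is implemented internally: the class `PerQueueLockSketch`, its methods, the in-memory queues, and the thread counts are all hypothetical. Each queue has its own lock, so several threads can work on different queues at once while the messages within one queue are still handled in send order.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.Queue;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.locks.ReentrantLock;

   final class PerQueueLockSketch {

       /** Hypothetical sample data: {orderId, step}, interleaved across orders. */
       static List<String[]> sampleMessages() {
           List<String[]> messages = new ArrayList<>();
           for (String step : List.of("Order Creation", "Order payment", "Order Completed")) {
               for (String orderId : List.of("XXX001", "XXX002", "XXX003")) {
                   messages.add(new String[] {orderId, step});
               }
           }
           return messages;
       }

       /** Routes messages to queues by key, then consumes them with one lock per queue. */
       static Map<String, List<String>> consume(List<String[]> messages, int queueCount, int threads) {
           List<Queue<String[]>> queues = new ArrayList<>();
           List<ReentrantLock> locks = new ArrayList<>();
           for (int i = 0; i < queueCount; i++) {
               queues.add(new ConcurrentLinkedQueue<>());
               locks.add(new ReentrantLock());
           }
           // Same key -> same queue, so per-key order is preserved inside the queue
           for (String[] m : messages) {
               queues.get(Math.floorMod(m[0].hashCode(), queueCount)).add(m);
           }

           Map<String, List<String>> seen = new ConcurrentHashMap<>();
           ExecutorService pool = Executors.newFixedThreadPool(threads);
           for (int t = 0; t < threads; t++) {
               final int start = t; // each worker starts at a different queue
               pool.submit(() -> {
                   for (int i = 0; i < queueCount; i++) {
                       int q = (start + i) % queueCount;
                       ReentrantLock lock = locks.get(q);
                       lock.lock(); // only this queue is locked; the others stay available
                       try {
                           String[] m;
                           while ((m = queues.get(q).poll()) != null) {
                               seen.computeIfAbsent(m[0], k -> new ArrayList<>()).add(m[1]);
                           }
                       } finally {
                           lock.unlock();
                       }
                   }
               });
           }
           pool.shutdown();
           try {
               if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                   throw new IllegalStateException("workers did not finish in time");
               }
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
           return seen;
       }

       public static void main(String[] args) {
           // Prints, for each order ID, the steps in the order they were consumed
           consume(sampleMessages(), 4, 3)
                   .forEach((orderId, steps) -> System.out.println(orderId + " -> " + steps));
       }
   }
   ```

   Because every message for an order lands in one queue and that queue is drained under its own lock, the steps of each order come out in the order they were sent, even though no global lock is taken.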


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.