complone commented on issue #5420:
URL: https://github.com/apache/rocketmq/issues/5420#issuecomment-1295790930
If you want to ensure local order, it is recommended to refer to the
following example:
Producer.java
```
@RestController
public class Product {
private static List<ProductOrder> orderList = null;
private static String producerGroup = "test_producer";
/**
* Simulation data
*/
static {
orderList = new ArrayList<>();
orderList.add(new ProductOrder("XXX001", "Order Creation"));
orderList.add(new ProductOrder("XXX001", "Order payment"));
orderList.add(new ProductOrder("XXX001", "Order Completed"));
orderList.add(new ProductOrder("XXX002", "Order Creation"));
orderList.add(new ProductOrder("XXX002", "Order payment"));
orderList.add(new ProductOrder("XXX002", "Order Completed"));
orderList.add(new ProductOrder("XXX003", "Order Creation"));
orderList.add(new ProductOrder("XXX003", "Order payment"));
orderList.add(new ProductOrder("XXX003", "Order Completed"));
}
@GetMapping("message")
public void sendMessage() throws Exception {
// sample producer
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
//Do not open the vip channel, open the port port will be reduced by
2
producer.setVipChannelEnabled(false);
//Bind name server
producer.setNamesrvAddr("IP:9876");
producer.start();
for (ProductOrder order : orderList) {
//1. Generate message
Message message = new Message(JmsConfig.TOPIC, "",
order.getOrderId(), order.toString().getBytes());
//2. Sending a message is to select the corresponding queue for
each message
SendResult sendResult = producer.send(message, new
MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message
msg, Object arg) {
//3. The value of arg is actually the orderId passed in
below
String orderid = (String) arg;
//4. Because the order is of String type, it is
converted to int type through hashCode
int hashCode = orderid.hashCode();
//5, because the hashCode may be negative, so take the
absolute value
hashCode = Math.abs(hashCode);
//6. Ensure that the same order number must be allocated
on the same queue
long index = hashCode % mqs.size();
return mqs.get((int) index);
}
}, order.getOrderId(),50000);
System.out.printf("Product: send status=%s, storage queue=%s,
orderid=%s, type=%s\n", sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(), order.getOrderId(), order.getType());
}
producer.shutdown();
}
}
```
Since rocketmq uses a segmented lock, it does not lock the entire Broker but
a single Queue in the lock, because as long as a single Queue is locked, local
sequential consumption can be guaranteed.
Therefore, if it is in a concurrent consumption scenario, it is not
recommended to use MessageListenerConcurrently, but to use
MessageListenerOrderly.
Here is an example of a consumer:
```
@Slf4j
@Component
public class Consumer {
/**
* Consumer entity object
*/
private DefaultMQPushConsumer consumer;
/**
* Consumer group
*/
public static final String CONSUMER_GROUP = "consumer_group";
/**
* instantiate the object through the constructor
*/
public Consumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr("IP:9876");
//It is not recommended to enable vipChannel. Once enabled, when the
rocketmq consumer starts to rebalance, the normal port (9876) will carry all
the messages and transfer them to the vip port. But the consumer will not use
the vip port for pulling messages.
consumer.setVipChannelEnabled(false);
//Subscribe information under topics and tags (* represents all tags)
consumer.subscribe(JmsConfig.TOPIC, "*");
//Register the monitoring of consumption. Note that the
sequential consumption is MessageListenerOrderly and the concurrent is
ConsumeConcurrentlyContext
consumer.registerMessageListener((MessageListenerOrderly) (msgs,
context) -> {
// get message
MessageExt msg = msgs.get(0);
//The consumer gets the message here, only the output is done,
and the subsequent logic processing is not performed
log.info("Consumer-Thread Name={}, Message={}",
Thread.currentThread().getName(), new String(msg.getBody()));
return ConsumeOrderlyStatus.SUCCESS;
});
consumer.start();
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]