xgj1988 commented on issue #345: Restart consumption and repeat consumption URL: https://github.com/apache/rocketmq/issues/345#issuecomment-448188491 ### 消费者代码 ` package rocketmq.order; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; public class OrderedConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name"); consumer.setNamesrvAddr("localhost:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTestOrder", "TagA || TagC || TagD"); // AtomicLong consumeTimes = new AtomicLong(0); consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> { context.setAutoCommit(false); // System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); for (MessageExt msg : msgs) { System.out.printf("msgId:【%s】,queueId:【%d】,msgTag:【%s】,msgContent:【%s】", msg.getMsgId(), msg.getQueueId(), msg.getTags(), new String(msg.getBody())); } System.out.println("--------------------------"); return ConsumeOrderlyStatus.SUCCESS; }); consumer.start(); System.out.printf("Consumer Started."); } } ` ### 生产者代码 `package rocketmq.order; import com.jio.common.util.TimeUtil; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.Date; public class OrderedProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. 
DefaultMQProducer producer = new DefaultMQProducer("example_group_name"); producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); String[] tags = new String[]{"TagA"}; for (int i = 0; i < 1; i++) { int orderId = i % 10; //Create a message instance, specifying topic, tag and message body. String body = ("Hello RocketMQ " + i + " ") + TimeUtil.getSimpleFullEngString(new Date()); Message msg = new Message("TopicTestOrder", "TagA", "KEY" + i, body.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); }, orderId); System.out.printf("%s%n", sendResult); } //server shutdown producer.shutdown(); System.out.println("关闭"); } }`
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
