dragonTalon opened a new pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337


   **Make sure set the target branch to `develop`**
   
   ## What is the purpose of the change
   
   增加了一个自定义延迟消息的实现,通过message 的property中的属性来实现
   
   Added the realization of a custom delayed message, which is realized through 
the properties in the property of the message
   
   property name :SPECIFY_DELAY_TIME
   
   value : 
   格式为: ()d()h()m()s
   例: 1d1h1m1s 表示为 延迟 1天加1小时加1分钟加1秒
   
   The format is: ()d()h()m()s
   Example: 1d1h1m1s is expressed as delay 1 day plus 1 hour plus 1 minute plus 
1 second
   
   ## Brief changelog
   
   XX
   
   ## Verifying this change
   
   XXXX
   验证方式:(test way)
   生产者(production)
   `
      DefaultMQProducer producer = new DefaultMQProducer("DELAY_GROUP_TAG");
   
           producer.setNamesrvAddr("127.0.0.1:9876");
   
           producer.start();
   
           final byte[] bytes = "this is cutomer 2m2s message" 
.getBytes(StandardCharsets.UTF_8);
   
           Message message = new Message("DELAY_TOPIC_TAG_1", "*", bytes);
   
           //这是我实现延迟队列用到的
   
           message.putUserProperty("SPECIFY_DELAY_TIME", "2m2s");
   
           producer.sendOneway(message);
   
           producer.shutdown();
   `
   消费者:(consume)
   `
       public static void main(String[] args) throws MQClientException {
   
           DefaultMQPushConsumer consumer = new 
DefaultMQPushConsumer("DELAY_GROUP_TAG");
   
           consumer.setNamesrvAddr("127.0.0.1:9876");
   
           
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
   
           //Topic的名称
   
           consumer.subscribe("DELAY_TOPIC_TAG_1", "*");
   
           consumer.setSuspendCurrentQueueTimeMillis(30);
   
           consumer.registerMessageListener(new MessageListenerConcurrently() {
   
               @Override
               public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
   
                   for (MessageExt ext : list) {
   
                       final long time = System.currentTimeMillis() - 
ext.getBornTimestamp();
   
                       System.out.println("消息\t" + new String(ext.getBody(), 
StandardCharsets.UTF_8) + "接收到的消息间隔消息\t" + time / 1000.0 );
   
                   }
   
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
   
           });
   
           consumer.start();
       }
   `
   [这是我的验证结果文档](https://zhuanlan.zhihu.com/p/408702118)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to