yidaimi opened a new issue #2286:
URL: https://github.com/apache/rocketmq/issues/2286


   **BUG REPORT**
   
   1. Please describe the issue you observed:
   【需求】:
   
使用PushConsumer订阅了一个topic,并且想只订阅从这个consumer订阅的时间点起往后的数据(不需要历史数据)。我的想法是设置consumer的ConsumeFromWhere.CONSUME_FROM_TIMESTAMP和当前时间作为timestamp,如下:
   
           
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
           String timestamp = 
UtilAll.timeMillisToHumanString3(System.currentTimeMillis());
           consumer.setConsumeTimestamp(timestamp);
   
   为了保证consumer启动的时候所在的consumergroup每次都是新的,我将consumergroup设置为时间戳:
           Date date = new Date();
           String strDateFormat = "yyyyMMddHHmmss";
           SimpleDateFormat sdf = new SimpleDateFormat(strDateFormat);
           String now = sdf.format(date);
           // 实例化消费者
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(now);
   
   【操作】:
   在启动这个consumer之前,我先写了一个producer,往这个topic中写数据(写了10条)。确保数据写完之后,我启动了consumer。
   
   【实际结果】:
   这个consumer依然读取到了这10条数据。
   ConsumeFromWhere.CONSUME_FROM_TIMESTAMP没有起作用。
   
   【预期结果】:
   由于这个consumer启动晚于producer,所以应该读取不到这些数据。
   
   2. Please tell us about your environment:
   rocketmq 4.7.0 单broker
   
   3. Other information (e.g. detailed explanation, logs, related issues, 
suggestions how to fix, etc):
   完整代码如下:
   producer:
   `public class SyncProducer {
   
       public static String TOPIC_NAME = "newManualTopic";
   
       public static void main(String[] args) throws Exception {
   
           // 实例化消息生产者Producer
           DefaultMQProducer producer = new 
DefaultMQProducer("please_rename_unique_group_name");
           // 设置NameServer的地址
           producer.setNamesrvAddr("localhost:9876");
           // 启动Producer实例
           producer.start();
           for (int i = 0; i < 10; i++) {
               // 创建消息,并指定Topic,Tag和消息体
               Message msg = new Message(TOPIC_NAME /* Topic */,
                       "TagA" /* Tag */,
                       ("Hello RocketMQ " + 
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
               );
               // 发送消息到一个Broker
               SendResult sendResult = producer.send(msg);
               // 通过sendResult返回消息是否成功送达
               System.out.printf("%s%n", sendResult);
           }
           // 如果不再发送消息,关闭Producer实例。
           producer.shutdown();
       }
   }
   `
   Consumer:
   `import static producer.SyncProducer.TOPIC_NAME;
   
   public class Consumer {
       public static void main(String[] args) throws InterruptedException, 
MQClientException {
   
           Date date = new Date();
           String strDateFormat = "yyyyMMddHHmmss";
           SimpleDateFormat sdf = new SimpleDateFormat(strDateFormat);
           String now = sdf.format(date);
           // 实例化消费者
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(now);
           // 设置NameServer的地址
           consumer.setNamesrvAddr("localhost:9876");
   
           // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
           consumer.subscribe(TOPIC_NAME, "*");
           // 注册回调实现类来处理从broker拉取回来的消息
           consumer.registerMessageListener(new MessageListenerConcurrently() {
   
               public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs, ConsumeConcurrentlyContext context) {
                   System.out.printf("%s Receive New Messages: %s %n", 
Thread.currentThread().getName(), msgs);
                   // 标记该消息已经被成功消费
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
           });
   
           
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);//ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
           String timestamp = 
UtilAll.timeMillisToHumanString3(System.currentTimeMillis());
           consumer.setConsumeTimestamp(timestamp);
   
   
           System.out.println("now:"+now);
           System.out.println("timestamp:"+timestamp);
           // 启动消费者实例
           consumer.start();
   
           System.out.printf("Consumer Started.%n");
   
       }
   }`
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to