deemogsw commented on code in PR #62:
URL: https://github.com/apache/rocketmq-flink/pull/62#discussion_r990773071


##########
README.md:
##########
@@ -123,13 +128,40 @@ The following configurations are all from the class `org.apache.rocketmq.flink.l
 | consumer.group | consumer group *Required*     |    null |
 | consumer.topic | consumer topic *Required*       |    null |
 | consumer.tag | consumer topic tag      |    * |
-| consumer.offset.reset.to | what to do when there is no initial offset on the server      |   latest/earliest/timestamp |
-| consumer.offset.from.timestamp | the timestamp when `consumer.offset.reset.to=timestamp` was set   |   `System.currentTimeMillis()` |
 | consumer.offset.persist.interval | auto commit offset interval      |    5000 |
 | consumer.pull.thread.pool.size | consumer pull thread pool size      |    20 |
 | consumer.batch.size | consumer messages batch size      |    32 |
 | consumer.delay.when.message.not.found | the delay time when messages were not found      |    10 |
 
+### Consumer From Where
+
+```java
+RocketMQSourceFunction<String> source = new RocketMQSourceFunction<>(
+        new SimpleStringDeserializationSchema(), props);
+HashMap<MessageQueue, Long> brokerMap = new HashMap<>();
+brokerMap.put(new MessageQueue("tp_driver_tag_sync_back", "broker-a", 1), 201L);
+brokerMap.put(new MessageQueue("tp_driver_tag_sync_back", "broker-c", 3), 123L);
+source.setStartFromSpecificOffsets(brokerMap);
+```
+RocketMQSourceFunction offers five initialization policies:
+* setStartFromEarliest
+* setStartFromLatest
+* setStartFromTimeStamp with timestamp
+* setStartFromGroupOffsets with `OffsetResetStrategy`
+* setStartFromSpecificOffsets
+
+| STRATEGY                            | DESCRIPTION                                                  |
+| ----------------------------------- | ------------------------------------------------------------ |
+| StartFromEarliest                   | consume from the earliest offset after restart with no state |

Review Comment:
   > Thanks @deemogsw for the feature contribution. I left some comments on the feature support for `RocketMQSourceFunction`. Could you please also support this feature for `RocketMQSource`?
   > 
   > BTW, could you please take a look at the failure of the CI?
   
   Thanks for the comments.
   I'd be glad to bring this feature to the new source interface as well. I will open a new issue for `RocketMQSource`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
