SteNicholas commented on code in PR #62:
URL: https://github.com/apache/rocketmq-flink/pull/62#discussion_r990722626


##########
README.md:
##########
@@ -123,13 +128,40 @@ The following configurations are all from the class 
`org.apache.rocketmq.flink.l
 | consumer.group | consumer group *Required*     |    null |
 | consumer.topic | consumer topic *Required*       |    null |
 | consumer.tag | consumer topic tag      |    * |
-| consumer.offset.reset.to | what to do when there is no initial offset on the 
server      |   latest/earliest/timestamp |
-| consumer.offset.from.timestamp | the timestamp when 
`consumer.offset.reset.to=timestamp` was set   |   `System.currentTimeMillis()` 
|
 | consumer.offset.persist.interval | auto commit offset interval      |    
5000 |
 | consumer.pull.thread.pool.size | consumer pull thread pool size      |    20 
|
 | consumer.batch.size | consumer messages batch size      |    32 |
 | consumer.delay.when.message.not.found | the delay time when messages were 
not found      |    10 |
 
+### Consumer From Where

Review Comment:
   ```suggestion
   ### Consumer Strategy
   ```



##########
pom.xml:
##########
@@ -38,6 +38,8 @@
         <commons-lang.version>2.6</commons-lang.version>
         <spotless.version>2.4.2</spotless.version>
         <jaxb-api.version>2.3.1</jaxb-api.version>
+        <!-- rocketmq-schema-registry need to compile by yourself for the time 
being -->

Review Comment:
   Please remove this comment about the rocketmq-schema-registry Maven 
dependency. rocketmq-schema-registry is being deployed to the Maven repository.



##########
README.md:
##########
@@ -123,13 +128,40 @@ The following configurations are all from the class 
`org.apache.rocketmq.flink.l
 | consumer.group | consumer group *Required*     |    null |
 | consumer.topic | consumer topic *Required*       |    null |
 | consumer.tag | consumer topic tag      |    * |
-| consumer.offset.reset.to | what to do when there is no initial offset on the 
server      |   latest/earliest/timestamp |
-| consumer.offset.from.timestamp | the timestamp when 
`consumer.offset.reset.to=timestamp` was set   |   `System.currentTimeMillis()` 
|
 | consumer.offset.persist.interval | auto commit offset interval      |    
5000 |
 | consumer.pull.thread.pool.size | consumer pull thread pool size      |    20 
|
 | consumer.batch.size | consumer messages batch size      |    32 |
 | consumer.delay.when.message.not.found | the delay time when messages were 
not found      |    10 |
 
+### Consumer From Where
+
+```java
+RocketMQSourceFunction<String> source = new RocketMQSourceFunction<>(
+        new SimpleStringDeserializationSchema(), props);
+HashMap<MessageQueue, Long> brokerMap = new HashMap<>();
+brokerMap.put(new MessageQueue("tp_driver_tag_sync_back", "broker-a", 1), 
201L);
+brokerMap.put(new MessageQueue("tp_driver_tag_sync_back", "broker-c", 3), 
123L);
+source.setStartFromSpecificOffsets(brokerMap);
+```
+RocketMQSourceFunction offer five initialization policies 
+* setStartFromEarliest
+* setStartFromLatest
+* setStartFromTimeStamp with timestamp
+* setStartFromGroupOffsets with `OffsetResetStrategy`
+* setStartFromSpecificOffsets
+
+| STRATEGY                            | DESCRIPTION                            
                      |
+| ----------------------------------- | 
------------------------------------------------------------ |
+| StartFromEarliest                   | consume from the earliest offset after 
restart with no state |
+| StartFromLatest                     | consume from the latest offset after 
restart with no state   |
+| StartFromTimeStamp                  | consume from the closest timestamp of 
data in each broker's queue |
+| StartFromGroupOffsets with LATEST   | If broker has the committed offset 
then consume from the next else consume from the latest offset |
+| StartFromGroupOffsets with EARLIEST | If broker has the committed offset 
,consume from the next ,otherwise consume from the earlist offset.It's useful 
when server  expand  broker |
+| StartFromSpecificOffsets            | consumer from the specificOffsets in 
broker's queues.Group offsets will be returned from those broker's queues whose 
didn't be specified |
+
+**Attention**
+
+Only if flink job start with none state ,those policies is effective.If job 
recover from checkpoint,offset would init from the stored data.

Review Comment:
   ```suggestion
   These strategies take effect only when the Flink job starts with no state. 
If the job recovers from a checkpoint, the offsets are initialized from the 
stored data.
   ```
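To make the precedence in this note concrete, here is a minimal sketch of the rule as the README describes it: offsets restored from a checkpoint win, and the startup strategy is consulted only when there is no state. The class `OffsetInitSketch`, the method `resolveOffsets`, and the string queue keys such as `broker-a:1` are hypothetical names used for illustration; they are not taken from `RocketMQSourceFunction`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the PR: illustrates the precedence only.
class OffsetInitSketch {

    /**
     * Returns the offsets a source would start from.
     *
     * @param restoredState offsets recovered from a checkpoint, or null/empty if none
     * @param strategyOffsets offsets computed by the configured startup strategy
     */
    static Map<String, Long> resolveOffsets(
            Map<String, Long> restoredState, Map<String, Long> strategyOffsets) {
        if (restoredState != null && !restoredState.isEmpty()) {
            // The job recovered from a checkpoint: the strategy is ignored.
            return new HashMap<>(restoredState);
        }
        // Fresh start with no state: the strategy decides.
        return new HashMap<>(strategyOffsets);
    }
}

class OffsetInitSketchDemo {
    public static void main(String[] args) {
        Map<String, Long> fromStrategy = new HashMap<>();
        fromStrategy.put("broker-a:1", 201L);

        Map<String, Long> fromCheckpoint = new HashMap<>();
        fromCheckpoint.put("broker-a:1", 500L);

        // No state: the strategy offsets are used.
        System.out.println(OffsetInitSketch.resolveOffsets(null, fromStrategy));
        // Restored state: the checkpointed offsets are used.
        System.out.println(OffsetInitSketch.resolveOffsets(fromCheckpoint, fromStrategy));
    }
}
```

How queues that are absent from the restored state should be handled is not stated in the README, so the sketch deliberately leaves that case out.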



##########
README.md:
##########
@@ -123,13 +128,40 @@ The following configurations are all from the class 
`org.apache.rocketmq.flink.l
 | consumer.group | consumer group *Required*     |    null |
 | consumer.topic | consumer topic *Required*       |    null |
 | consumer.tag | consumer topic tag      |    * |
-| consumer.offset.reset.to | what to do when there is no initial offset on the 
server      |   latest/earliest/timestamp |
-| consumer.offset.from.timestamp | the timestamp when 
`consumer.offset.reset.to=timestamp` was set   |   `System.currentTimeMillis()` 
|
 | consumer.offset.persist.interval | auto commit offset interval      |    
5000 |
 | consumer.pull.thread.pool.size | consumer pull thread pool size      |    20 
|
 | consumer.batch.size | consumer messages batch size      |    32 |
 | consumer.delay.when.message.not.found | the delay time when messages were 
not found      |    10 |
 
+### Consumer From Where
+
+```java
+RocketMQSourceFunction<String> source = new RocketMQSourceFunction<>(
+        new SimpleStringDeserializationSchema(), props);
+HashMap<MessageQueue, Long> brokerMap = new HashMap<>();
+brokerMap.put(new MessageQueue("tp_driver_tag_sync_back", "broker-a", 1), 
201L);
+brokerMap.put(new MessageQueue("tp_driver_tag_sync_back", "broker-c", 3), 
123L);
+source.setStartFromSpecificOffsets(brokerMap);
+```
+RocketMQSourceFunction offer five initialization policies 
+* setStartFromEarliest
+* setStartFromLatest
+* setStartFromTimeStamp with timestamp
+* setStartFromGroupOffsets with `OffsetResetStrategy`
+* setStartFromSpecificOffsets
+
+| STRATEGY                            | DESCRIPTION                            
                      |
+| ----------------------------------- | 
------------------------------------------------------------ |
+| StartFromEarliest                   | consume from the earliest offset after 
restart with no state |

Review Comment:
   It would be better to introduce an enum for these strategies.
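As a rough illustration of what such an enum could look like, here is a sketch that mirrors the five policies listed in the README. The name `StartupStrategy`, its constants, and the `fromName` helper are assumptions made for this example; the PR may choose different names or carry extra data (for instance the `OffsetResetStrategy` used with group offsets).

```java
import java.util.Locale;

// Hypothetical enum: the name and constants are illustrative, not from the PR.
enum StartupStrategy {
    EARLIEST("consume from the earliest offset"),
    LATEST("consume from the latest offset"),
    TIMESTAMP("consume from the offset closest to a given timestamp"),
    GROUP_OFFSETS("consume from the committed group offset, with a reset fallback"),
    SPECIFIC_OFFSETS("consume from user-specified offsets per queue");

    private final String description;

    StartupStrategy(String description) {
        this.description = description;
    }

    String description() {
        return description;
    }

    /** Parses names such as "earliest" or "group-offsets" (case-insensitive). */
    static StartupStrategy fromName(String name) {
        return valueOf(name.trim().toUpperCase(Locale.ROOT).replace('-', '_'));
    }
}

class StartupStrategyDemo {
    public static void main(String[] args) {
        // Print every strategy with its description, e.g. for README generation.
        for (StartupStrategy strategy : StartupStrategy.values()) {
            System.out.println(strategy + ": " + strategy.description());
        }
    }
}
```

With an enum like this, the README table and the `setStartFrom...` methods could refer to one shared set of names instead of repeating free-form strings.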



##########
src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java:
##########
@@ -214,22 +227,35 @@ public void open(Configuration parameters) throws 
Exception {
                 getRuntimeContext()
                         .getMetricGroup()
                         .meter(MetricUtils.METRICS_TPS, new 
MeterView(outputCounter, 60));
-    }
 
-    @Override
-    public void run(SourceContext context) throws Exception {
-        String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
-        String tag =
-                props.getProperty(RocketMQConfig.CONSUMER_TAG, 
RocketMQConfig.DEFAULT_CONSUMER_TAG);
-        int pullBatchSize = getInteger(props, CONSUMER_BATCH_SIZE, 
DEFAULT_CONSUMER_BATCH_SIZE);
+        getRuntimeContext()
+                .getMetricGroup()
+                .gauge(MetricUtils.CURRENT_FETCH_EVENT_TIME_LAG, fetchDelay);
+        getRuntimeContext()
+                .getMetricGroup()
+                .gauge(MetricUtils.CURRENT_EMIT_EVENT_TIME_LAG, emitDelay);
 
         final RuntimeContext ctx = getRuntimeContext();
         // The lock that guarantees that record emission and state updates are 
atomic,
         // from the view of taking a checkpoint.
         int taskNumber = ctx.getNumberOfParallelSubtasks();
         int taskIndex = ctx.getIndexOfThisSubtask();
         log.info("Source run, NumberOfTotalTask={}, IndexOfThisSubTask={}", 
taskNumber, taskIndex);
+        Collection<MessageQueue> totalQueues = 
consumer.fetchSubscribeMessageQueues(topic);
+        messageQueues =
+                RocketMQUtils.allocate(totalQueues, taskNumber, 
ctx.getIndexOfThisSubtask());
+        // if job recover from state,The state already contains offsets of 
last commit.

Review Comment:
   ```suggestion
           // If the job recovers from state, the state already contains the offsets of the last commit.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
