This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit 65e30294aaaa25992752853a04fdfa69cd3c0e86
Author: Jennifer-sarah <[email protected]>
AuthorDate: Thu Mar 21 13:43:12 2019 +0800

    update consumer offset after checkpoint completed
    
    update consumer offset after checkpoint completed
---
 .../org/apache/rocketmq/flink/RocketMQSource.java  | 27 ++++++++++++++++------
 1 file changed, 20 insertions(+), 7 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java 
b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index 8e8e57b..14b8042 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -60,7 +61,7 @@ import static org.apache.rocketmq.flink.RocketMQUtils.getLong;
  * Otherwise, the source doesn't provide any reliability guarantees.
  */
 public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
-    implements CheckpointedFunction, ResultTypeQueryable<OUT> {
+        implements CheckpointedFunction, CheckpointListener, 
ResultTypeQueryable<OUT> {
 
     private static final long serialVersionUID = 1L;
 
@@ -126,15 +127,15 @@ public class RocketMQSource<OUT> extends 
RichParallelSourceFunction<OUT>
         final Object lock = context.getCheckpointLock();
 
         int delayWhenMessageNotFound = getInteger(props, 
RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND,
-            RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
+                RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
 
         String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, 
RocketMQConfig.DEFAULT_CONSUMER_TAG);
 
         int pullPoolSize = getInteger(props, 
RocketMQConfig.CONSUMER_PULL_POOL_SIZE,
-            RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE);
+                RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE);
 
         int pullBatchSize = getInteger(props, 
RocketMQConfig.CONSUMER_BATCH_SIZE,
-            RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE);
+                RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE);
 
         pullConsumerScheduleService.setPullThreadNums(pullPoolSize);
         pullConsumerScheduleService.registerPullTaskCallback(topic, new 
PullTaskCallback() {
@@ -229,7 +230,7 @@ public class RocketMQSource<OUT> extends 
RichParallelSourceFunction<OUT>
                         break;
                     case CONSUMER_OFFSET_TIMESTAMP:
                         offset = consumer.searchOffset(mq, getLong(props,
-                            RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, 
System.currentTimeMillis()));
+                                RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, 
System.currentTimeMillis()));
                         break;
                     default:
                         throw new IllegalArgumentException("Unknown value for 
CONSUMER_OFFSET_RESET_TO.");
@@ -286,7 +287,7 @@ public class RocketMQSource<OUT> extends 
RichParallelSourceFunction<OUT>
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Snapshotted state, last processed offsets: {}, 
checkpoint id: {}, timestamp: {}",
-                offsetTable, context.getCheckpointId(), 
context.getCheckpointTimestamp());
+                    offsetTable, context.getCheckpointId(), 
context.getCheckpointTimestamp());
         }
 
         for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
@@ -304,7 +305,7 @@ public class RocketMQSource<OUT> extends 
RichParallelSourceFunction<OUT>
         LOG.debug("initialize State ...");
 
         this.unionOffsetStates = 
context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(
-            OFFSETS_STATE_NAME, TypeInformation.of(new 
TypeHint<Tuple2<MessageQueue, Long>>() { })));
+                OFFSETS_STATE_NAME, TypeInformation.of(new 
TypeHint<Tuple2<MessageQueue, Long>>() { })));
 
         this.restored = context.isRestored();
 
@@ -327,4 +328,16 @@ public class RocketMQSource<OUT> extends 
RichParallelSourceFunction<OUT>
     public TypeInformation<OUT> getProducedType() {
         return schema.getProducedType();
     }
+
+    @Override
+    public void notifyCheckpointComplete(long l) throws Exception {
+        if (!runningChecker.isRunning()) {
+            LOG.debug("notifyCheckpointComplete() called on closed source; 
returning null.");
+            return;
+        }
+
+        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
+            consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
+        }
+    }
 }

Reply via email to