This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit 8089fb1255507f71a775263033c908858c0fb940
Author: Jennifer-sarah <[email protected]>
AuthorDate: Fri Mar 22 00:52:40 2019 +0800

    clean up some dirty code
    
    clean up some dirty code
---
 .../java/org/apache/rocketmq/flink/RocketMQSource.java     | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index f610efe..5b76e54 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -62,7 +62,7 @@ import static org.apache.rocketmq.flink.RocketMQUtils.getLong;
  * Otherwise, the source doesn't provide any reliability guarantees.
  */
 public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
-        implements CheckpointedFunction, CheckpointListener, 
ResultTypeQueryable<OUT> {
+    implements CheckpointedFunction, CheckpointListener, 
ResultTypeQueryable<OUT> {
 
     private static final long serialVersionUID = 1L;
 
@@ -131,15 +131,15 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
         final Object lock = context.getCheckpointLock();
 
         int delayWhenMessageNotFound = getInteger(props, 
RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND,
-                RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
+            RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
 
         String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, 
RocketMQConfig.DEFAULT_CONSUMER_TAG);
 
         int pullPoolSize = getInteger(props, 
RocketMQConfig.CONSUMER_PULL_POOL_SIZE,
-                RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE);
+            RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE);
 
         int pullBatchSize = getInteger(props, 
RocketMQConfig.CONSUMER_BATCH_SIZE,
-                RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE);
+            RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE);
 
         pullConsumerScheduleService.setPullThreadNums(pullPoolSize);
         pullConsumerScheduleService.registerPullTaskCallback(topic, new 
PullTaskCallback() {
@@ -234,7 +234,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
                         break;
                     case CONSUMER_OFFSET_TIMESTAMP:
                         offset = consumer.searchOffset(mq, getLong(props,
-                                RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, 
System.currentTimeMillis()));
+                            RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, 
System.currentTimeMillis()));
                         break;
                     default:
                         throw new IllegalArgumentException("Unknown value for 
CONSUMER_OFFSET_RESET_TO.");
@@ -311,7 +311,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
         LOG.debug("initialize State ...");
 
         this.unionOffsetStates = 
context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(
-                OFFSETS_STATE_NAME, TypeInformation.of(new 
TypeHint<Tuple2<MessageQueue, Long>>() { })));
+            OFFSETS_STATE_NAME, TypeInformation.of(new 
TypeHint<Tuple2<MessageQueue, Long>>() { })));
 
         this.restored = context.isRestored();
 
@@ -337,7 +337,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        // consumer.c
+        // callback when checkpoint complete 
         if (!runningChecker.isRunning()) {
             LOG.debug("notifyCheckpointComplete() called on closed source; 
returning null.");
             return;

Reply via email to