This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit 8089fb1255507f71a775263033c908858c0fb940 Author: Jennifer-sarah <[email protected]> AuthorDate: Fri Mar 22 00:52:40 2019 +0800 clean up some dirty code clean up some dirty code --- .../java/org/apache/rocketmq/flink/RocketMQSource.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java index f610efe..5b76e54 100644 --- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java @@ -62,7 +62,7 @@ import static org.apache.rocketmq.flink.RocketMQUtils.getLong; * Otherwise, the source doesn't provide any reliability guarantees. */ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> - implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> { + implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> { private static final long serialVersionUID = 1L; @@ -131,15 +131,15 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> final Object lock = context.getCheckpointLock(); int delayWhenMessageNotFound = getInteger(props, RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND, - RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND); + RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND); String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG); int pullPoolSize = getInteger(props, RocketMQConfig.CONSUMER_PULL_POOL_SIZE, - RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE); + RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE); int pullBatchSize = getInteger(props, RocketMQConfig.CONSUMER_BATCH_SIZE, - RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE); + RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE); pullConsumerScheduleService.setPullThreadNums(pullPoolSize); pullConsumerScheduleService.registerPullTaskCallback(topic, new 
PullTaskCallback() { @@ -234,7 +234,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> break; case CONSUMER_OFFSET_TIMESTAMP: offset = consumer.searchOffset(mq, getLong(props, - RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis())); + RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis())); break; default: throw new IllegalArgumentException("Unknown value for CONSUMER_OFFSET_RESET_TO."); @@ -311,7 +311,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> LOG.debug("initialize State ..."); this.unionOffsetStates = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>( - OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() { }))); + OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() { }))); this.restored = context.isRestored(); @@ -337,7 +337,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { - // consumer.c + // callback when checkpoint complete if (!runningChecker.isRunning()) { LOG.debug("notifyCheckpointComplete() called on closed source; returning null."); return;
