This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit bf277903084e5fba4f7acaa91570d178ae3dd403 Author: SteNicholas <[email protected]> AuthorDate: Thu Aug 19 12:15:47 2021 +0800 [#786] RocketMQSourceFunction supports the close of ExecutorService and ScheduledExecutorService (#790) --- .../rocketmq/flink/legacy/RocketMQSourceFunction.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java index 8821a6d..e46daed 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java @@ -376,18 +376,32 @@ public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT> log.debug("cancel ..."); runningChecker.setRunning(false); + if (timer != null) { + timer.shutdown(); + timer = null; + } + + if (executor != null) { + executor.shutdown(); + executor = null; + } + if (consumer != null) { consumer.shutdown(); + consumer = null; } if (offsetTable != null) { offsetTable.clear(); + offsetTable = null; } if (restoredOffsets != null) { restoredOffsets.clear(); + restoredOffsets = null; } if (pendingOffsetsToCommit != null) { pendingOffsetsToCommit.clear(); + pendingOffsetsToCommit = null; } }
