This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit bf277903084e5fba4f7acaa91570d178ae3dd403
Author: SteNicholas <[email protected]>
AuthorDate: Thu Aug 19 12:15:47 2021 +0800

    [#786] RocketMQSourceFunction supports the close of ExecutorService and 
ScheduledExecutorService (#790)
---
 .../rocketmq/flink/legacy/RocketMQSourceFunction.java      | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git 
a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java 
b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 8821a6d..e46daed 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -376,18 +376,32 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
         log.debug("cancel ...");
         runningChecker.setRunning(false);
 
+        if (timer != null) {
+            timer.shutdown();
+            timer = null;
+        }
+
+        if (executor != null) {
+            executor.shutdown();
+            executor = null;
+        }
+
         if (consumer != null) {
             consumer.shutdown();
+            consumer = null;
         }
 
         if (offsetTable != null) {
             offsetTable.clear();
+            offsetTable = null;
         }
         if (restoredOffsets != null) {
             restoredOffsets.clear();
+            restoredOffsets = null;
         }
         if (pendingOffsetsToCommit != null) {
             pendingOffsetsToCommit.clear();
+            pendingOffsetsToCommit = null;
         }
     }
 

Reply via email to