This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 093add27b [translation][flink] Close the enumerator & reader resource (#2505)
093add27b is described below

commit 093add27b915670e5dbf7712eae639fe673ea14e
Author: Zongwen Li <[email protected]>
AuthorDate: Tue Aug 23 08:32:26 2022 +0800

    [translation][flink] Close the enumerator & reader resource (#2505)
    
    add close & cancel debug log
---
 .../org/apache/seatunnel/translation/source/ParallelSource.java   | 8 ++++++++
 .../translation/flink/source/BaseSeaTunnelSourceFunction.java     | 7 +++++++
 2 files changed, 15 insertions(+)

diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 5495a2faa..95aa7f18e 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -27,6 +27,9 @@ import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -38,6 +41,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 public class ParallelSource<T, SplitT extends SourceSplit, StateT extends Serializable> implements BaseSourceFunction<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(ParallelSource.class);
 
     protected final SeaTunnelSource<T, SplitT, StateT> source;
     protected final ParallelEnumeratorContext<SplitT> parallelEnumeratorContext;
@@ -121,6 +125,7 @@ public class ParallelSource<T, SplitT extends SourceSplit, StateT extends Serial
             reader.pollNext(collector);
             Thread.sleep(SLEEP_TIME_INTERVAL);
         }
+        LOG.debug("Parallel source runs complete.");
     }
 
     @Override
@@ -130,14 +135,17 @@ public class ParallelSource<T, SplitT extends SourceSplit, StateT extends Serial
         running = false;
 
         if (executorService != null) {
+            LOG.debug("Close the thread pool resource.");
             executorService.shutdown();
         }
 
         if (splitEnumerator != null) {
+            LOG.debug("Close the split enumerator for the Apache SeaTunnel source.");
             splitEnumerator.close();
         }
 
         if (reader != null) {
+            LOG.debug("Close the data reader for the Apache SeaTunnel source.");
             reader.close();
         }
     }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
index 911c9b0a3..e91a73ed4 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
@@ -92,10 +92,17 @@ public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row
         }
     }
 
+    @Override
+    public void close() throws Exception {
+        cancel();
+        LOG.debug("Close the SeaTunnelSourceFunction of Flink.");
+    }
+
     @Override
     public void cancel() {
         running = false;
         try {
+            LOG.debug("Cancel the SeaTunnelSourceFunction of Flink.");
             internalSource.close();
         } catch (Exception e) {
             throw new RuntimeException(e);

Reply via email to