This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 093add27b [translation][flink] Close the enumerator & reader resource
(#2505)
093add27b is described below
commit 093add27b915670e5dbf7712eae639fe673ea14e
Author: Zongwen Li <[email protected]>
AuthorDate: Tue Aug 23 08:32:26 2022 +0800
[translation][flink] Close the enumerator & reader resource (#2505)
add close & cancel debug log
---
.../org/apache/seatunnel/translation/source/ParallelSource.java | 8 ++++++++
.../translation/flink/source/BaseSeaTunnelSourceFunction.java | 7 +++++++
2 files changed, 15 insertions(+)
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 5495a2faa..95aa7f18e 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -27,6 +27,9 @@ import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -38,6 +41,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
public class ParallelSource<T, SplitT extends SourceSplit, StateT extends
Serializable> implements BaseSourceFunction<T> {
+ private static final Logger LOG =
LoggerFactory.getLogger(ParallelSource.class);
protected final SeaTunnelSource<T, SplitT, StateT> source;
protected final ParallelEnumeratorContext<SplitT>
parallelEnumeratorContext;
@@ -121,6 +125,7 @@ public class ParallelSource<T, SplitT extends SourceSplit,
StateT extends Serial
reader.pollNext(collector);
Thread.sleep(SLEEP_TIME_INTERVAL);
}
+ LOG.debug("Parallel source runs complete.");
}
@Override
@@ -130,14 +135,17 @@ public class ParallelSource<T, SplitT extends
SourceSplit, StateT extends Serial
running = false;
if (executorService != null) {
+ LOG.debug("Close the thread pool resource.");
executorService.shutdown();
}
if (splitEnumerator != null) {
+ LOG.debug("Close the split enumerator for the Apache SeaTunnel
source.");
splitEnumerator.close();
}
if (reader != null) {
+ LOG.debug("Close the data reader for the Apache SeaTunnel
source.");
reader.close();
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
index 911c9b0a3..e91a73ed4 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
+++
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
@@ -92,10 +92,17 @@ public abstract class BaseSeaTunnelSourceFunction extends
RichSourceFunction<Row
}
}
+ @Override
+ public void close() throws Exception {
+ cancel();
+ LOG.debug("Close the SeaTunnelSourceFunction of Flink.");
+ }
+
@Override
public void cancel() {
running = false;
try {
+ LOG.debug("Cancel the SeaTunnelSourceFunction of Flink.");
internalSource.close();
} catch (Exception e) {
throw new RuntimeException(e);