This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d71337b0e9 [Feature][Core] Add event notify for all connector (#7501)
d71337b0e9 is described below

commit d71337b0e9931db66076be5fcda49db00a31486a
Author: Jast <[email protected]>
AuthorDate: Wed Aug 28 10:52:32 2024 +0800

    [Feature][Core] Add event notify for all connector (#7501)
    
    * [feature]add event notify
    
    * [feature]add event notify
    
    * [fixbug]fix some problem
    
    * [feature]fix some problem
    
    * [feature]fix some problem
---
 .../api/sink/multitablesink/MultiTableSink.java    |  9 ++++-
 .../sink/multitablesink/MultiTableSinkWriter.java  | 46 ++++++++++++++--------
 .../seatunnel/console/sink/ConsoleSinkWriter.java  |  5 +--
 .../seatunnel/fake/source/FakeSourceReader.java    | 10 +----
 .../fake/source/FakeSourceSplitEnumerator.java     | 10 +----
 .../FakeSourceToConsoleWithEventReportIT.java      |  2 +-
 .../server/task/SourceSplitEnumeratorTask.java     |  4 ++
 .../engine/server/task/flow/SinkFlowLifeCycle.java | 17 +++-----
 .../server/task/flow/SourceFlowLifeCycle.java      | 23 ++++++-----
 .../translation/source/CoordinatedSource.java      | 11 ++++++
 .../translation/source/ParallelSource.java         |  8 ++++
 .../translation/flink/sink/FlinkSink.java          |  7 +---
 .../translation/flink/sink/FlinkSinkWriter.java    |  8 +++-
 .../flink/source/FlinkSourceEnumerator.java        |  6 +++
 .../flink/source/FlinkSourceReader.java            |  4 ++
 .../spark/sink/writer/SparkDataWriter.java         |  7 +++-
 .../spark/sink/writer/SparkDataWriterFactory.java  |  2 +-
 .../spark/sink/write/SeaTunnelSparkDataWriter.java |  7 +++-
 .../write/SeaTunnelSparkDataWriterFactory.java     |  2 +-
 19 files changed, 118 insertions(+), 70 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
index 923ecff8b8..3f7f7fa9c6 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
@@ -64,6 +64,7 @@ public class MultiTableSink
     public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> 
createWriter(
             SinkWriter.Context context) throws IOException {
         Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> writers = new 
HashMap<>();
+        Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new 
HashMap<>();
         for (int i = 0; i < replicaNum; i++) {
             for (String tableIdentifier : sinks.keySet()) {
                 SeaTunnelSink sink = sinks.get(tableIdentifier);
@@ -71,15 +72,18 @@ public class MultiTableSink
                 writers.put(
                         SinkIdentifier.of(tableIdentifier, index),
                         sink.createWriter(new SinkContextProxy(index, 
context)));
+                sinkWritersContext.put(SinkIdentifier.of(tableIdentifier, 
index), context);
             }
         }
-        return new MultiTableSinkWriter(writers, replicaNum);
+        return new MultiTableSinkWriter(writers, replicaNum, 
sinkWritersContext);
     }
 
     @Override
     public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> 
restoreWriter(
             SinkWriter.Context context, List<MultiTableState> states) throws 
IOException {
         Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> writers = new 
HashMap<>();
+        Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new 
HashMap<>();
+
         for (int i = 0; i < replicaNum; i++) {
             for (String tableIdentifier : sinks.keySet()) {
                 SeaTunnelSink sink = sinks.get(tableIdentifier);
@@ -102,9 +106,10 @@ public class MultiTableSink
                             sinkIdentifier,
                             sink.restoreWriter(new SinkContextProxy(index, 
context), state));
                 }
+                sinkWritersContext.put(SinkIdentifier.of(tableIdentifier, 
index), context);
             }
         }
-        return new MultiTableSinkWriter(writers, replicaNum);
+        return new MultiTableSinkWriter(writers, replicaNum, 
sinkWritersContext);
     }
 
     @Override
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
index 3c73435faf..38234e220c 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.api.sink.multitablesink;
 import org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
 import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
@@ -45,6 +46,7 @@ public class MultiTableSinkWriter
         implements SinkWriter<SeaTunnelRow, MultiTableCommitInfo, 
MultiTableState> {
 
     private final Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> 
sinkWriters;
+    private final Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext;
     private final Map<String, Optional<Integer>> sinkPrimaryKeys = new 
HashMap<>();
     private final List<Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>>> 
sinkWritersWithIndex;
     private final List<MultiTableWriterRunnable> runnable = new ArrayList<>();
@@ -55,8 +57,11 @@ public class MultiTableSinkWriter
     private volatile boolean submitted = false;
 
     public MultiTableSinkWriter(
-            Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriters, 
int queueSize) {
+            Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriters,
+            int queueSize,
+            Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext) {
         this.sinkWriters = sinkWriters;
+        this.sinkWritersContext = sinkWritersContext;
         AtomicInteger cnt = new AtomicInteger(0);
         executorService =
                 Executors.newFixedThreadPool(
@@ -84,6 +89,7 @@ public class MultiTableSinkWriter
                                         entry.getKey().getTableIdentifier(), 
entry.getValue());
                                 sinkIdentifierMap.put(entry.getKey(), 
entry.getValue());
                             });
+
             sinkWritersWithIndex.add(sinkIdentifierMap);
             blockingQueues.add(queue);
             MultiTableWriterRunnable r = new 
MultiTableWriterRunnable(tableIdWriterMap, queue);
@@ -267,26 +273,34 @@ public class MultiTableSinkWriter
 
     @Override
     public void close() throws IOException {
-        Throwable firstE = null;
+        // The variables used in lambda expressions should be final or valid 
final, so they are
+        // modified to arrays
+        final Throwable[] firstE = {null};
         try {
             checkQueueRemain();
         } catch (Exception e) {
-            firstE = e;
+            firstE[0] = e;
         }
         executorService.shutdownNow();
         for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
             synchronized (runnable.get(i)) {
-                for (SinkWriter<SeaTunnelRow, ?, ?> sinkWriter :
-                        sinkWritersWithIndex.get(i).values()) {
-                    try {
-                        sinkWriter.close();
-                    } catch (Throwable e) {
-                        if (firstE == null) {
-                            firstE = e;
-                        }
-                        log.error("close error", e);
-                    }
-                }
+                Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> 
sinkIdentifierSinkWriterMap =
+                        sinkWritersWithIndex.get(i);
+                sinkIdentifierSinkWriterMap.forEach(
+                        (identifier, sinkWriter) -> {
+                            try {
+                                sinkWriter.close();
+                                sinkWritersContext
+                                        .get(identifier)
+                                        .getEventListener()
+                                        .onEvent(new WriterCloseEvent());
+                            } catch (Throwable e) {
+                                if (firstE[0] == null) {
+                                    firstE[0] = e;
+                                }
+                                log.error("close error", e);
+                            }
+                        });
             }
         }
         try {
@@ -296,8 +310,8 @@ public class MultiTableSinkWriter
         } catch (Throwable e) {
             log.error("close resourceManager error", e);
         }
-        if (firstE != null) {
-            throw new RuntimeException(firstE);
+        if (firstE[0] != null) {
+            throw new RuntimeException(firstE[0]);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index 4c9e6f4760..d83e8b5c96 100644
--- 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -19,7 +19,6 @@ package 
org.apache.seatunnel.connectors.seatunnel.console.sink;
 
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
-import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
 import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
 import 
org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher;
 import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler;
@@ -99,9 +98,7 @@ public class ConsoleSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
     }
 
     @Override
-    public void close() {
-        context.getEventListener().onEvent(new WriterCloseEvent());
-    }
+    public void close() {}
 
     private String fieldsInfo(SeaTunnelRowType seaTunnelRowType) {
         String[] fieldsInfo = new String[seaTunnelRowType.getTotalFields()];
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index 95758cb971..063ece63d2 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -20,8 +20,6 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
-import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig;
@@ -73,14 +71,10 @@ public class FakeSourceReader implements 
SourceReader<SeaTunnelRow, FakeSourceSp
     }
 
     @Override
-    public void open() {
-        context.getEventListener().onEvent(new ReaderOpenEvent());
-    }
+    public void open() {}
 
     @Override
-    public void close() {
-        context.getEventListener().onEvent(new ReaderCloseEvent());
-    }
+    public void close() {}
 
     @Override
     @SuppressWarnings("MagicNumber")
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
index ecd6d50914..311e818376 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
@@ -18,8 +18,6 @@
 package org.apache.seatunnel.connectors.seatunnel.fake.source;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent;
-import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent;
 import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
@@ -58,9 +56,7 @@ public class FakeSourceSplitEnumerator
     }
 
     @Override
-    public void open() {
-        enumeratorContext.getEventListener().onEvent(new 
EnumeratorOpenEvent());
-    }
+    public void open() {}
 
     @Override
     public void run() throws Exception {
@@ -69,9 +65,7 @@ public class FakeSourceSplitEnumerator
     }
 
     @Override
-    public void close() throws IOException {
-        enumeratorContext.getEventListener().onEvent(new 
EnumeratorCloseEvent());
-    }
+    public void close() throws IOException {}
 
     @Override
     public void addSplitsBack(List<FakeSourceSplit> splits, int subtaskId) {
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java
index 8389cb3c05..8e45bbf9de 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java
@@ -109,7 +109,7 @@ public class FakeSourceToConsoleWithEventReportIT extends 
SeaTunnelContainer {
                 arrayNode.elements().forEachRemaining(jsonNode -> 
events.add(jsonNode));
             }
         }
-        Assertions.assertEquals(8, events.size());
+        Assertions.assertEquals(10, events.size());
         Set<String> eventTypes =
                 events.stream().map(e -> 
e.get("eventType").asText()).collect(Collectors.toSet());
         Assertions.assertTrue(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index d1fc333ade..8004068ce6 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent;
+import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
 import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
@@ -121,6 +123,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
         super.close();
         if (enumerator != null) {
             enumerator.close();
+            enumeratorContext.getEventListener().onEvent(new 
EnumeratorCloseEvent());
         }
         progress.done();
     }
@@ -309,6 +312,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
                 if (startCalled && readerRegisterComplete) {
                     currState = STARTING;
                     enumerator.open();
+                    enumeratorContext.getEventListener().onEvent(new 
EnumeratorOpenEvent());
                 } else {
                     Thread.sleep(100);
                 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index cacaa75aae..3234560fe4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -24,6 +24,7 @@ import 
org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportResourceShare;
+import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
 import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
@@ -69,6 +70,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
 
     private final SinkAction<T, StateT, CommitInfoT, AggregatedCommitInfoT> 
sinkAction;
     private SinkWriter<T, CommitInfoT, StateT> writer;
+    private SinkWriter.Context writerContext;
 
     private transient Optional<Serializer<CommitInfoT>> commitInfoSerializer;
     private transient Optional<Serializer<StateT>> writerStateSerializer;
@@ -150,6 +152,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
     public void close() throws IOException {
         super.close();
         writer.close();
+        writerContext.getEventListener().onEvent(new WriterCloseEvent());
         try {
             if (resourceManager != null) {
                 resourceManager.close();
@@ -283,19 +286,11 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
                                                                     
.deserialize(bytes)))
                             .collect(Collectors.toList());
         }
+        this.writerContext = new SinkWriterContext(indexID, metricsContext, 
eventListener);
         if (states.isEmpty()) {
-            this.writer =
-                    sinkAction
-                            .getSink()
-                            .createWriter(
-                                    new SinkWriterContext(indexID, 
metricsContext, eventListener));
+            this.writer = sinkAction.getSink().createWriter(writerContext);
         } else {
-            this.writer =
-                    sinkAction
-                            .getSink()
-                            .restoreWriter(
-                                    new SinkWriterContext(indexID, 
metricsContext, eventListener),
-                                    states);
+            this.writer = sinkAction.getSink().restoreWriter(writerContext, 
states);
         }
         if (this.writer instanceof SupportResourceShare) {
             resourceManager =
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index ca137b3e06..6c596da0c3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
+import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
@@ -83,6 +85,7 @@ public class SourceFlowLifeCycle<T, SplitT extends 
SourceSplit> extends ActionFl
 
     private final MetricsContext metricsContext;
     private final EventListener eventListener;
+    private SourceReader.Context context;
 
     private final AtomicReference<SchemaChangePhase> schemaChangePhase = new 
AtomicReference<>();
 
@@ -111,21 +114,20 @@ public class SourceFlowLifeCycle<T, SplitT extends 
SourceSplit> extends ActionFl
     @Override
     public void init() throws Exception {
         this.splitSerializer = sourceAction.getSource().getSplitSerializer();
-        this.reader =
-                sourceAction
-                        .getSource()
-                        .createReader(
-                                new SourceReaderContext(
-                                        indexID,
-                                        
sourceAction.getSource().getBoundedness(),
-                                        this,
-                                        metricsContext,
-                                        eventListener));
+        this.context =
+                new SourceReaderContext(
+                        indexID,
+                        sourceAction.getSource().getBoundedness(),
+                        this,
+                        metricsContext,
+                        eventListener);
+        this.reader = sourceAction.getSource().createReader(context);
         this.enumeratorTaskAddress = getEnumeratorTaskAddress();
     }
 
     @Override
     public void open() throws Exception {
+        context.getEventListener().onEvent(new ReaderOpenEvent());
         reader.open();
         register();
     }
@@ -140,6 +142,7 @@ public class SourceFlowLifeCycle<T, SplitT extends 
SourceSplit> extends ActionFl
 
     @Override
     public void close() throws IOException {
+        context.getEventListener().onEvent(new ReaderCloseEvent());
         reader.close();
         super.close();
     }
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 11b240dd99..4e5d864369 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -24,6 +24,10 @@ import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent;
+import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent;
+import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
+import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
 import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
 
 import lombok.extern.slf4j.Slf4j;
@@ -136,6 +140,7 @@ public class CoordinatedSource<T, SplitT extends 
SourceSplit, StateT extends Ser
                 ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(
                         parallelism, "parallel-split-enumerator-executor");
         splitEnumerator.open();
+        coordinatedEnumeratorContext.getEventListener().onEvent(new 
EnumeratorOpenEvent());
         restoredSplitStateMap.forEach(
                 (subtaskId, splits) -> {
                     splitEnumerator.addSplitsBack(splits, subtaskId);
@@ -147,6 +152,10 @@ public class CoordinatedSource<T, SplitT extends 
SourceSplit, StateT extends Ser
                         entry -> {
                             try {
                                 entry.getValue().open();
+                                readerContextMap
+                                        .get(entry.getKey())
+                                        .getEventListener()
+                                        .onEvent(new ReaderOpenEvent());
                                 splitEnumerator.registerReader(entry.getKey());
                             } catch (Exception e) {
                                 throw new RuntimeException(e);
@@ -203,6 +212,7 @@ public class CoordinatedSource<T, SplitT extends 
SourceSplit, StateT extends Ser
         for (Map.Entry<Integer, SourceReader<T, SplitT>> entry : 
readerMap.entrySet()) {
             readerRunningMap.get(entry.getKey()).set(false);
             entry.getValue().close();
+            
readerContextMap.get(entry.getKey()).getEventListener().onEvent(new 
ReaderCloseEvent());
         }
 
         if (executorService != null) {
@@ -211,6 +221,7 @@ public class CoordinatedSource<T, SplitT extends 
SourceSplit, StateT extends Ser
 
         try (SourceSplitEnumerator<SplitT, StateT> closed = splitEnumerator) {
             // just close the resources
+            coordinatedEnumeratorContext.getEventListener().onEvent(new 
EnumeratorCloseEvent());
         }
     }
 
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 4cc1bfd141..ed794a5b6c 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -23,6 +23,10 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent;
+import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent;
+import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
+import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
 import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
 
 import org.slf4j.Logger;
@@ -115,7 +119,9 @@ public class ParallelSource<T, SplitT extends SourceSplit, 
StateT extends Serial
             splitEnumerator.addSplitsBack(restoredSplitState, subtaskId);
         }
         reader.open();
+        readerContext.getEventListener().onEvent(new ReaderOpenEvent());
         parallelEnumeratorContext.register();
+        parallelEnumeratorContext.getEventListener().onEvent(new 
EnumeratorOpenEvent());
         splitEnumerator.registerReader(subtaskId);
     }
 
@@ -170,6 +176,8 @@ public class ParallelSource<T, SplitT extends SourceSplit, 
StateT extends Serial
         if (reader != null) {
             LOG.debug("Close the data reader for the Apache SeaTunnel 
source.");
             reader.close();
+            readerContext.getEventListener().onEvent(new ReaderCloseEvent());
+            parallelEnumeratorContext.getEventListener().onEvent(new 
EnumeratorCloseEvent());
         }
     }
 
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index 4a720e347b..2ebbcba4f9 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -66,10 +66,7 @@ public class FlinkSink<InputT, CommT, WriterStateT, 
GlobalCommT>
 
         if (states == null || states.isEmpty()) {
             return new FlinkSinkWriter<>(
-                    sink.createWriter(stContext),
-                    1,
-                    catalogTable.getSeaTunnelRowType(),
-                    stContext.getMetricsContext());
+                    sink.createWriter(stContext), 1, 
catalogTable.getSeaTunnelRowType(), stContext);
         } else {
             List<WriterStateT> restoredState =
                     
states.stream().map(FlinkWriterState::getState).collect(Collectors.toList());
@@ -77,7 +74,7 @@ public class FlinkSink<InputT, CommT, WriterStateT, 
GlobalCommT>
                     sink.restoreWriter(stContext, restoredState),
                     states.get(0).getCheckpointId() + 1,
                     catalogTable.getSeaTunnelRowType(),
-                    stContext.getMetricsContext());
+                    stContext);
         }
     }
 
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 725bf606f9..8de831aee1 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.metrics.MetricNames;
 import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SupportResourceShare;
+import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
@@ -53,6 +54,8 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
     private final org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, 
CommT, WriterStateT>
             sinkWriter;
 
+    private final org.apache.seatunnel.api.sink.SinkWriter.Context context;
+
     private final Counter sinkWriteCount;
 
     private final Counter sinkWriteBytes;
@@ -67,9 +70,11 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
             org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, 
WriterStateT> sinkWriter,
             long checkpointId,
             SeaTunnelDataType<?> dataType,
-            MetricsContext metricsContext) {
+            org.apache.seatunnel.api.sink.SinkWriter.Context context) {
+        this.context = context;
         this.sinkWriter = sinkWriter;
         this.checkpointId = checkpointId;
+        MetricsContext metricsContext = context.getMetricsContext();
         this.sinkWriteCount = 
metricsContext.counter(MetricNames.SINK_WRITE_COUNT);
         this.sinkWriteBytes = 
metricsContext.counter(MetricNames.SINK_WRITE_BYTES);
         this.sinkWriterQPS = metricsContext.meter(MetricNames.SINK_WRITE_QPS);
@@ -118,6 +123,7 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
     @Override
     public void close() throws Exception {
         sinkWriter.close();
+        context.getEventListener().onEvent(new WriterCloseEvent());
         try {
             if (resourceManager != null) {
                 resourceManager.close();
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
index e457d69f27..7d8052bfd1 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.translation.flink.source;
 
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent;
+import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent;
 
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
@@ -49,6 +51,7 @@ public class FlinkSourceEnumerator<SplitT extends 
SourceSplit, EnumStateT>
 
     private final SplitEnumeratorContext<SplitWrapper<SplitT>> 
enumeratorContext;
 
+    private final SourceSplitEnumerator.Context<SplitT> context;
     private final int parallelism;
 
     private final Object lock = new Object();
@@ -62,12 +65,14 @@ public class FlinkSourceEnumerator<SplitT extends 
SourceSplit, EnumStateT>
             SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) {
         this.sourceSplitEnumerator = enumerator;
         this.enumeratorContext = enumContext;
+        this.context = new 
FlinkSourceSplitEnumeratorContext<>(enumeratorContext);
         this.parallelism = enumeratorContext.currentParallelism();
     }
 
     @Override
     public void start() {
         sourceSplitEnumerator.open();
+        context.getEventListener().onEvent(new EnumeratorOpenEvent());
     }
 
     @Override
@@ -106,6 +111,7 @@ public class FlinkSourceEnumerator<SplitT extends 
SourceSplit, EnumStateT>
     @Override
     public void close() throws IOException {
         sourceSplitEnumerator.close();
+        context.getEventListener().onEvent(new EnumeratorCloseEvent());
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
index c2f9cde500..fb1dc85174 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.translation.flink.source;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
+import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import org.apache.flink.api.connector.source.ReaderOutput;
@@ -66,6 +68,7 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
     public void start() {
         try {
             sourceReader.open();
+            context.getEventListener().onEvent(new ReaderOpenEvent());
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -121,6 +124,7 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
     @Override
     public void close() throws Exception {
         sourceReader.close();
+        context.getEventListener().onEvent(new ReaderCloseEvent());
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java
index 434b1ef979..a9eac50062 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java
@@ -21,6 +21,7 @@ import 
org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportResourceShare;
+import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.translation.spark.execution.MultiTableManager;
 
@@ -47,16 +48,19 @@ public class SparkDataWriter<CommitInfoT, StateT> 
implements DataWriter<Internal
     protected volatile MultiTableResourceManager resourceManager;
 
     private final MultiTableManager multiTableManager;
+    private final org.apache.seatunnel.api.sink.SinkWriter.Context context;
 
     SparkDataWriter(
             SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
             @Nullable SinkCommitter<CommitInfoT> sinkCommitter,
             MultiTableManager multiTableManager,
-            long epochId) {
+            long epochId,
+            org.apache.seatunnel.api.sink.SinkWriter.Context context) {
         this.sinkWriter = sinkWriter;
         this.sinkCommitter = sinkCommitter;
         this.epochId = epochId == 0 ? 1 : epochId;
         this.multiTableManager = multiTableManager;
+        this.context = context;
         initResourceManger();
     }
 
@@ -97,6 +101,7 @@ public class SparkDataWriter<CommitInfoT, StateT> implements 
DataWriter<Internal
                 new SparkWriterCommitMessage<>(latestCommitInfoT);
         cleanCommitInfo();
         sinkWriter.close();
+        context.getEventListener().onEvent(new WriterCloseEvent());
         try {
             if (resourceManager != null) {
                 resourceManager.close();
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
index 3a646f3aca..b684654103 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
@@ -63,6 +63,6 @@ public class SparkDataWriterFactory<CommitInfoT, StateT> 
implements DataWriterFa
             throw new RuntimeException("Failed to create SinkCommitter.", e);
         }
         return new SparkDataWriter<>(
-                writer, committer, new MultiTableManager(catalogTables), 
epochId);
+                writer, committer, new MultiTableManager(catalogTables), 
epochId, context);
     }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java
index 59f931e38f..c2c24aa914 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java
@@ -21,6 +21,7 @@ import 
org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportResourceShare;
+import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.translation.spark.execution.MultiTableManager;
 
@@ -47,16 +48,19 @@ public class SeaTunnelSparkDataWriter<CommitInfoT, StateT> 
implements DataWriter
     protected volatile MultiTableResourceManager resourceManager;
 
     private final MultiTableManager multiTableManager;
+    private final SinkWriter.Context context;
 
     public SeaTunnelSparkDataWriter(
             SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
             @Nullable SinkCommitter<CommitInfoT> sinkCommitter,
             MultiTableManager multiTableManager,
-            long epochId) {
+            long epochId,
+            SinkWriter.Context context) {
         this.sinkWriter = sinkWriter;
         this.sinkCommitter = sinkCommitter;
         this.multiTableManager = multiTableManager;
         this.epochId = epochId == 0 ? 1 : epochId;
+        this.context = context;
         initResourceManger();
     }
 
@@ -89,6 +93,7 @@ public class SeaTunnelSparkDataWriter<CommitInfoT, StateT> 
implements DataWriter
                 new SeaTunnelSparkWriterCommitMessage<>(latestCommitInfoT);
         cleanCommitInfo();
         sinkWriter.close();
+        context.getEventListener().onEvent(new WriterCloseEvent());
         try {
             if (resourceManager != null) {
                 resourceManager.close();
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
index b83787cac1..255a9cd339 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
@@ -64,7 +64,7 @@ public class SeaTunnelSparkDataWriterFactory<CommitInfoT, 
StateT>
             throw new RuntimeException("Failed to create SinkCommitter.", e);
         }
         return new SeaTunnelSparkDataWriter<>(
-                writer, committer, new MultiTableManager(catalogTables), 0);
+                writer, committer, new MultiTableManager(catalogTables), 0, 
context);
     }
 
     @Override


Reply via email to