This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d71337b0e9 [Feature][Core] Add event notify for all connector (#7501)
d71337b0e9 is described below
commit d71337b0e9931db66076be5fcda49db00a31486a
Author: Jast <[email protected]>
AuthorDate: Wed Aug 28 10:52:32 2024 +0800
[Feature][Core] Add event notify for all connector (#7501)
* [feature]add event notify
* [feature]add event notify
* [fixbug]fix some problem
* [feature]fix some problem
* [feature]fix some problem
---
.../api/sink/multitablesink/MultiTableSink.java | 9 ++++-
.../sink/multitablesink/MultiTableSinkWriter.java | 46 ++++++++++++++--------
.../seatunnel/console/sink/ConsoleSinkWriter.java | 5 +--
.../seatunnel/fake/source/FakeSourceReader.java | 10 +----
.../fake/source/FakeSourceSplitEnumerator.java | 10 +----
.../FakeSourceToConsoleWithEventReportIT.java | 2 +-
.../server/task/SourceSplitEnumeratorTask.java | 4 ++
.../engine/server/task/flow/SinkFlowLifeCycle.java | 17 +++-----
.../server/task/flow/SourceFlowLifeCycle.java | 23 ++++++-----
.../translation/source/CoordinatedSource.java | 11 ++++++
.../translation/source/ParallelSource.java | 8 ++++
.../translation/flink/sink/FlinkSink.java | 7 +---
.../translation/flink/sink/FlinkSinkWriter.java | 8 +++-
.../flink/source/FlinkSourceEnumerator.java | 6 +++
.../flink/source/FlinkSourceReader.java | 4 ++
.../spark/sink/writer/SparkDataWriter.java | 7 +++-
.../spark/sink/writer/SparkDataWriterFactory.java | 2 +-
.../spark/sink/write/SeaTunnelSparkDataWriter.java | 7 +++-
.../write/SeaTunnelSparkDataWriterFactory.java | 2 +-
19 files changed, 118 insertions(+), 70 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
index 923ecff8b8..3f7f7fa9c6 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
@@ -64,6 +64,7 @@ public class MultiTableSink
public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState>
createWriter(
SinkWriter.Context context) throws IOException {
Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> writers = new
HashMap<>();
+ Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new
HashMap<>();
for (int i = 0; i < replicaNum; i++) {
for (String tableIdentifier : sinks.keySet()) {
SeaTunnelSink sink = sinks.get(tableIdentifier);
@@ -71,15 +72,18 @@ public class MultiTableSink
writers.put(
SinkIdentifier.of(tableIdentifier, index),
sink.createWriter(new SinkContextProxy(index,
context)));
+ sinkWritersContext.put(SinkIdentifier.of(tableIdentifier,
index), context);
}
}
- return new MultiTableSinkWriter(writers, replicaNum);
+ return new MultiTableSinkWriter(writers, replicaNum,
sinkWritersContext);
}
@Override
public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState>
restoreWriter(
SinkWriter.Context context, List<MultiTableState> states) throws
IOException {
Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> writers = new
HashMap<>();
+ Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new
HashMap<>();
+
for (int i = 0; i < replicaNum; i++) {
for (String tableIdentifier : sinks.keySet()) {
SeaTunnelSink sink = sinks.get(tableIdentifier);
@@ -102,9 +106,10 @@ public class MultiTableSink
sinkIdentifier,
sink.restoreWriter(new SinkContextProxy(index,
context), state));
}
+ sinkWritersContext.put(SinkIdentifier.of(tableIdentifier,
index), context);
}
}
- return new MultiTableSinkWriter(writers, replicaNum);
+ return new MultiTableSinkWriter(writers, replicaNum,
sinkWritersContext);
}
@Override
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
index 3c73435faf..38234e220c 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.api.sink.multitablesink;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -45,6 +46,7 @@ public class MultiTableSinkWriter
implements SinkWriter<SeaTunnelRow, MultiTableCommitInfo,
MultiTableState> {
private final Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>>
sinkWriters;
+ private final Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext;
private final Map<String, Optional<Integer>> sinkPrimaryKeys = new
HashMap<>();
private final List<Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>>>
sinkWritersWithIndex;
private final List<MultiTableWriterRunnable> runnable = new ArrayList<>();
@@ -55,8 +57,11 @@ public class MultiTableSinkWriter
private volatile boolean submitted = false;
public MultiTableSinkWriter(
- Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriters,
int queueSize) {
+ Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriters,
+ int queueSize,
+ Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext) {
this.sinkWriters = sinkWriters;
+ this.sinkWritersContext = sinkWritersContext;
AtomicInteger cnt = new AtomicInteger(0);
executorService =
Executors.newFixedThreadPool(
@@ -84,6 +89,7 @@ public class MultiTableSinkWriter
entry.getKey().getTableIdentifier(),
entry.getValue());
sinkIdentifierMap.put(entry.getKey(),
entry.getValue());
});
+
sinkWritersWithIndex.add(sinkIdentifierMap);
blockingQueues.add(queue);
MultiTableWriterRunnable r = new
MultiTableWriterRunnable(tableIdWriterMap, queue);
@@ -267,26 +273,34 @@ public class MultiTableSinkWriter
@Override
public void close() throws IOException {
- Throwable firstE = null;
+ // Variables captured by a lambda expression must be final or effectively final,
+ // so the first exception is held in a single-element array.
+ final Throwable[] firstE = {null};
try {
checkQueueRemain();
} catch (Exception e) {
- firstE = e;
+ firstE[0] = e;
}
executorService.shutdownNow();
for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
synchronized (runnable.get(i)) {
- for (SinkWriter<SeaTunnelRow, ?, ?> sinkWriter :
- sinkWritersWithIndex.get(i).values()) {
- try {
- sinkWriter.close();
- } catch (Throwable e) {
- if (firstE == null) {
- firstE = e;
- }
- log.error("close error", e);
- }
- }
+ Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>>
sinkIdentifierSinkWriterMap =
+ sinkWritersWithIndex.get(i);
+ sinkIdentifierSinkWriterMap.forEach(
+ (identifier, sinkWriter) -> {
+ try {
+ sinkWriter.close();
+ sinkWritersContext
+ .get(identifier)
+ .getEventListener()
+ .onEvent(new WriterCloseEvent());
+ } catch (Throwable e) {
+ if (firstE[0] == null) {
+ firstE[0] = e;
+ }
+ log.error("close error", e);
+ }
+ });
}
}
try {
@@ -296,8 +310,8 @@ public class MultiTableSinkWriter
} catch (Throwable e) {
log.error("close resourceManager error", e);
}
- if (firstE != null) {
- throw new RuntimeException(firstE);
+ if (firstE[0] != null) {
+ throw new RuntimeException(firstE[0]);
}
}
diff --git
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index 4c9e6f4760..d83e8b5c96 100644
---
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -19,7 +19,6 @@ package
org.apache.seatunnel.connectors.seatunnel.console.sink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
-import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import
org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher;
import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler;
@@ -99,9 +98,7 @@ public class ConsoleSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
}
@Override
- public void close() {
- context.getEventListener().onEvent(new WriterCloseEvent());
- }
+ public void close() {}
private String fieldsInfo(SeaTunnelRowType seaTunnelRowType) {
String[] fieldsInfo = new String[seaTunnelRowType.getTotalFields()];
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index 95758cb971..063ece63d2 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -20,8 +20,6 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
-import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import
org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig;
@@ -73,14 +71,10 @@ public class FakeSourceReader implements
SourceReader<SeaTunnelRow, FakeSourceSp
}
@Override
- public void open() {
- context.getEventListener().onEvent(new ReaderOpenEvent());
- }
+ public void open() {}
@Override
- public void close() {
- context.getEventListener().onEvent(new ReaderCloseEvent());
- }
+ public void close() {}
@Override
@SuppressWarnings("MagicNumber")
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
index ecd6d50914..311e818376 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
@@ -18,8 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent;
-import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import
org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
@@ -58,9 +56,7 @@ public class FakeSourceSplitEnumerator
}
@Override
- public void open() {
- enumeratorContext.getEventListener().onEvent(new
EnumeratorOpenEvent());
- }
+ public void open() {}
@Override
public void run() throws Exception {
@@ -69,9 +65,7 @@ public class FakeSourceSplitEnumerator
}
@Override
- public void close() throws IOException {
- enumeratorContext.getEventListener().onEvent(new
EnumeratorCloseEvent());
- }
+ public void close() throws IOException {}
@Override
public void addSplitsBack(List<FakeSourceSplit> splits, int subtaskId) {
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java
index 8389cb3c05..8e45bbf9de 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java
@@ -109,7 +109,7 @@ public class FakeSourceToConsoleWithEventReportIT extends
SeaTunnelContainer {
arrayNode.elements().forEachRemaining(jsonNode ->
events.add(jsonNode));
}
}
- Assertions.assertEquals(8, events.size());
+ Assertions.assertEquals(10, events.size());
Set<String> eventTypes =
events.stream().map(e ->
e.get("eventType").asText()).collect(Collectors.toSet());
Assertions.assertTrue(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index d1fc333ade..8004068ce6 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent;
+import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
@@ -121,6 +123,7 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
super.close();
if (enumerator != null) {
enumerator.close();
+ enumeratorContext.getEventListener().onEvent(new
EnumeratorCloseEvent());
}
progress.done();
}
@@ -309,6 +312,7 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
if (startCalled && readerRegisterComplete) {
currState = STARTING;
enumerator.open();
+ enumeratorContext.getEventListener().onEvent(new
EnumeratorOpenEvent());
} else {
Thread.sleep(100);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index cacaa75aae..3234560fe4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -24,6 +24,7 @@ import
org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportResourceShare;
+import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
@@ -69,6 +70,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
private final SinkAction<T, StateT, CommitInfoT, AggregatedCommitInfoT>
sinkAction;
private SinkWriter<T, CommitInfoT, StateT> writer;
+ private SinkWriter.Context writerContext;
private transient Optional<Serializer<CommitInfoT>> commitInfoSerializer;
private transient Optional<Serializer<StateT>> writerStateSerializer;
@@ -150,6 +152,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
public void close() throws IOException {
super.close();
writer.close();
+ writerContext.getEventListener().onEvent(new WriterCloseEvent());
try {
if (resourceManager != null) {
resourceManager.close();
@@ -283,19 +286,11 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
.deserialize(bytes)))
.collect(Collectors.toList());
}
+ this.writerContext = new SinkWriterContext(indexID, metricsContext,
eventListener);
if (states.isEmpty()) {
- this.writer =
- sinkAction
- .getSink()
- .createWriter(
- new SinkWriterContext(indexID,
metricsContext, eventListener));
+ this.writer = sinkAction.getSink().createWriter(writerContext);
} else {
- this.writer =
- sinkAction
- .getSink()
- .restoreWriter(
- new SinkWriterContext(indexID,
metricsContext, eventListener),
- states);
+ this.writer = sinkAction.getSink().restoreWriter(writerContext,
states);
}
if (this.writer instanceof SupportResourceShare) {
resourceManager =
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index ca137b3e06..6c596da0c3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
+import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
@@ -83,6 +85,7 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
private final MetricsContext metricsContext;
private final EventListener eventListener;
+ private SourceReader.Context context;
private final AtomicReference<SchemaChangePhase> schemaChangePhase = new
AtomicReference<>();
@@ -111,21 +114,20 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
@Override
public void init() throws Exception {
this.splitSerializer = sourceAction.getSource().getSplitSerializer();
- this.reader =
- sourceAction
- .getSource()
- .createReader(
- new SourceReaderContext(
- indexID,
-
sourceAction.getSource().getBoundedness(),
- this,
- metricsContext,
- eventListener));
+ this.context =
+ new SourceReaderContext(
+ indexID,
+ sourceAction.getSource().getBoundedness(),
+ this,
+ metricsContext,
+ eventListener);
+ this.reader = sourceAction.getSource().createReader(context);
this.enumeratorTaskAddress = getEnumeratorTaskAddress();
}
@Override
public void open() throws Exception {
+ context.getEventListener().onEvent(new ReaderOpenEvent());
reader.open();
register();
}
@@ -140,6 +142,7 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
@Override
public void close() throws IOException {
+ context.getEventListener().onEvent(new ReaderCloseEvent());
reader.close();
super.close();
}
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 11b240dd99..4e5d864369 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -24,6 +24,10 @@ import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent;
+import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent;
+import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
+import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
import lombok.extern.slf4j.Slf4j;
@@ -136,6 +140,7 @@ public class CoordinatedSource<T, SplitT extends
SourceSplit, StateT extends Ser
ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(
parallelism, "parallel-split-enumerator-executor");
splitEnumerator.open();
+ coordinatedEnumeratorContext.getEventListener().onEvent(new
EnumeratorOpenEvent());
restoredSplitStateMap.forEach(
(subtaskId, splits) -> {
splitEnumerator.addSplitsBack(splits, subtaskId);
@@ -147,6 +152,10 @@ public class CoordinatedSource<T, SplitT extends
SourceSplit, StateT extends Ser
entry -> {
try {
entry.getValue().open();
+ readerContextMap
+ .get(entry.getKey())
+ .getEventListener()
+ .onEvent(new ReaderOpenEvent());
splitEnumerator.registerReader(entry.getKey());
} catch (Exception e) {
throw new RuntimeException(e);
@@ -203,6 +212,7 @@ public class CoordinatedSource<T, SplitT extends
SourceSplit, StateT extends Ser
for (Map.Entry<Integer, SourceReader<T, SplitT>> entry :
readerMap.entrySet()) {
readerRunningMap.get(entry.getKey()).set(false);
entry.getValue().close();
+
readerContextMap.get(entry.getKey()).getEventListener().onEvent(new
ReaderCloseEvent());
}
if (executorService != null) {
@@ -211,6 +221,7 @@ public class CoordinatedSource<T, SplitT extends
SourceSplit, StateT extends Ser
try (SourceSplitEnumerator<SplitT, StateT> closed = splitEnumerator) {
// just close the resources
+ coordinatedEnumeratorContext.getEventListener().onEvent(new
EnumeratorCloseEvent());
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 4cc1bfd141..ed794a5b6c 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -23,6 +23,10 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent;
+import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent;
+import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
+import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
import org.slf4j.Logger;
@@ -115,7 +119,9 @@ public class ParallelSource<T, SplitT extends SourceSplit,
StateT extends Serial
splitEnumerator.addSplitsBack(restoredSplitState, subtaskId);
}
reader.open();
+ readerContext.getEventListener().onEvent(new ReaderOpenEvent());
parallelEnumeratorContext.register();
+ parallelEnumeratorContext.getEventListener().onEvent(new
EnumeratorOpenEvent());
splitEnumerator.registerReader(subtaskId);
}
@@ -170,6 +176,8 @@ public class ParallelSource<T, SplitT extends SourceSplit,
StateT extends Serial
if (reader != null) {
LOG.debug("Close the data reader for the Apache SeaTunnel
source.");
reader.close();
+ readerContext.getEventListener().onEvent(new ReaderCloseEvent());
+ parallelEnumeratorContext.getEventListener().onEvent(new
EnumeratorCloseEvent());
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index 4a720e347b..2ebbcba4f9 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -66,10 +66,7 @@ public class FlinkSink<InputT, CommT, WriterStateT,
GlobalCommT>
if (states == null || states.isEmpty()) {
return new FlinkSinkWriter<>(
- sink.createWriter(stContext),
- 1,
- catalogTable.getSeaTunnelRowType(),
- stContext.getMetricsContext());
+ sink.createWriter(stContext), 1,
catalogTable.getSeaTunnelRowType(), stContext);
} else {
List<WriterStateT> restoredState =
states.stream().map(FlinkWriterState::getState).collect(Collectors.toList());
@@ -77,7 +74,7 @@ public class FlinkSink<InputT, CommT, WriterStateT,
GlobalCommT>
sink.restoreWriter(stContext, restoredState),
states.get(0).getCheckpointId() + 1,
catalogTable.getSeaTunnelRowType(),
- stContext.getMetricsContext());
+ stContext);
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 725bf606f9..8de831aee1 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.metrics.MetricNames;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SupportResourceShare;
+import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -53,6 +54,8 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
private final org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow,
CommT, WriterStateT>
sinkWriter;
+ private final org.apache.seatunnel.api.sink.SinkWriter.Context context;
+
private final Counter sinkWriteCount;
private final Counter sinkWriteBytes;
@@ -67,9 +70,11 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT,
WriterStateT> sinkWriter,
long checkpointId,
SeaTunnelDataType<?> dataType,
- MetricsContext metricsContext) {
+ org.apache.seatunnel.api.sink.SinkWriter.Context context) {
+ this.context = context;
this.sinkWriter = sinkWriter;
this.checkpointId = checkpointId;
+ MetricsContext metricsContext = context.getMetricsContext();
this.sinkWriteCount =
metricsContext.counter(MetricNames.SINK_WRITE_COUNT);
this.sinkWriteBytes =
metricsContext.counter(MetricNames.SINK_WRITE_BYTES);
this.sinkWriterQPS = metricsContext.meter(MetricNames.SINK_WRITE_QPS);
@@ -118,6 +123,7 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
@Override
public void close() throws Exception {
sinkWriter.close();
+ context.getEventListener().onEvent(new WriterCloseEvent());
try {
if (resourceManager != null) {
resourceManager.close();
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
index e457d69f27..7d8052bfd1 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.translation.flink.source;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.event.EnumeratorCloseEvent;
+import org.apache.seatunnel.api.source.event.EnumeratorOpenEvent;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
@@ -49,6 +51,7 @@ public class FlinkSourceEnumerator<SplitT extends
SourceSplit, EnumStateT>
private final SplitEnumeratorContext<SplitWrapper<SplitT>>
enumeratorContext;
+ private final SourceSplitEnumerator.Context<SplitT> context;
private final int parallelism;
private final Object lock = new Object();
@@ -62,12 +65,14 @@ public class FlinkSourceEnumerator<SplitT extends
SourceSplit, EnumStateT>
SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) {
this.sourceSplitEnumerator = enumerator;
this.enumeratorContext = enumContext;
+ this.context = new
FlinkSourceSplitEnumeratorContext<>(enumeratorContext);
this.parallelism = enumeratorContext.currentParallelism();
}
@Override
public void start() {
sourceSplitEnumerator.open();
+ context.getEventListener().onEvent(new EnumeratorOpenEvent());
}
@Override
@@ -106,6 +111,7 @@ public class FlinkSourceEnumerator<SplitT extends
SourceSplit, EnumStateT>
@Override
public void close() throws IOException {
sourceSplitEnumerator.close();
+ context.getEventListener().onEvent(new EnumeratorCloseEvent());
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
index c2f9cde500..fb1dc85174 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.translation.flink.source;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
+import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.flink.api.connector.source.ReaderOutput;
@@ -66,6 +68,7 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
public void start() {
try {
sourceReader.open();
+ context.getEventListener().onEvent(new ReaderOpenEvent());
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -121,6 +124,7 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
@Override
public void close() throws Exception {
sourceReader.close();
+ context.getEventListener().onEvent(new ReaderCloseEvent());
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java
index 434b1ef979..a9eac50062 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriter.java
@@ -21,6 +21,7 @@ import
org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportResourceShare;
+import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.spark.execution.MultiTableManager;
@@ -47,16 +48,19 @@ public class SparkDataWriter<CommitInfoT, StateT>
implements DataWriter<Internal
protected volatile MultiTableResourceManager resourceManager;
private final MultiTableManager multiTableManager;
+ private final org.apache.seatunnel.api.sink.SinkWriter.Context context;
SparkDataWriter(
SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
@Nullable SinkCommitter<CommitInfoT> sinkCommitter,
MultiTableManager multiTableManager,
- long epochId) {
+ long epochId,
+ org.apache.seatunnel.api.sink.SinkWriter.Context context) {
this.sinkWriter = sinkWriter;
this.sinkCommitter = sinkCommitter;
this.epochId = epochId == 0 ? 1 : epochId;
this.multiTableManager = multiTableManager;
+ this.context = context;
initResourceManger();
}
@@ -97,6 +101,7 @@ public class SparkDataWriter<CommitInfoT, StateT> implements
DataWriter<Internal
new SparkWriterCommitMessage<>(latestCommitInfoT);
cleanCommitInfo();
sinkWriter.close();
+ context.getEventListener().onEvent(new WriterCloseEvent());
try {
if (resourceManager != null) {
resourceManager.close();
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
index 3a646f3aca..b684654103 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
@@ -63,6 +63,6 @@ public class SparkDataWriterFactory<CommitInfoT, StateT>
implements DataWriterFa
throw new RuntimeException("Failed to create SinkCommitter.", e);
}
return new SparkDataWriter<>(
- writer, committer, new MultiTableManager(catalogTables),
epochId);
+ writer, committer, new MultiTableManager(catalogTables),
epochId, context);
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java
index 59f931e38f..c2c24aa914 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.java
@@ -21,6 +21,7 @@ import
org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportResourceShare;
+import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.spark.execution.MultiTableManager;
@@ -47,16 +48,19 @@ public class SeaTunnelSparkDataWriter<CommitInfoT, StateT>
implements DataWriter
protected volatile MultiTableResourceManager resourceManager;
private final MultiTableManager multiTableManager;
+ private final SinkWriter.Context context;
public SeaTunnelSparkDataWriter(
SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
@Nullable SinkCommitter<CommitInfoT> sinkCommitter,
MultiTableManager multiTableManager,
- long epochId) {
+ long epochId,
+ SinkWriter.Context context) {
this.sinkWriter = sinkWriter;
this.sinkCommitter = sinkCommitter;
this.multiTableManager = multiTableManager;
this.epochId = epochId == 0 ? 1 : epochId;
+ this.context = context;
initResourceManger();
}
@@ -89,6 +93,7 @@ public class SeaTunnelSparkDataWriter<CommitInfoT, StateT>
implements DataWriter
new SeaTunnelSparkWriterCommitMessage<>(latestCommitInfoT);
cleanCommitInfo();
sinkWriter.close();
+ context.getEventListener().onEvent(new WriterCloseEvent());
try {
if (resourceManager != null) {
resourceManager.close();
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
index b83787cac1..255a9cd339 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
@@ -64,7 +64,7 @@ public class SeaTunnelSparkDataWriterFactory<CommitInfoT,
StateT>
throw new RuntimeException("Failed to create SinkCommitter.", e);
}
return new SeaTunnelSparkDataWriter<>(
- writer, committer, new MultiTableManager(catalogTables), 0);
+ writer, committer, new MultiTableManager(catalogTables), 0,
context);
}
@Override