This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new 71500da34 [improve][DAG] SinkAction add table info (#4188)
71500da34 is described below
commit 71500da341e145c6daaa437e7aa6b2d900b485df
Author: Zongwen Li <[email protected]>
AuthorDate: Wed Feb 22 20:57:18 2023 +0800
[improve][DAG] SinkAction add table info (#4188)
---
.../engine/core/parse/MultipleTableJobConfigParser.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 7326f911f..e607a95fe 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -40,6 +40,7 @@ import
org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.actions.SinkConfig;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -238,7 +239,13 @@ public class MultipleTableJobConfigParser {
}
SeaTunnelSink<?, ?, ?, ?> sink =
FactoryUtil.createAndPrepareSink(catalogTable, readonlyConfig, classLoader,
factoryId);
long id = idGenerator.getNextId();
- SinkAction<?, ?, ?, ?> sinkAction = new SinkAction<>(id,
factoryId, Collections.singletonList(leftAction), sink, factoryUrls);
+ SinkAction<?, ?, ?, ?> sinkAction = new SinkAction<>(id,
+ factoryId,
+ Collections.singletonList(leftAction),
+ sink,
+ factoryUrls,
+ new SinkConfig(catalogTable.getTableId().getTableName()));
+
sinkAction.setParallelism(leftAction.getParallelism());
sinkActions.add(sinkAction);
}