This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
     new 71500da34 [improve][DAG] SinkAction add table info (#4188)
71500da34 is described below

commit 71500da341e145c6daaa437e7aa6b2d900b485df
Author: Zongwen Li <[email protected]>
AuthorDate: Wed Feb 22 20:57:18 2023 +0800

    [improve][DAG] SinkAction add table info (#4188)
---
 .../engine/core/parse/MultipleTableJobConfigParser.java          | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 7326f911f..e607a95fe 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -40,6 +40,7 @@ import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.actions.SinkConfig;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -238,7 +239,13 @@ public class MultipleTableJobConfigParser {
             }
             SeaTunnelSink<?, ?, ?, ?> sink = FactoryUtil.createAndPrepareSink(catalogTable, readonlyConfig, classLoader, factoryId);
             long id = idGenerator.getNextId();
-            SinkAction<?, ?, ?, ?> sinkAction = new SinkAction<>(id, factoryId, Collections.singletonList(leftAction), sink, factoryUrls);
+            SinkAction<?, ?, ?, ?> sinkAction = new SinkAction<>(id,
+                factoryId,
+                Collections.singletonList(leftAction),
+                sink,
+                factoryUrls,
+                new SinkConfig(catalogTable.getTableId().getTableName()));
+
             sinkAction.setParallelism(leftAction.getParallelism());
             sinkActions.add(sinkAction);
         }

Reply via email to