This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
     new 1636afad0 [improve][api] support for create sink (#4171)
1636afad0 is described below

commit 1636afad078d012342ad5ed729356cf27d58ebd2
Author: Zongwen Li <[email protected]>
AuthorDate: Mon Feb 20 18:15:57 2023 +0800

    [improve][api] support for create sink (#4171)
---
 .../apache/seatunnel/api/table/connector/TableSource.java |  3 +++
 .../apache/seatunnel/api/table/factory/FactoryUtil.java   | 15 +++++++++++----
 .../seatunnel/api/table/factory/TableFactoryContext.java  |  7 +++++++
 3 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
index d9b6294c4..acf7b4c23 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
@@ -22,6 +22,9 @@ import org.apache.seatunnel.api.source.SourceSplit;
 
 import java.io.Serializable;
 
+/**
+ * Used to support authentication and processing of {@link SupportReadingMetadata}
+ */
 public interface TableSource<T, SplitT extends SourceSplit, StateT extends Serializable> {
 
     SeaTunnelSource<T, SplitT, StateT> createSource();
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 46596ff63..5da8f3c48 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -103,10 +103,17 @@ public final class FactoryUtil {
     }
 
     public static <IN, StateT, CommitInfoT, AggregatedCommitInfoT> SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createAndPrepareSink(
-        ClassLoader classLoader, String factoryIdentifier) {
-        // todo: do we need to set table?
-        TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT> factory = discoverFactory(classLoader, TableSinkFactory.class, factoryIdentifier);
-        return factory.createSink(null).createSink();
+        CatalogTable catalogTable,
+        ClassLoader classLoader,
+        ReadonlyConfig options,
+        String factoryIdentifier) {
+        try {
+            TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT> factory = discoverFactory(classLoader, TableSinkFactory.class, factoryIdentifier);
+            TableFactoryContext context = new TableFactoryContext(Collections.singletonList(catalogTable), options, classLoader);
+            return factory.createSink(context).createSink();
+        } catch (Throwable t) {
+            throw new FactoryException(String.format("Unable to create a sink for identifier '%s'.", factoryIdentifier), t);
+        }
     }
 
     public static Catalog createCatalog(String catalogName,
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
index 97db8a0e6..77d529e84 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
@@ -54,4 +54,11 @@ public class TableFactoryContext {
     public List<CatalogTable> getCatalogTables() {
         return catalogTables;
     }
+
+    /**
+     * @return single table.
+     */
+    public CatalogTable getCatalogTable() {
+        return catalogTables.get(0);
+    }
 }

Reply via email to