This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new c892dea5c [improve][factory] Support for creating multi-table sources 
(#4169)
c892dea5c is described below

commit c892dea5c849ee01342ffa250ff409897299baf0
Author: Zongwen Li <[email protected]>
AuthorDate: Mon Feb 20 17:24:43 2023 +0800

    [improve][factory] Support for creating multi-table sources (#4169)
---
 .../seatunnel/api/table/factory/FactoryUtil.java   | 42 ++++++++++++++++------
 .../api/table/factory/TableSourceFactory.java      |  3 ++
 2 files changed, 35 insertions(+), 10 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 9874fda13..46596ff63 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ServiceConfigurationError;
@@ -63,15 +64,19 @@ public final class FactoryUtil {
             final TableSourceFactory factory = discoverFactory(classLoader, 
TableSourceFactory.class, factoryIdentifier);
             List<SeaTunnelSource<T, SplitT, StateT>> sources = new 
ArrayList<>(multipleTables.size());
             if (factory instanceof SupportMultipleTable) {
-                TableFactoryContext context = new 
TableFactoryContext(multipleTables, options, classLoader);
-                SupportMultipleTable multipleTableSourceFactory = 
(SupportMultipleTable) factory;
-                // TODO: create all source
-                SupportMultipleTable.Result result = 
multipleTableSourceFactory.applyTables(context);
-                TableSource<T, SplitT, StateT> multipleTableSource = 
factory.createSource(
-                    new TableFactoryContext(result.getAcceptedTables(), 
options, classLoader));
-                // TODO: handle reading metadata
-                SeaTunnelSource<T, SplitT, StateT> source = 
multipleTableSource.createSource();
-                sources.add(source);
+                List<CatalogTable> remainingTables = multipleTables;
+                while (!remainingTables.isEmpty()) {
+                    TableFactoryContext context = new 
TableFactoryContext(remainingTables, options, classLoader);
+                    SupportMultipleTable.Result result = 
((SupportMultipleTable) factory).applyTables(context);
+                    List<CatalogTable> acceptedTables = 
result.getAcceptedTables();
+                    sources.add(createAndPrepareSource(factory, 
acceptedTables, options, classLoader));
+                    remainingTables = result.getRemainingTables();
+                }
+            } else {
+                for (CatalogTable catalogTable : multipleTables) {
+                    List<CatalogTable> acceptedTables = 
Collections.singletonList(catalogTable);
+                    sources.add(createAndPrepareSource(factory, 
acceptedTables, options, classLoader));
+                }
             }
             return sources;
         } catch (Throwable t) {
@@ -82,6 +87,21 @@ public final class FactoryUtil {
         }
     }
 
+    public static <T, SplitT extends SourceSplit, StateT extends Serializable> 
SeaTunnelSource<T, SplitT, StateT> createAndPrepareSource(
+        TableSourceFactory factory,
+        List<CatalogTable> acceptedTables,
+        ReadonlyConfig options,
+        ClassLoader classLoader) {
+        TableFactoryContext context = new TableFactoryContext(acceptedTables, 
options, classLoader);
+        TableSource<T, SplitT, StateT> tableSource = 
factory.createSource(context);
+        validateAndApplyMetadata(acceptedTables, tableSource);
+        return tableSource.createSource();
+    }
+
+    private static void validateAndApplyMetadata(List<CatalogTable> 
catalogTables, TableSource<?, ?, ?> tableSource) {
+        // TODO: handle reading metadata
+    }
+
     public static <IN, StateT, CommitInfoT, AggregatedCommitInfoT> 
SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> 
createAndPrepareSink(
         ClassLoader classLoader, String factoryIdentifier) {
         // todo: do we need to set table?
@@ -177,7 +197,9 @@ public final class FactoryUtil {
         }
 
         Class<? extends SeaTunnelSource> sourceClass = 
factory.getSourceClass();
-        if (SupportParallelism.class.isAssignableFrom(sourceClass)) {
+        if (factory instanceof SupportParallelism
+            // TODO: Implement SupportParallelism in the TableSourceFactory 
instead of the SeaTunnelSource
+            || SupportParallelism.class.isAssignableFrom(sourceClass)) {
             OptionRule sourceCommonOptionRule =
                 
OptionRule.builder().optional(SourceCommonOptions.PARALLELISM).build();
             
sourceOptionRule.getOptionalOptions().addAll(sourceCommonOptionRule.getOptionalOptions());
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
index 001f1feb1..b5e448a6e 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -38,5 +38,8 @@ public interface TableSourceFactory extends Factory {
         throw new UnsupportedOperationException("unsupported now");
     }
 
+    /**
+     * TODO: Implement SupportParallelism in the TableSourceFactory instead of 
the SeaTunnelSource, Then deprecated the method
+     */
     Class<? extends SeaTunnelSource> getSourceClass();
 }

Reply via email to