This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new c892dea5c [improve][factory] Support for creating multi-table sources
(#4169)
c892dea5c is described below
commit c892dea5c849ee01342ffa250ff409897299baf0
Author: Zongwen Li <[email protected]>
AuthorDate: Mon Feb 20 17:24:43 2023 +0800
[improve][factory] Support for creating multi-table sources (#4169)
---
.../seatunnel/api/table/factory/FactoryUtil.java | 42 ++++++++++++++++------
.../api/table/factory/TableSourceFactory.java | 3 ++
2 files changed, 35 insertions(+), 10 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 9874fda13..46596ff63 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.ServiceConfigurationError;
@@ -63,15 +64,19 @@ public final class FactoryUtil {
final TableSourceFactory factory = discoverFactory(classLoader,
TableSourceFactory.class, factoryIdentifier);
List<SeaTunnelSource<T, SplitT, StateT>> sources = new
ArrayList<>(multipleTables.size());
if (factory instanceof SupportMultipleTable) {
- TableFactoryContext context = new
TableFactoryContext(multipleTables, options, classLoader);
- SupportMultipleTable multipleTableSourceFactory =
(SupportMultipleTable) factory;
- // TODO: create all source
- SupportMultipleTable.Result result =
multipleTableSourceFactory.applyTables(context);
- TableSource<T, SplitT, StateT> multipleTableSource =
factory.createSource(
- new TableFactoryContext(result.getAcceptedTables(),
options, classLoader));
- // TODO: handle reading metadata
- SeaTunnelSource<T, SplitT, StateT> source =
multipleTableSource.createSource();
- sources.add(source);
+ List<CatalogTable> remainingTables = multipleTables;
+ while (!remainingTables.isEmpty()) {
+ TableFactoryContext context = new
TableFactoryContext(remainingTables, options, classLoader);
+ SupportMultipleTable.Result result =
((SupportMultipleTable) factory).applyTables(context);
+ List<CatalogTable> acceptedTables =
result.getAcceptedTables();
+ sources.add(createAndPrepareSource(factory,
acceptedTables, options, classLoader));
+ remainingTables = result.getRemainingTables();
+ }
+ } else {
+ for (CatalogTable catalogTable : multipleTables) {
+ List<CatalogTable> acceptedTables =
Collections.singletonList(catalogTable);
+ sources.add(createAndPrepareSource(factory,
acceptedTables, options, classLoader));
+ }
}
return sources;
} catch (Throwable t) {
@@ -82,6 +87,21 @@ public final class FactoryUtil {
}
}
+ public static <T, SplitT extends SourceSplit, StateT extends Serializable>
SeaTunnelSource<T, SplitT, StateT> createAndPrepareSource(
+ TableSourceFactory factory,
+ List<CatalogTable> acceptedTables,
+ ReadonlyConfig options,
+ ClassLoader classLoader) {
+ TableFactoryContext context = new TableFactoryContext(acceptedTables,
options, classLoader);
+ TableSource<T, SplitT, StateT> tableSource =
factory.createSource(context);
+ validateAndApplyMetadata(acceptedTables, tableSource);
+ return tableSource.createSource();
+ }
+
+ private static void validateAndApplyMetadata(List<CatalogTable>
catalogTables, TableSource<?, ?, ?> tableSource) {
+ // TODO: handle reading metadata
+ }
+
public static <IN, StateT, CommitInfoT, AggregatedCommitInfoT>
SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
createAndPrepareSink(
ClassLoader classLoader, String factoryIdentifier) {
// todo: do we need to set table?
@@ -177,7 +197,9 @@ public final class FactoryUtil {
}
Class<? extends SeaTunnelSource> sourceClass =
factory.getSourceClass();
- if (SupportParallelism.class.isAssignableFrom(sourceClass)) {
+ if (factory instanceof SupportParallelism
+ // TODO: Implement SupportParallelism in the TableSourceFactory
instead of the SeaTunnelSource
+ || SupportParallelism.class.isAssignableFrom(sourceClass)) {
OptionRule sourceCommonOptionRule =
OptionRule.builder().optional(SourceCommonOptions.PARALLELISM).build();
sourceOptionRule.getOptionalOptions().addAll(sourceCommonOptionRule.getOptionalOptions());
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
index 001f1feb1..b5e448a6e 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -38,5 +38,8 @@ public interface TableSourceFactory extends Factory {
throw new UnsupportedOperationException("unsupported now");
}
+ /**
+ * TODO: Implement SupportParallelism in the TableSourceFactory instead of
the SeaTunnelSource, Then deprecated the method
+ */
Class<? extends SeaTunnelSource> getSourceClass();
}