This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
new a6d27f6fe [improve][zeta] action add factory jar url (#4185)
a6d27f6fe is described below
commit a6d27f6fe13f98c5d111a2dc4da09013364b9c33
Author: Zongwen Li <[email protected]>
AuthorDate: Wed Feb 22 20:44:28 2023 +0800
[improve][zeta] action add factory jar url (#4185)
---
.../api/table/catalog/CatalogTableUtil.java | 9 ++-
.../seatunnel/api/table/factory/FactoryUtil.java | 41 +++++++++++--
.../core/parse/MultipleTableJobConfigParser.java | 71 +++++++++++++++++++---
3 files changed, 101 insertions(+), 20 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 344cda1f0..9a6f21016 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.table.factory.FactoryException;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -52,6 +51,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.regex.Pattern;
public class CatalogTableUtil implements Serializable {
@@ -86,13 +86,12 @@ public class CatalogTableUtil implements Serializable {
return Collections.singletonList(catalogTable);
}
- Catalog catalog = null;
- try {
- catalog =
FactoryUtil.createCatalog(catalogConfig.get(CatalogOptions.NAME),
catalogConfig, classLoader, factoryId);
- } catch (FactoryException e) {
+ Optional<Catalog> optionalCatalog =
FactoryUtil.createOptionalCatalog(catalogConfig.get(CatalogOptions.NAME),
catalogConfig, classLoader, factoryId);
+ if (!optionalCatalog.isPresent()) {
return Collections.emptyList();
}
+ Catalog catalog = optionalCatalog.get();
// Get the list of specified tables
List<String> tableNames =
catalogConfig.get(CatalogOptions.TABLE_NAMES);
List<CatalogTable> catalogTables = new ArrayList<>();
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 17a4c4fd9..2fc9442bf 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -39,10 +39,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
@@ -118,12 +120,34 @@ public final class FactoryUtil {
}
}
- public static Catalog createCatalog(String catalogName,
+ public static Optional<Catalog> createOptionalCatalog(String catalogName,
ReadonlyConfig options,
ClassLoader classLoader,
String factoryIdentifier) {
- CatalogFactory catalogFactory = discoverFactory(classLoader,
CatalogFactory.class, factoryIdentifier);
- return catalogFactory.createCatalog(catalogName, options);
+ Optional<CatalogFactory> optionalFactory =
discoverOptionalFactory(classLoader, CatalogFactory.class, factoryIdentifier);
+ return optionalFactory.map(catalogFactory ->
catalogFactory.createCatalog(catalogName, options));
+ }
+
+ public static <T extends Factory> URL getFactoryUrl(T factory) {
+ URL jarUrl =
factory.getClass().getProtectionDomain().getCodeSource().getLocation();
+ return jarUrl;
+ }
+
+ public static <T extends Factory> Optional<T> discoverOptionalFactory(
+ ClassLoader classLoader, Class<T> factoryClass, String
factoryIdentifier) {
+ final List<T> foundFactories = discoverFactories(classLoader,
factoryClass);
+ if (foundFactories.isEmpty()) {
+ return Optional.empty();
+ }
+ final List<T> matchingFactories =
+ foundFactories.stream()
+ .filter(f ->
f.factoryIdentifier().equals(factoryIdentifier))
+ .collect(Collectors.toList());
+ if (matchingFactories.isEmpty()) {
+ return Optional.empty();
+ }
+ checkMultipleMatchingFactories(factoryIdentifier, factoryClass,
matchingFactories);
+ return Optional.of(matchingFactories.get(0));
}
public static <T extends Factory> T discoverFactory(
@@ -157,6 +181,15 @@ public final class FactoryUtil {
.collect(Collectors.joining("\n"))));
}
+ checkMultipleMatchingFactories(factoryIdentifier, factoryClass,
matchingFactories);
+
+ return matchingFactories.get(0);
+ }
+
+ private static <T extends Factory> void checkMultipleMatchingFactories(
+ String factoryIdentifier,
+ Class<T> factoryClass,
+ List<T> matchingFactories) {
if (matchingFactories.size() > 1) {
throw new FactoryException(
String.format(
@@ -170,8 +203,6 @@ public final class FactoryUtil {
.sorted()
.collect(Collectors.joining("\n"))));
}
-
- return matchingFactories.get(0);
}
@SuppressWarnings("unchecked")
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 02bf2211a..7326f911f 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -23,10 +23,16 @@ import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
@@ -44,6 +50,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
+import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
import java.nio.file.Paths;
@@ -53,6 +60,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -72,8 +80,6 @@ public class MultipleTableJobConfigParser {
private final Map<String, List<Tuple2<CatalogTable, Action>>> graph;
- private final Set<URL> jarUrls;
-
private final JobConfigParser fallbackParser;
public MultipleTableJobConfigParser(String jobDefineFilePath,
@@ -95,7 +101,6 @@ public class MultipleTableJobConfigParser {
this.seaTunnelJobConfig =
ConfigBuilder.of(Paths.get(jobDefineFilePath));
this.envOptions =
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
this.graph = new HashMap<>();
- this.jarUrls = new HashSet<>();
this.fallbackParser = new JobConfigParser(jobDefineFilePath,
idGenerator, jobConfig, commonPluginJars);
}
@@ -103,7 +108,13 @@ public class MultipleTableJobConfigParser {
if (!envOptions.get(EnvCommonOptions.MULTIPLE_TABLE_ENABLE)) {
return fallbackParser.parse();
}
- ClassLoader classLoader = new SeaTunnelChildFirstClassLoader(new
ArrayList<>());
+ List<URL> connectorJars = null;
+ try {
+ connectorJars =
FileUtils.searchJarFiles(Common.connectorJarDir("seatunnel"));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ ClassLoader classLoader = new
SeaTunnelChildFirstClassLoader(connectorJars);
Thread.currentThread().setContextClassLoader(classLoader);
// TODO: Support configuration transform
List<? extends Config> sourceConfigs =
seaTunnelJobConfig.getConfigList("source");
@@ -119,9 +130,25 @@ public class MultipleTableJobConfigParser {
for (Config sinkConfig : sinkConfigs) {
sinkActions.addAll(parserSink(sinkConfig, classLoader));
}
+ Set<URL> factoryUrls = getUsedFactoryUrls(sinkActions);
+ factoryUrls.addAll(commonPluginJars);
sinkActions.forEach(this::addCommonPluginJarsToAction);
- jarUrls.addAll(commonPluginJars);
- return new ImmutablePair<>(sinkActions, null);
+ return new ImmutablePair<>(sinkActions, factoryUrls);
+ }
+
+ public Set<URL> getUsedFactoryUrls(List<Action> sinkActions) {
+ Set<URL> urls = new HashSet<>();
+ fillUsedFactoryUrls(sinkActions, urls);
+ return urls;
+ }
+
+ private void fillUsedFactoryUrls(List<Action> actions, Set<URL> result) {
+ actions.forEach(action -> {
+ result.addAll(action.getJarUrls());
+ if (!action.getUpstream().isEmpty()) {
+ fillUsedFactoryUrls(action.getUpstream(), result);
+ }
+ });
}
void addCommonPluginJarsToAction(Action action) {
@@ -144,13 +171,18 @@ public class MultipleTableJobConfigParser {
public void parserSource(Config sourceConfig, ClassLoader classLoader) {
List<CatalogTable> catalogTables =
CatalogTableUtil.getCatalogTables(sourceConfig, classLoader);
ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(sourceConfig);
- String factoryId =
readonlyConfig.getOptional(CommonOptions.FACTORY_ID).orElse(readonlyConfig.get(CommonOptions.PLUGIN_NAME));
+ String factoryId = getFactoryId(readonlyConfig);
String tableId =
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse("default");
List<Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>>> sources =
FactoryUtil.createAndPrepareSource(catalogTables, readonlyConfig,
classLoader, factoryId);
- // TODO: get factory jar
+
+ // get factory urls
Set<URL> factoryUrls = new HashSet<>();
+ URL factoryUrl =
FactoryUtil.getFactoryUrl(FactoryUtil.discoverFactory(classLoader,
TableSourceFactory.class, factoryId));
+ factoryUrls.add(factoryUrl);
+ getCatalogFactoryUrl(sourceConfig,
classLoader).ifPresent(factoryUrls::add);
+
List<Tuple2<CatalogTable, Action>> actions = new ArrayList<>();
int parallelism = getParallelism(readonlyConfig);
for (Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>> tuple2 : sources) {
@@ -164,6 +196,16 @@ public class MultipleTableJobConfigParser {
graph.put(tableId, actions);
}
+ public static Optional<URL> getCatalogFactoryUrl(Config config,
ClassLoader classLoader) {
+ // catalog url
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
+ Map<String, String> catalogOptions =
readonlyConfig.getOptional(CatalogOptions.CATALOG_OPTIONS).orElse(new
HashMap<>());
+ // TODO: fallback key
+ String factoryId =
catalogOptions.getOrDefault(CommonOptions.FACTORY_ID.key(),
readonlyConfig.get(CommonOptions.PLUGIN_NAME));
+ Optional<CatalogFactory> optionalFactory =
FactoryUtil.discoverOptionalFactory(classLoader, CatalogFactory.class,
factoryId);
+ return optionalFactory.map(FactoryUtil::getFactoryUrl);
+ }
+
private int getParallelism(ReadonlyConfig config) {
return Math.max(1,
config.getOptional(CommonOptions.PARALLELISM)
@@ -175,11 +217,16 @@ public class MultipleTableJobConfigParser {
.stream()
.collect(Collectors.toMap(catalogTable ->
catalogTable.getTableId().toTablePath(), catalogTable -> catalogTable));
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig);
- String factoryId =
readonlyConfig.getOptional(CommonOptions.FACTORY_ID).orElse(readonlyConfig.get(CommonOptions.PLUGIN_NAME));
+ String factoryId = getFactoryId(readonlyConfig);
String leftTableId =
readonlyConfig.getOptional(CommonOptions.SOURCE_TABLE_NAME).orElse("default");
List<Tuple2<CatalogTable, Action>> tableTuples =
graph.get(leftTableId);
- // TODO: get factory jar
+
+ // get factory urls
Set<URL> factoryUrls = new HashSet<>();
+ URL factoryUrl =
FactoryUtil.getFactoryUrl(FactoryUtil.discoverFactory(classLoader,
TableSinkFactory.class, factoryId));
+ factoryUrls.add(factoryUrl);
+ getCatalogFactoryUrl(sinkConfig,
classLoader).ifPresent(factoryUrls::add);
+
List<SinkAction<?, ?, ?, ?>> sinkActions = new ArrayList<>();
for (Tuple2<CatalogTable, Action> tableTuple : tableTuples) {
CatalogTable catalogTable = tableTuple._1();
@@ -197,4 +244,8 @@ public class MultipleTableJobConfigParser {
}
return sinkActions;
}
+
+ private static String getFactoryId(ReadonlyConfig readonlyConfig) {
+ return
readonlyConfig.getOptional(CommonOptions.FACTORY_ID).orElse(readonlyConfig.get(CommonOptions.PLUGIN_NAME));
+ }
}