This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
     new a6d27f6fe [improve][zeta] action add factory jar url (#4185)
a6d27f6fe is described below

commit a6d27f6fe13f98c5d111a2dc4da09013364b9c33
Author: Zongwen Li <[email protected]>
AuthorDate: Wed Feb 22 20:44:28 2023 +0800

    [improve][zeta] action add factory jar url (#4185)
---
 .../api/table/catalog/CatalogTableUtil.java        |  9 ++-
 .../seatunnel/api/table/factory/FactoryUtil.java   | 41 +++++++++++--
 .../core/parse/MultipleTableJobConfigParser.java   | 71 +++++++++++++++++++---
 3 files changed, 101 insertions(+), 20 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 344cda1f0..9a6f21016 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.table.factory.FactoryException;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
@@ -52,6 +51,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.regex.Pattern;
 
 public class CatalogTableUtil implements Serializable {
@@ -86,13 +86,12 @@ public class CatalogTableUtil implements Serializable {
             return Collections.singletonList(catalogTable);
         }
 
-        Catalog catalog = null;
-        try {
-            catalog = 
FactoryUtil.createCatalog(catalogConfig.get(CatalogOptions.NAME), 
catalogConfig, classLoader, factoryId);
-        } catch (FactoryException e) {
+        Optional<Catalog> optionalCatalog = 
FactoryUtil.createOptionalCatalog(catalogConfig.get(CatalogOptions.NAME), 
catalogConfig, classLoader, factoryId);
+        if (!optionalCatalog.isPresent()) {
             return Collections.emptyList();
         }
 
+        Catalog catalog = optionalCatalog.get();
         // Get the list of specified tables
         List<String> tableNames = 
catalogConfig.get(CatalogOptions.TABLE_NAMES);
         List<CatalogTable> catalogTables = new ArrayList<>();
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 17a4c4fd9..2fc9442bf 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -39,10 +39,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.ServiceConfigurationError;
 import java.util.ServiceLoader;
 import java.util.stream.Collectors;
@@ -118,12 +120,34 @@ public final class FactoryUtil {
         }
     }
 
-    public static Catalog createCatalog(String catalogName,
+    public static Optional<Catalog> createOptionalCatalog(String catalogName,
                                         ReadonlyConfig options,
                                         ClassLoader classLoader,
                                         String factoryIdentifier) {
-        CatalogFactory catalogFactory = discoverFactory(classLoader, 
CatalogFactory.class, factoryIdentifier);
-        return catalogFactory.createCatalog(catalogName, options);
+        Optional<CatalogFactory> optionalFactory = 
discoverOptionalFactory(classLoader, CatalogFactory.class, factoryIdentifier);
+        return optionalFactory.map(catalogFactory -> 
catalogFactory.createCatalog(catalogName, options));
+    }
+
+    public static <T extends Factory> URL getFactoryUrl(T factory) {
+        URL jarUrl = 
factory.getClass().getProtectionDomain().getCodeSource().getLocation();
+        return jarUrl;
+    }
+
+    public static <T extends Factory> Optional<T> discoverOptionalFactory(
+        ClassLoader classLoader, Class<T> factoryClass, String 
factoryIdentifier) {
+        final List<T> foundFactories = discoverFactories(classLoader, 
factoryClass);
+        if (foundFactories.isEmpty()) {
+            return Optional.empty();
+        }
+        final List<T> matchingFactories =
+                foundFactories.stream()
+                        .filter(f -> 
f.factoryIdentifier().equals(factoryIdentifier))
+                        .collect(Collectors.toList());
+        if (matchingFactories.isEmpty()) {
+            return Optional.empty();
+        }
+        checkMultipleMatchingFactories(factoryIdentifier, factoryClass, 
matchingFactories);
+        return Optional.of(matchingFactories.get(0));
     }
 
     public static <T extends Factory> T discoverFactory(
@@ -157,6 +181,15 @@ public final class FactoryUtil {
                                     .collect(Collectors.joining("\n"))));
         }
 
+        checkMultipleMatchingFactories(factoryIdentifier, factoryClass, 
matchingFactories);
+
+        return matchingFactories.get(0);
+    }
+
+    private static <T extends Factory> void checkMultipleMatchingFactories(
+            String factoryIdentifier,
+            Class<T> factoryClass,
+            List<T> matchingFactories) {
         if (matchingFactories.size() > 1) {
             throw new FactoryException(
                     String.format(
@@ -170,8 +203,6 @@ public final class FactoryUtil {
                                     .sorted()
                                     .collect(Collectors.joining("\n"))));
         }
-
-        return matchingFactories.get(0);
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 02bf2211a..7326f911f 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -23,10 +23,16 @@ import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
@@ -44,6 +50,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.net.URL;
 import java.nio.file.Paths;
@@ -53,6 +60,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -72,8 +80,6 @@ public class MultipleTableJobConfigParser {
 
     private final Map<String, List<Tuple2<CatalogTable, Action>>> graph;
 
-    private final Set<URL> jarUrls;
-
     private final JobConfigParser fallbackParser;
 
     public MultipleTableJobConfigParser(String jobDefineFilePath,
@@ -95,7 +101,6 @@ public class MultipleTableJobConfigParser {
         this.seaTunnelJobConfig = 
ConfigBuilder.of(Paths.get(jobDefineFilePath));
         this.envOptions = 
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
         this.graph = new HashMap<>();
-        this.jarUrls = new HashSet<>();
         this.fallbackParser = new JobConfigParser(jobDefineFilePath, 
idGenerator, jobConfig, commonPluginJars);
     }
 
@@ -103,7 +108,13 @@ public class MultipleTableJobConfigParser {
         if (!envOptions.get(EnvCommonOptions.MULTIPLE_TABLE_ENABLE)) {
             return fallbackParser.parse();
         }
-        ClassLoader classLoader = new SeaTunnelChildFirstClassLoader(new 
ArrayList<>());
+        List<URL> connectorJars = null;
+        try {
+            connectorJars = 
FileUtils.searchJarFiles(Common.connectorJarDir("seatunnel"));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        ClassLoader classLoader = new 
SeaTunnelChildFirstClassLoader(connectorJars);
         Thread.currentThread().setContextClassLoader(classLoader);
         // TODO: Support configuration transform
         List<? extends Config> sourceConfigs = 
seaTunnelJobConfig.getConfigList("source");
@@ -119,9 +130,25 @@ public class MultipleTableJobConfigParser {
         for (Config sinkConfig : sinkConfigs) {
             sinkActions.addAll(parserSink(sinkConfig, classLoader));
         }
+        Set<URL> factoryUrls = getUsedFactoryUrls(sinkActions);
+        factoryUrls.addAll(commonPluginJars);
         sinkActions.forEach(this::addCommonPluginJarsToAction);
-        jarUrls.addAll(commonPluginJars);
-        return new ImmutablePair<>(sinkActions, null);
+        return new ImmutablePair<>(sinkActions, factoryUrls);
+    }
+
+    public Set<URL> getUsedFactoryUrls(List<Action> sinkActions) {
+        Set<URL> urls = new HashSet<>();
+        fillUsedFactoryUrls(sinkActions, urls);
+        return urls;
+    }
+
+    private void fillUsedFactoryUrls(List<Action> actions, Set<URL> result) {
+        actions.forEach(action -> {
+            result.addAll(action.getJarUrls());
+            if (!action.getUpstream().isEmpty()) {
+                fillUsedFactoryUrls(action.getUpstream(), result);
+            }
+        });
     }
 
     void addCommonPluginJarsToAction(Action action) {
@@ -144,13 +171,18 @@ public class MultipleTableJobConfigParser {
     public void parserSource(Config sourceConfig, ClassLoader classLoader) {
         List<CatalogTable> catalogTables = 
CatalogTableUtil.getCatalogTables(sourceConfig, classLoader);
         ReadonlyConfig readonlyConfig = 
ReadonlyConfig.fromConfig(sourceConfig);
-        String factoryId = 
readonlyConfig.getOptional(CommonOptions.FACTORY_ID).orElse(readonlyConfig.get(CommonOptions.PLUGIN_NAME));
+        String factoryId = getFactoryId(readonlyConfig);
         String tableId = 
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse("default");
 
         List<Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, 
List<CatalogTable>>> sources =
             FactoryUtil.createAndPrepareSource(catalogTables, readonlyConfig, 
classLoader, factoryId);
-        // TODO: get factory jar
+
+        // get factory urls
         Set<URL> factoryUrls = new HashSet<>();
+        URL factoryUrl = 
FactoryUtil.getFactoryUrl(FactoryUtil.discoverFactory(classLoader, 
TableSourceFactory.class, factoryId));
+        factoryUrls.add(factoryUrl);
+        getCatalogFactoryUrl(sourceConfig, 
classLoader).ifPresent(factoryUrls::add);
+
         List<Tuple2<CatalogTable, Action>> actions = new ArrayList<>();
         int parallelism = getParallelism(readonlyConfig);
         for (Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, 
List<CatalogTable>> tuple2 : sources) {
@@ -164,6 +196,16 @@ public class MultipleTableJobConfigParser {
         graph.put(tableId, actions);
     }
 
+    public static Optional<URL> getCatalogFactoryUrl(Config config, 
ClassLoader classLoader) {
+        // catalog url
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
+        Map<String, String> catalogOptions = 
readonlyConfig.getOptional(CatalogOptions.CATALOG_OPTIONS).orElse(new 
HashMap<>());
+        // TODO: fallback key
+        String factoryId = 
catalogOptions.getOrDefault(CommonOptions.FACTORY_ID.key(), 
readonlyConfig.get(CommonOptions.PLUGIN_NAME));
+        Optional<CatalogFactory> optionalFactory = 
FactoryUtil.discoverOptionalFactory(classLoader, CatalogFactory.class, 
factoryId);
+        return optionalFactory.map(FactoryUtil::getFactoryUrl);
+    }
+
     private int getParallelism(ReadonlyConfig config) {
         return Math.max(1,
             config.getOptional(CommonOptions.PARALLELISM)
@@ -175,11 +217,16 @@ public class MultipleTableJobConfigParser {
             .stream()
             .collect(Collectors.toMap(catalogTable -> 
catalogTable.getTableId().toTablePath(), catalogTable -> catalogTable));
         ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig);
-        String factoryId = 
readonlyConfig.getOptional(CommonOptions.FACTORY_ID).orElse(readonlyConfig.get(CommonOptions.PLUGIN_NAME));
+        String factoryId = getFactoryId(readonlyConfig);
         String leftTableId = 
readonlyConfig.getOptional(CommonOptions.SOURCE_TABLE_NAME).orElse("default");
         List<Tuple2<CatalogTable, Action>> tableTuples = 
graph.get(leftTableId);
-        // TODO: get factory jar
+
+        // get factory urls
         Set<URL> factoryUrls = new HashSet<>();
+        URL factoryUrl = 
FactoryUtil.getFactoryUrl(FactoryUtil.discoverFactory(classLoader, 
TableSinkFactory.class, factoryId));
+        factoryUrls.add(factoryUrl);
+        getCatalogFactoryUrl(sinkConfig, 
classLoader).ifPresent(factoryUrls::add);
+
         List<SinkAction<?, ?, ?, ?>> sinkActions = new ArrayList<>();
         for (Tuple2<CatalogTable, Action> tableTuple : tableTuples) {
             CatalogTable catalogTable = tableTuple._1();
@@ -197,4 +244,8 @@ public class MultipleTableJobConfigParser {
         }
         return sinkActions;
     }
+
+    private static String getFactoryId(ReadonlyConfig readonlyConfig) {
+        return 
readonlyConfig.getOptional(CommonOptions.FACTORY_ID).orElse(readonlyConfig.get(CommonOptions.PLUGIN_NAME));
+    }
 }

Reply via email to