This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 30af654ae [Feature][Core] Supprot add thirdparty jar in env (#3576)
30af654ae is described below

commit 30af654aef3ab6bc56bbbce785258e869db9837a
Author: liugddx <[email protected]>
AuthorDate: Wed Nov 30 17:09:08 2022 +0800

    [Feature][Core] Supprot add thirdparty jar in env (#3576)
    
    * supprot add thirdparty jar in env
---
 docs/en/connector-v2/EnvConf.md                    | 23 ++++++++++++++
 docs/sidebars.js                                   |  3 +-
 .../apache/seatunnel/api/env/EnvCommonOptions.java |  6 ++++
 .../apache/seatunnel/api/env/EnvOptionRule.java    |  1 +
 .../org/apache/seatunnel/common/config/Common.java | 14 +++++++++
 .../starter/flink/execution/FlinkExecution.java    | 12 ++++++--
 .../seatunnel/core/starter/spark/SparkStarter.java | 36 ++++++++++++----------
 .../connectors/seatunnel/jdbc/AbstractJdbcIT.java  |  2 +-
 .../resources/jdbc_gbase8a_source_to_assert.conf   |  1 +
 .../test/resources/jdbc_oracle_source_to_sink.conf |  1 +
 .../resources/jdbc_starrocks_source_to_sink.conf   |  1 +
 .../engine/client/job/JobExecutionEnvironment.java | 14 +++++++++
 12 files changed, 92 insertions(+), 22 deletions(-)

diff --git a/docs/en/connector-v2/EnvConf.md b/docs/en/connector-v2/EnvConf.md
new file mode 100644
index 000000000..d0616d039
--- /dev/null
+++ b/docs/en/connector-v2/EnvConf.md
@@ -0,0 +1,23 @@
+# EnvConf
+
+This document describes env configuration information,env unifies the 
environment variables of all engines.
+
+## job.name
+
+This parameter configures the task name.
+
+## jars
+
+Third-party packages can be loaded via `jars`, like 
`jars="file://local/jar1.jar;file://local/jar2.jar"`
+
+## job.mode
+
+You can configure whether the task is in batch mode or stream mode through 
`job.mode`, like `job.mode = "BATCH"` or `job.mode = "STREAMING"` 
+
+## checkpoint.interval
+
+Gets the interval in which checkpoints are periodically scheduled.
+
+## parallelism
+
+This parameter configures the parallelism of source and sink.
diff --git a/docs/sidebars.js b/docs/sidebars.js
index 470290d42..9951a4650 100644
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -121,7 +121,8 @@ const sidebars = {
                         }
                     ]
                 },
-                "connector-v2/Error-Quick-Reference-Manual"
+                "connector-v2/Error-Quick-Reference-Manual",
+                "connector-v2/EnvConf"
             ]
         },
         {
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index 52a716add..c1a8db383 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -49,6 +49,12 @@ public class EnvCommonOptions {
             .noDefaultValue()
             .withDescription("The interval (in milliseconds) between two 
consecutive checkpoints.");
 
+    public static final Option<String> JARS =
+        Options.key("jars")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("third-party packages can be loaded via `jars`");
+
     public static final Option<Map<String, String>> CUSTOM_PARAMETERS =
         Options.key("custom_parameters")
             .mapType()
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
index cbfd3d455..bc40247b3 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
@@ -26,6 +26,7 @@ public class EnvOptionRule {
             .required(EnvCommonOptions.JOB_MODE)
             .optional(EnvCommonOptions.JOB_NAME,
                 EnvCommonOptions.PARALLELISM,
+                EnvCommonOptions.JARS,
                 EnvCommonOptions.CHECKPOINT_INTERVAL,
                 EnvCommonOptions.CUSTOM_PARAMETERS)
             .build();
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
index 871773732..02a145471 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
@@ -23,12 +23,15 @@ import org.apache.commons.lang3.StringUtils;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -160,6 +163,17 @@ public class Common {
         }
     }
 
+    /**
+     * return the jar package configured in env jars
+     */
+    public static Set<Path> getThirdPartyJars(String paths) {
+
+        return Arrays.stream(paths.split(";"))
+            .filter(s -> !"".equals(s))
+            .filter(it -> it.endsWith(".jar"))
+            .map(path -> 
Paths.get(URI.create(path))).collect(Collectors.toSet());
+    }
+
     public static Path pluginTarball() {
         return appRootDir().resolve("plugins.tar.gz");
     }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index a673199f9..82991a056 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.core.starter.flink.execution;
 
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
@@ -69,7 +70,7 @@ public class FlinkExecution implements TaskExecution {
         } catch (MalformedURLException e) {
             throw new SeaTunnelException("load flink starter error.", e);
         }
-        registerPlugin();
+        registerPlugin(config.getConfig("env"));
         JobContext jobContext = new JobContext();
         jobContext.setJobMode(FlinkEnvironmentFactory.getJobMode(config));
 
@@ -100,8 +101,13 @@ public class FlinkExecution implements TaskExecution {
         }
     }
 
-    private void registerPlugin() {
-        List<URL> jarDependencies = 
Stream.concat(Common.getPluginsJarDependencies().stream(), 
Common.getLibJars().stream())
+    private void registerPlugin(Config envConfig) {
+        List<Path> thirdPartyJars = new ArrayList<>();
+        if (envConfig.hasPath(EnvCommonOptions.JARS.key())) {
+            thirdPartyJars = new 
ArrayList<>(Common.getThirdPartyJars(envConfig.getString(EnvCommonOptions.JARS.key())));
+        }
+        thirdPartyJars.addAll(Common.getPluginsJarDependencies());
+        List<URL> jarDependencies = Stream.concat(thirdPartyJars.stream(), 
Common.getLibJars().stream())
             .map(Path::toUri)
             .map(uri -> {
                 try {
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 681de41d7..2f645fa1f 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.spark;
 
+import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.core.starter.Starter;
@@ -124,6 +125,7 @@ public class SparkStarter implements Starter {
         this.jars.addAll(Common.getPluginsJarDependencies());
         this.jars.addAll(Common.getLibJars());
         this.jars.addAll(getConnectorJarDependencies());
+        this.jars.addAll(new 
ArrayList<>(Common.getThirdPartyJars(sparkConf.getOrDefault(EnvCommonOptions.JARS.key(),
 ""))));
         return buildFinal();
     }
 
@@ -132,19 +134,19 @@ public class SparkStarter implements Starter {
      */
     private void setSparkConf() throws FileNotFoundException {
         commandArgs.getVariables()
-                .stream()
-                .filter(Objects::nonNull)
-                .map(variable -> variable.split("=", 2))
-                .filter(pair -> pair.length == 2)
-                .forEach(pair -> System.setProperty(pair[0], pair[1]));
+            .stream()
+            .filter(Objects::nonNull)
+            .map(variable -> variable.split("=", 2))
+            .filter(pair -> pair.length == 2)
+            .forEach(pair -> System.setProperty(pair[0], pair[1]));
         this.sparkConf = getSparkConf(commandArgs.getConfigFile());
         String driverJavaOpts = 
this.sparkConf.getOrDefault("spark.driver.extraJavaOptions", "");
         String executorJavaOpts = 
this.sparkConf.getOrDefault("spark.executor.extraJavaOptions", "");
         if (!commandArgs.getVariables().isEmpty()) {
             String properties = commandArgs.getVariables()
-                    .stream()
-                    .map(v -> "-D" + v)
-                    .collect(Collectors.joining(" "));
+                .stream()
+                .map(v -> "-D" + v)
+                .collect(Collectors.joining(" "));
             driverJavaOpts += " " + properties;
             executorJavaOpts += " " + properties;
             this.sparkConf.put("spark.driver.extraJavaOptions", 
driverJavaOpts.trim());
@@ -161,13 +163,13 @@ public class SparkStarter implements Starter {
             throw new FileNotFoundException("config file '" + file + "' does 
not exists!");
         }
         Config appConfig = ConfigFactory.parseFile(file)
-                
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
-                .resolveWith(ConfigFactory.systemProperties(), 
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+            .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+            .resolveWith(ConfigFactory.systemProperties(), 
ConfigResolveOptions.defaults().setAllowUnresolved(true));
 
         return appConfig.getConfig("env")
-                .entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().unwrapped().toString()));
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().unwrapped().toString()));
     }
 
     /**
@@ -235,8 +237,8 @@ public class SparkStarter implements Starter {
     protected void appendPaths(List<String> commands, String option, 
List<Path> paths) {
         if (!paths.isEmpty()) {
             String values = paths.stream()
-                    .map(Path::toString)
-                    .collect(Collectors.joining(","));
+                .map(Path::toString)
+                .collect(Collectors.joining(","));
             appendOption(commands, option, values);
         }
     }
@@ -264,8 +266,8 @@ public class SparkStarter implements Starter {
         return Arrays.stream(pluginTypes).flatMap((Function<PluginType, 
Stream<PluginIdentifier>>) pluginType -> {
             List<? extends Config> configList = 
config.getConfigList(pluginType.getType());
             return configList.stream()
-                    .map(pluginConfig -> PluginIdentifier.of("seatunnel", 
pluginType.getType(),
-                            pluginConfig.getString("plugin_name")));
+                .map(pluginConfig -> PluginIdentifier.of("seatunnel", 
pluginType.getType(),
+                    pluginConfig.getString("plugin_name")));
         }).collect(Collectors.toList());
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
index 99ec3b58e..602428e76 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
@@ -73,7 +73,7 @@ public abstract class AbstractJdbcIT extends TestSuiteBase 
implements TestResour
 
     @TestContainerExtension
     private final ContainerExtendedFactory extendedFactory = container -> {
-        Container.ExecResult extraCommands = container.execInContainer("bash", 
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd 
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + jdbcCase.getDriverJar());
+        Container.ExecResult extraCommands = container.execInContainer("bash", 
"-c", "mkdir -p /tmp/jars && cd /tmp/jars && curl -O " + 
jdbcCase.getDriverJar());
         Assertions.assertEquals(0, extraCommands.getExitCode());
     };
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_assert.conf
index aff8f0096..4e65793f2 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_assert.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_assert.conf
@@ -22,6 +22,7 @@ env {
   # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
+  jars = 
"file:///tmp/jars/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar"
   #execution.checkpoint.interval = 10000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_oracle_source_to_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_oracle_source_to_sink.conf
index c67a4d2fe..1ba38aa04 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_oracle_source_to_sink.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_oracle_source_to_sink.conf
@@ -22,6 +22,7 @@ env {
   # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
+  jars = "file:///tmp/jars/ojdbc8-12.2.0.1.jar"
   #execution.checkpoint.interval = 10000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_starrocks_source_to_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_starrocks_source_to_sink.conf
index 54886d059..02ed19601 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_starrocks_source_to_sink.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_starrocks_source_to_sink.conf
@@ -18,6 +18,7 @@
 env {
   execution.parallelism = 1
   job.mode = "BATCH"
+  jars = "file:///tmp/jars/mysql-connector-java-8.0.16.jar"
 }
 
 source {
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index 985b95cc8..94c80cabd 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.client.job;
 
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
@@ -35,14 +36,17 @@ import com.hazelcast.logging.Logger;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 
 import java.io.IOException;
+import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 public class JobExecutionEnvironment {
 
@@ -76,6 +80,16 @@ public class JobExecutionEnvironment {
         this.jobClient = new JobClient(seaTunnelHazelcastClient);
         this.jobConfig.setJobContext(new JobContext(jobClient.getNewJobId()));
         this.commonPluginJars.addAll(searchPluginJars());
+        this.commonPluginJars.addAll(new 
ArrayList<>(Common.getThirdPartyJars(jobConfig.getEnvOptions()
+                .getOrDefault(EnvCommonOptions.JARS.key(), 
"").toString()).stream().map(Path::toUri)
+            .map(uri -> {
+                try {
+                    return uri.toURL();
+                } catch (MalformedURLException e) {
+                    throw new SeaTunnelEngineException("the uri of jar 
illegal:" + uri, e);
+                }
+            })
+            .collect(Collectors.toList())));
         LOGGER.info("add common jar in plugins :" + commonPluginJars);
     }
 

Reply via email to