This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1610856fbd [Fix][Zeta] Fix env jars not working on zeta (#7035)
1610856fbd is described below
commit 1610856fbd50a4f3b172a48e1af9fc4e393a3dad
Author: Jia Fan <[email protected]>
AuthorDate: Tue Jul 9 15:55:52 2024 +0800
[Fix][Zeta] Fix env jars not working on zeta (#7035)
---
.../client/job/ClientJobExecutionEnvironment.java | 5 +-
.../engine/client/SeaTunnelClientTest.java | 24 ++++++++
.../src/test/resources/client_test_with_jars.conf | 71 ++++++++++++++++++++++
.../engine/core/job/AbstractJobEnvironment.java | 24 +-------
.../core/parse/MultipleTableJobConfigParser.java | 28 ++++++++-
.../server/rest/RestJobExecutionEnvironment.java | 4 +-
6 files changed, 129 insertions(+), 27 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
index 6e33354351..c895794e4c 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
@@ -30,6 +30,8 @@ import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
import org.apache.commons.lang3.tuple.ImmutablePair;
+import com.google.common.annotations.VisibleForTesting;
+
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
@@ -106,8 +108,9 @@ public class ClientJobExecutionEnvironment extends AbstractJobEnvironment {
isStartWithSavePoint);
}
+ @VisibleForTesting
@Override
- protected LogicalDag getLogicalDag() {
+ public LogicalDag getLogicalDag() {
ImmutablePair<List<Action>, Set<URL>> immutablePair =
getJobConfigParser().parse(null);
actions.addAll(immutablePair.getLeft());
// Enable upload connector jar package to engine server, automatically upload connector Jar
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index f16f61a7f0..25eaf89a6c 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
@@ -47,10 +48,13 @@ import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
import lombok.extern.slf4j.Slf4j;
+import java.net.URL;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
@@ -399,6 +403,26 @@ public class SeaTunnelClientTest {
}
}
+ @Test
+ public void testJarsInEnvAddedToCommonJars() {
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath = TestUtils.getResource("/client_test_with_jars.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName("client_test_with_jars");
+ try (SeaTunnelClient seaTunnelClient = createSeaTunnelClient()) {
+ LogicalDag logicalDag =
+ seaTunnelClient
+ .createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG)
+ .getLogicalDag();
+ Assertions.assertIterableEquals(
+ Arrays.asList("file:/tmp/test.jar", "file:/tmp/test2.jar"),
+ logicalDag.getLogicalVertexMap().values().iterator().next().getAction()
+ .getJarUrls().stream()
+ .map(URL::toString)
+ .collect(Collectors.toList()));
+ }
+ }
+
@Test
public void testSavePointAndRestoreWithSavePoint() throws Exception {
Common.setDeployMode(DeployMode.CLIENT);
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test_with_jars.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test_with_jars.conf
new file mode 100644
index 0000000000..77d6d5db85
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test_with_jars.conf
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ jars = "file:///tmp/test.jar;file:///tmp/test2.jar"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 1
+ }
+
+ FakeSource {
+ result_table_name = "fake2"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 1
+ }
+}
+
+transform {
+}
+
+sink {
+ LocalFile {
+ path="/tmp/hive/warehouse/test2"
+ field_delimiter="\t"
+ row_delimiter="\n"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format_type="text"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+ save_mode="error",
+ source_table_name="fake,fake2"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
index 49c9b9275d..28c6b4f012 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.engine.core.job;
-import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -36,7 +35,6 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -68,26 +66,6 @@ public abstract class AbstractJobEnvironment {
this.isStartWithSavePoint = isStartWithSavePoint;
this.idGenerator = new IdGenerator();
this.commonPluginJars.addAll(searchPluginJars());
- this.commonPluginJars.addAll(
- new ArrayList<>(
- Common.getThirdPartyJars(
- jobConfig
- .getEnvOptions()
- .getOrDefault(EnvCommonOptions.JARS.key(), "")
- .toString())
- .stream()
- .map(Path::toUri)
- .map(
- uri -> {
- try {
- return uri.toURL();
- } catch (MalformedURLException e) {
- throw new SeaTunnelEngineException(
- "the uri of jar illegal:" + uri, e);
- }
- })
- .collect(Collectors.toList())));
- LOGGER.info("add common jar in plugins :" + commonPluginJars);
}
protected Set<URL> searchPluginJars() {
@@ -149,5 +127,5 @@ public abstract class AbstractJobEnvironment {
return new LogicalDagGenerator(actions, jobConfig, idGenerator,
isStartWithSavePoint);
}
- protected abstract LogicalDag getLogicalDag();
+ public abstract LogicalDag getLogicalDag();
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 3f4ce77c65..40a6640c35 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -38,6 +38,7 @@ import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
@@ -46,6 +47,7 @@ import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
@@ -69,7 +71,9 @@ import lombok.extern.slf4j.Slf4j;
import scala.Tuple2;
import java.io.Serializable;
+import java.net.MalformedURLException;
import java.net.URL;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
@@ -167,6 +171,7 @@ public class MultipleTableJobConfigParser {
}
public ImmutablePair<List<Action>, Set<URL>> parse(ClassLoaderService classLoaderService) {
+ this.fillJobConfigAndCommonJars();
List<? extends Config> sourceConfigs =
TypesafeConfigUtils.getConfigList(
seaTunnelJobConfig, "source", Collections.emptyList());
@@ -194,7 +199,6 @@ public class MultipleTableJobConfigParser {
try {
Thread.currentThread().setContextClassLoader(classLoader);
ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs,
sinkConfigs);
- this.fillJobConfig();
LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>>
tableWithActionMap =
new LinkedHashMap<>();
@@ -269,7 +273,7 @@ public class MultipleTableJobConfigParser {
});
}
- private void fillJobConfig() {
+ private void fillJobConfigAndCommonJars() {
jobConfig.getJobContext().setJobMode(envOptions.get(EnvCommonOptions.JOB_MODE));
if (StringUtils.isEmpty(jobConfig.getName())
|| jobConfig.getName().equals(Constants.LOGO)
@@ -277,6 +281,26 @@ public class MultipleTableJobConfigParser {
jobConfig.setName(envOptions.get(EnvCommonOptions.JOB_NAME));
}
jobConfig.getEnvOptions().putAll(envOptions.getSourceMap());
+ this.commonPluginJars.addAll(
+ new ArrayList<>(
+ Common.getThirdPartyJars(
+ jobConfig
+ .getEnvOptions()
+ .getOrDefault(EnvCommonOptions.JARS.key(), "")
+ .toString())
+ .stream()
+ .map(Path::toUri)
+ .map(
+ uri -> {
+ try {
+ return uri.toURL();
+ } catch (MalformedURLException e) {
+ throw new SeaTunnelEngineException(
+ "the uri of jar illegal:" + uri, e);
+ }
+ })
+ .collect(Collectors.toList())));
+ log.info("add common jar in plugins :{}", commonPluginJars);
}
private static <T extends Factory> boolean isFallback(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
index d13f1a49d8..e7a1c5108a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.commons.lang3.tuple.ImmutablePair;
+import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.spi.impl.NodeEngineImpl;
@@ -77,8 +78,9 @@ public class RestJobExecutionEnvironment extends AbstractJobEnvironment {
return jobId;
}
+ @VisibleForTesting
@Override
- protected LogicalDag getLogicalDag() {
+ public LogicalDag getLogicalDag() {
ImmutablePair<List<Action>, Set<URL>> immutablePair =
getJobConfigParser().parse(seaTunnelServer.getClassLoaderService());
actions.addAll(immutablePair.getLeft());