This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1610856fbd [Fix][Zeta] Fix env jars not working on zeta (#7035)
1610856fbd is described below

commit 1610856fbd50a4f3b172a48e1af9fc4e393a3dad
Author: Jia Fan <[email protected]>
AuthorDate: Tue Jul 9 15:55:52 2024 +0800

    [Fix][Zeta] Fix env jars not working on zeta (#7035)
---
 .../client/job/ClientJobExecutionEnvironment.java  |  5 +-
 .../engine/client/SeaTunnelClientTest.java         | 24 ++++++++
 .../src/test/resources/client_test_with_jars.conf  | 71 ++++++++++++++++++++++
 .../engine/core/job/AbstractJobEnvironment.java    | 24 +-------
 .../core/parse/MultipleTableJobConfigParser.java   | 28 ++++++++-
 .../server/rest/RestJobExecutionEnvironment.java   |  4 +-
 6 files changed, 129 insertions(+), 27 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
index 6e33354351..c895794e4c 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
@@ -30,6 +30,8 @@ import 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -106,8 +108,9 @@ public class ClientJobExecutionEnvironment extends 
AbstractJobEnvironment {
                 isStartWithSavePoint);
     }
 
+    @VisibleForTesting
     @Override
-    protected LogicalDag getLogicalDag() {
+    public LogicalDag getLogicalDag() {
         ImmutablePair<List<Action>, Set<URL>> immutablePair = 
getJobConfigParser().parse(null);
         actions.addAll(immutablePair.getLeft());
         // Enable upload connector jar package to engine server, automatically 
upload connector Jar
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index f16f61a7f0..25eaf89a6c 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
@@ -47,10 +48,13 @@ import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.instance.impl.HazelcastInstanceFactory;
 import lombok.extern.slf4j.Slf4j;
 
+import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
@@ -399,6 +403,26 @@ public class SeaTunnelClientTest {
         }
     }
 
+    @Test
+    public void testJarsInEnvAddedToCommonJars() {
+        Common.setDeployMode(DeployMode.CLIENT);
+        String filePath = TestUtils.getResource("/client_test_with_jars.conf");
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName("client_test_with_jars");
+        try (SeaTunnelClient seaTunnelClient = createSeaTunnelClient()) {
+            LogicalDag logicalDag =
+                    seaTunnelClient
+                            .createExecutionContext(filePath, jobConfig, 
SEATUNNEL_CONFIG)
+                            .getLogicalDag();
+            Assertions.assertIterableEquals(
+                    Arrays.asList("file:/tmp/test.jar", "file:/tmp/test2.jar"),
+                    
logicalDag.getLogicalVertexMap().values().iterator().next().getAction()
+                            .getJarUrls().stream()
+                            .map(URL::toString)
+                            .collect(Collectors.toList()));
+        }
+    }
+
     @Test
     public void testSavePointAndRestoreWithSavePoint() throws Exception {
         Common.setDeployMode(DeployMode.CLIENT);
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test_with_jars.conf
 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test_with_jars.conf
new file mode 100644
index 0000000000..77d6d5db85
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test_with_jars.conf
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  jars = "file:///tmp/test.jar;file:///tmp/test2.jar"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the 
source plugin feature**
+  FakeSource {
+    result_table_name = "fake"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+    parallelism = 1
+  }
+
+  FakeSource {
+    result_table_name = "fake2"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+    parallelism = 1
+  }
+}
+
+transform {
+}
+
+sink {
+  LocalFile {
+    path="/tmp/hive/warehouse/test2"
+    field_delimiter="\t"
+    row_delimiter="\n"
+    partition_by=["age"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format_type="text"
+    sink_columns=["name","age"]
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+    save_mode="error",
+    source_table_name="fake,fake2"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
index 49c9b9275d..28c6b4f012 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.engine.core.job;
 
-import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -36,7 +35,6 @@ import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -68,26 +66,6 @@ public abstract class AbstractJobEnvironment {
         this.isStartWithSavePoint = isStartWithSavePoint;
         this.idGenerator = new IdGenerator();
         this.commonPluginJars.addAll(searchPluginJars());
-        this.commonPluginJars.addAll(
-                new ArrayList<>(
-                        Common.getThirdPartyJars(
-                                        jobConfig
-                                                .getEnvOptions()
-                                                
.getOrDefault(EnvCommonOptions.JARS.key(), "")
-                                                .toString())
-                                .stream()
-                                .map(Path::toUri)
-                                .map(
-                                        uri -> {
-                                            try {
-                                                return uri.toURL();
-                                            } catch (MalformedURLException e) {
-                                                throw new 
SeaTunnelEngineException(
-                                                        "the uri of jar 
illegal:" + uri, e);
-                                            }
-                                        })
-                                .collect(Collectors.toList())));
-        LOGGER.info("add common jar in plugins :" + commonPluginJars);
     }
 
     protected Set<URL> searchPluginJars() {
@@ -149,5 +127,5 @@ public abstract class AbstractJobEnvironment {
         return new LogicalDagGenerator(actions, jobConfig, idGenerator, 
isStartWithSavePoint);
     }
 
-    protected abstract LogicalDag getLogicalDag();
+    public abstract LogicalDag getLogicalDag();
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 3f4ce77c65..40a6640c35 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -38,6 +38,7 @@ import 
org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
@@ -46,6 +47,7 @@ import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import 
org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
@@ -69,7 +71,9 @@ import lombok.extern.slf4j.Slf4j;
 import scala.Tuple2;
 
 import java.io.Serializable;
+import java.net.MalformedURLException;
 import java.net.URL;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -167,6 +171,7 @@ public class MultipleTableJobConfigParser {
     }
 
     public ImmutablePair<List<Action>, Set<URL>> parse(ClassLoaderService 
classLoaderService) {
+        this.fillJobConfigAndCommonJars();
         List<? extends Config> sourceConfigs =
                 TypesafeConfigUtils.getConfigList(
                         seaTunnelJobConfig, "source", Collections.emptyList());
@@ -194,7 +199,6 @@ public class MultipleTableJobConfigParser {
         try {
             Thread.currentThread().setContextClassLoader(classLoader);
             ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs, 
sinkConfigs);
-            this.fillJobConfig();
             LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>> 
tableWithActionMap =
                     new LinkedHashMap<>();
 
@@ -269,7 +273,7 @@ public class MultipleTableJobConfigParser {
                 });
     }
 
-    private void fillJobConfig() {
+    private void fillJobConfigAndCommonJars() {
         
jobConfig.getJobContext().setJobMode(envOptions.get(EnvCommonOptions.JOB_MODE));
         if (StringUtils.isEmpty(jobConfig.getName())
                 || jobConfig.getName().equals(Constants.LOGO)
@@ -277,6 +281,26 @@ public class MultipleTableJobConfigParser {
             jobConfig.setName(envOptions.get(EnvCommonOptions.JOB_NAME));
         }
         jobConfig.getEnvOptions().putAll(envOptions.getSourceMap());
+        this.commonPluginJars.addAll(
+                new ArrayList<>(
+                        Common.getThirdPartyJars(
+                                        jobConfig
+                                                .getEnvOptions()
+                                                
.getOrDefault(EnvCommonOptions.JARS.key(), "")
+                                                .toString())
+                                .stream()
+                                .map(Path::toUri)
+                                .map(
+                                        uri -> {
+                                            try {
+                                                return uri.toURL();
+                                            } catch (MalformedURLException e) {
+                                                throw new 
SeaTunnelEngineException(
+                                                        "the uri of jar 
illegal:" + uri, e);
+                                            }
+                                        })
+                                .collect(Collectors.toList())));
+        log.info("add common jar in plugins :{}", commonPluginJars);
     }
 
     private static <T extends Factory> boolean isFallback(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
index d13f1a49d8..e7a1c5108a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.engine.server.SeaTunnelServer;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.hazelcast.instance.impl.Node;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 
@@ -77,8 +78,9 @@ public class RestJobExecutionEnvironment extends 
AbstractJobEnvironment {
         return jobId;
     }
 
+    @VisibleForTesting
     @Override
-    protected LogicalDag getLogicalDag() {
+    public LogicalDag getLogicalDag() {
         ImmutablePair<List<Action>, Set<URL>> immutablePair =
                 
getJobConfigParser().parse(seaTunnelServer.getClassLoaderService());
         actions.addAll(immutablePair.getLeft());

Reply via email to