This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1df6b8385d [Feature][Zeta]Add jar path precheck when job submit on master (#7976)
1df6b8385d is described below

commit 1df6b8385d494b3fd5306d682e50993503058ff2
Author: Jast <[email protected]>
AuthorDate: Thu Nov 7 22:11:53 2024 +0800

    [Feature][Zeta]Add jar path precheck when job submit on master (#7976)
---
 .../src/test/resources/jdbc_starrocks_dialect.conf |  1 -
 .../resources/jdbc_starrocks_source_to_sink.conf   |  1 -
 .../common/exception/ClassLoaderErrorCode.java     | 41 ++++++++++++++++++
 .../common/exception/ClassLoaderException.java     | 32 ++++++++++++++
 .../classloader/DefaultClassLoaderService.java     | 30 ++++++++++++-
 .../AbstractClassLoaderServiceTest.java            |  2 +-
 .../core/classloader/ClassLoaderServiceTest.java   | 49 ++++++++++++++++++++++
 .../seatunnel/engine/server/SeaTunnelServer.java   |  2 +-
 .../engine/server/TaskExecutionServiceTest.java    | 24 +++++++----
 9 files changed, 169 insertions(+), 13 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_dialect.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_dialect.conf
index 69fe5538f5..2e07ab3251 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_dialect.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_dialect.conf
@@ -18,7 +18,6 @@
 env {
   parallelism = 1
   job.mode = "BATCH"
-  jars = "file:///tmp/jars/mysql-connector-java-8.0.16.jar"
 }
 
 source {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_source_to_sink.conf
index d9d4cf1811..6d5447ffb4 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_source_to_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_source_to_sink.conf
@@ -18,7 +18,6 @@
 env {
   parallelism = 1
   job.mode = "BATCH"
-  jars = "file:///tmp/jars/mysql-connector-java-8.0.16.jar"
 }
 
 source {
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/ClassLoaderErrorCode.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/ClassLoaderErrorCode.java
new file mode 100644
index 0000000000..727523278c
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/ClassLoaderErrorCode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.engine.common.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum ClassLoaderErrorCode implements SeaTunnelErrorCode {
+    NOT_FOUND_JAR("NOT-FOUND-JAR", "Jar package not found");
+
+    private final String code;
+    private final String description;
+
+    ClassLoaderErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/ClassLoaderException.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/ClassLoaderException.java
new file mode 100644
index 0000000000..b0237a5227
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/ClassLoaderException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.exception;
+
+import org.apache.seatunnel.common.exception.ExceptionParamsUtil;
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+import java.util.HashMap;
+
+public class ClassLoaderException extends SeaTunnelEngineException {
+
+    public ClassLoaderException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+        super(seaTunnelErrorCode.getErrorMessage() + " - " + errorMessage);
+        ExceptionParamsUtil.assertParamsMatchWithDescription(
+                seaTunnelErrorCode.getDescription(), new HashMap<>());
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java
index c562829006..6be78c7122 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java
@@ -17,14 +17,21 @@
 
 package org.apache.seatunnel.engine.core.classloader;
 
+import org.apache.seatunnel.engine.common.exception.ClassLoaderErrorCode;
+import org.apache.seatunnel.engine.common.exception.ClassLoaderException;
 import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.File;
 import java.net.URL;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -34,14 +41,17 @@ public class DefaultClassLoaderService implements ClassLoaderService {
     private final boolean cacheMode;
     private final Map<Long, Map<String, ClassLoader>> classLoaderCache;
     private final Map<Long, Map<String, AtomicInteger>> classLoaderReferenceCount;
+    private final NodeEngine nodeEngine;
 
-    public DefaultClassLoaderService(boolean cacheMode) {
+    public DefaultClassLoaderService(boolean cacheMode, NodeEngine nodeEngine) {
         this.cacheMode = cacheMode;
+        this.nodeEngine = nodeEngine;
         classLoaderCache = new ConcurrentHashMap<>();
         classLoaderReferenceCount = new ConcurrentHashMap<>();
         log.info("start classloader service" + (cacheMode ? " with cache mode" : ""));
     }
 
+    @SneakyThrows
     @Override
     public synchronized ClassLoader getClassLoader(long jobId, Collection<URL> jars) {
         log.debug("Get classloader for job {} with jars {}", jobId, jars);
@@ -59,6 +69,24 @@ public class DefaultClassLoaderService implements ClassLoaderService {
             classLoaderReferenceCount.get(jobId).get(key).incrementAndGet();
             return classLoaderMap.get(key);
         } else {
+            if (Objects.nonNull(nodeEngine)) {
+                for (URL jar : jars) {
+                    File file = new File(jar.toURI().getPath());
+                    if (!file.exists()) {
+                        String host =
+                                ((NodeEngineImpl) nodeEngine).getNode().getThisAddress().getHost();
+                        throw new ClassLoaderException(
+                                ClassLoaderErrorCode.NOT_FOUND_JAR,
+                                "The jar file "
+                                        + jar
+                                        + " can not be found in node "
+                                        + host
+                                        + ", please ensure that the deployment paths of SeaTunnel on different nodes are consistent.");
+                    }
+                }
+            } else {
+                log.debug("Run the test class without file checking");
+            }
             ClassLoader classLoader = new SeaTunnelChildFirstClassLoader(jars);
             log.info("Create classloader for job {} with jars {}", jobId, jars);
             classLoaderMap.put(key, classLoader);
diff --git a/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/AbstractClassLoaderServiceTest.java b/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/AbstractClassLoaderServiceTest.java
index 779ab63c5a..b7ee590967 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/AbstractClassLoaderServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/AbstractClassLoaderServiceTest.java
@@ -38,7 +38,7 @@ public abstract class AbstractClassLoaderServiceTest {
 
     @BeforeEach
     void setUp() {
-        classLoaderService = new DefaultClassLoaderService(cacheMode());
+        classLoaderService = new DefaultClassLoaderService(cacheMode(), null);
     }
 
     @Test
diff --git a/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderServiceTest.java b/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderServiceTest.java
index 00a2445e58..716b95a96a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderServiceTest.java
@@ -17,11 +17,19 @@
 
 package org.apache.seatunnel.engine.core.classloader;
 
+import org.apache.seatunnel.engine.common.exception.ClassLoaderException;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import com.google.common.collect.Lists;
+import com.hazelcast.cluster.Address;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.spi.impl.NodeEngineImpl;
 
+import java.io.File;
+import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 
@@ -100,4 +108,45 @@ public class ClassLoaderServiceTest extends AbstractClassLoaderServiceTest {
         Thread.sleep(2000);
         Assertions.assertFalse(thread.isAlive());
     }
+
+    @Test
+    void testPreCheckJar() throws IOException {
+
+        // Mocking Node and NodeEngineImpl for testing
+        Node mockNode = Mockito.mock(Node.class);
+        Mockito.when(mockNode.getThisAddress()).thenReturn(new Address("localhost", 5801));
+        NodeEngineImpl mockNodeEngine = Mockito.mock(NodeEngineImpl.class);
+        Mockito.when(mockNodeEngine.getNode()).thenReturn(mockNode);
+        // Creating DefaultClassLoaderService object for testing
+        DefaultClassLoaderService defaultClassLoaderService =
+                new DefaultClassLoaderService(cacheMode(), mockNodeEngine);
+        // Test case to check ClassLoaderException when file is not found
+        Assertions.assertThrows(
+                ClassLoaderException.class,
+                () -> {
+                    try {
+                        defaultClassLoaderService.getClassLoader(
+                                3L, Lists.newArrayList(new URL("file:/fake.jar")));
+                    } catch (ClassLoaderException e) {
+                        Assertions.assertTrue(
+                                e.getMessage()
+                                        .contains(
+                                                "The jar file file:/fake.jar can not be found in node localhost, please ensure that the deployment paths of SeaTunnel on different nodes are consistent."));
+                        throw e;
+                    }
+                });
+
+        // Creating a temporary jar file for testing
+        File tempJar = File.createTempFile("console", ".jar");
+        String tempJarPath = tempJar.toURI().toURL().toString();
+
+        // Test case to check successful class loader creation with existing jar file
+        Assertions.assertDoesNotThrow(
+                () ->
+                        defaultClassLoaderService.getClassLoader(
+                                3L, Lists.newArrayList(new URL(tempJarPath))));
+
+        // Deleting the temporary jar file after test
+        tempJar.delete();
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 5721a7db30..e639681e5c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -122,7 +122,7 @@ public class SeaTunnelServer
 
         classLoaderService =
                 new DefaultClassLoaderService(
-                        seaTunnelConfig.getEngineConfig().isClassloaderCacheMode());
+                        seaTunnelConfig.getEngineConfig().isClassloaderCacheMode(), nodeEngine);
 
         eventService = new EventService(nodeEngine);
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 6b701dd38b..e06762996a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -43,8 +43,8 @@ import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import com.hazelcast.internal.serialization.Data;
 import lombok.NonNull;
 
+import java.io.File;
 import java.io.IOException;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
@@ -167,7 +167,12 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
     }
 
     @Test
-    public void testClassloaderSplit() throws MalformedURLException {
+    public void testClassloaderSplit() throws IOException {
+        File console = File.createTempFile("console", ".jar");
+        File fake = File.createTempFile("fake", ".jar");
+        String consoleFile = console.toURI().toURL().toString();
+        String fakeFile = fake.toURI().toURL().toString();
+
         TaskExecutionService taskExecutionService = server.getTaskExecutionService();
 
         long sleepTime = 300;
@@ -190,8 +195,8 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
                                 nodeEngine.getSerializationService().toData(testTask1),
                                 nodeEngine.getSerializationService().toData(testTask2)),
                         Arrays.asList(
-                                Collections.singleton(new URL("file://fake.jar")),
-                                Collections.singleton(new URL("file://console.jar"))),
+                                Collections.singleton(new URL(fakeFile)),
+                                Collections.singleton(new URL(consoleFile))),
                         Arrays.asList(emptySet(), emptySet()));
 
         Data data = nodeEngine.getSerializationService().toData(taskGroupImmutableInformation);
@@ -203,24 +208,27 @@ public class TaskExecutionServiceTest extends AbstractSeaTunnelServerTest {
         TaskGroupContext taskGroupContext =
                 taskExecutionService.getActiveExecutionContext(location);
         Assertions.assertIterableEquals(
-                Collections.singleton(new URL("file://fake.jar")),
+                Collections.singleton(new URL(fakeFile)),
                 taskGroupContext.getJars().get(testTask1.getTaskID()));
         Assertions.assertIterableEquals(
-                Collections.singleton(new URL("file://console.jar")),
+                Collections.singleton(new URL(consoleFile)),
                 taskGroupContext.getJars().get(testTask2.getTaskID()));
 
         Assertions.assertIterableEquals(
-                Collections.singletonList(new URL("file://fake.jar")),
+                Collections.singletonList(new URL(fakeFile)),
                 Arrays.asList(
                         ((URLClassLoader) taskGroupContext.getClassLoader(testTask1.getTaskID()))
                                 .getURLs()));
         Assertions.assertIterableEquals(
-                Collections.singletonList(new URL("file://console.jar")),
+                Collections.singletonList(new URL(consoleFile)),
                 Arrays.asList(
                         ((URLClassLoader) taskGroupContext.getClassLoader(testTask2.getTaskID()))
                                 .getURLs()));
 
         taskExecutionService.cancelTaskGroup(location);
+
+        fake.delete();
+        console.delete();
     }
 
     /** Test task execution time is the same as the timer timeout */

Reply via email to