This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1df6b8385d [Feature][Zeta]Add jar path precheck when job submit on
master (#7976)
1df6b8385d is described below
commit 1df6b8385d494b3fd5306d682e50993503058ff2
Author: Jast <[email protected]>
AuthorDate: Thu Nov 7 22:11:53 2024 +0800
[Feature][Zeta]Add jar path precheck when job submit on master (#7976)
---
.../src/test/resources/jdbc_starrocks_dialect.conf | 1 -
.../resources/jdbc_starrocks_source_to_sink.conf | 1 -
.../common/exception/ClassLoaderErrorCode.java | 41 ++++++++++++++++++
.../common/exception/ClassLoaderException.java | 32 ++++++++++++++
.../classloader/DefaultClassLoaderService.java | 30 ++++++++++++-
.../AbstractClassLoaderServiceTest.java | 2 +-
.../core/classloader/ClassLoaderServiceTest.java | 49 ++++++++++++++++++++++
.../seatunnel/engine/server/SeaTunnelServer.java | 2 +-
.../engine/server/TaskExecutionServiceTest.java | 24 +++++++----
9 files changed, 169 insertions(+), 13 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_dialect.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_dialect.conf
index 69fe5538f5..2e07ab3251 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_dialect.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_dialect.conf
@@ -18,7 +18,6 @@
env {
parallelism = 1
job.mode = "BATCH"
- jars = "file:///tmp/jars/mysql-connector-java-8.0.16.jar"
}
source {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_source_to_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_source_to_sink.conf
index d9d4cf1811..6d5447ffb4 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_source_to_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_source_to_sink.conf
@@ -18,7 +18,6 @@
env {
parallelism = 1
job.mode = "BATCH"
- jars = "file:///tmp/jars/mysql-connector-java-8.0.16.jar"
}
source {
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/ClassLoaderErrorCode.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/ClassLoaderErrorCode.java
new file mode 100644
index 0000000000..727523278c
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/ClassLoaderErrorCode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.engine.common.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum ClassLoaderErrorCode implements SeaTunnelErrorCode {
+ NOT_FOUND_JAR("NOT-FOUND-JAR", "Jar package not found");
+
+ private final String code;
+ private final String description;
+
+ ClassLoaderErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return code;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/ClassLoaderException.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/ClassLoaderException.java
new file mode 100644
index 0000000000..b0237a5227
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/ClassLoaderException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.exception;
+
+import org.apache.seatunnel.common.exception.ExceptionParamsUtil;
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+import java.util.HashMap;
+
+public class ClassLoaderException extends SeaTunnelEngineException {
+
+ public ClassLoaderException(SeaTunnelErrorCode seaTunnelErrorCode, String
errorMessage) {
+ super(seaTunnelErrorCode.getErrorMessage() + " - " + errorMessage);
+ ExceptionParamsUtil.assertParamsMatchWithDescription(
+ seaTunnelErrorCode.getDescription(), new HashMap<>());
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java
index c562829006..6be78c7122 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java
@@ -17,14 +17,21 @@
package org.apache.seatunnel.engine.core.classloader;
+import org.apache.seatunnel.engine.common.exception.ClassLoaderErrorCode;
+import org.apache.seatunnel.engine.common.exception.ClassLoaderException;
import
org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
import com.google.common.annotations.VisibleForTesting;
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import java.io.File;
import java.net.URL;
import java.util.Collection;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -34,14 +41,17 @@ public class DefaultClassLoaderService implements
ClassLoaderService {
private final boolean cacheMode;
private final Map<Long, Map<String, ClassLoader>> classLoaderCache;
private final Map<Long, Map<String, AtomicInteger>>
classLoaderReferenceCount;
+ private final NodeEngine nodeEngine;
- public DefaultClassLoaderService(boolean cacheMode) {
+ public DefaultClassLoaderService(boolean cacheMode, NodeEngine nodeEngine)
{
this.cacheMode = cacheMode;
+ this.nodeEngine = nodeEngine;
classLoaderCache = new ConcurrentHashMap<>();
classLoaderReferenceCount = new ConcurrentHashMap<>();
log.info("start classloader service" + (cacheMode ? " with cache mode"
: ""));
}
+ @SneakyThrows
@Override
public synchronized ClassLoader getClassLoader(long jobId, Collection<URL>
jars) {
log.debug("Get classloader for job {} with jars {}", jobId, jars);
@@ -59,6 +69,24 @@ public class DefaultClassLoaderService implements
ClassLoaderService {
classLoaderReferenceCount.get(jobId).get(key).incrementAndGet();
return classLoaderMap.get(key);
} else {
+ if (Objects.nonNull(nodeEngine)) {
+ for (URL jar : jars) {
+ File file = new File(jar.toURI().getPath());
+ if (!file.exists()) {
+ String host =
+ ((NodeEngineImpl)
nodeEngine).getNode().getThisAddress().getHost();
+ throw new ClassLoaderException(
+ ClassLoaderErrorCode.NOT_FOUND_JAR,
+ "The jar file "
+ + jar
+ + " can not be found in node "
+ + host
+ + ", please ensure that the deployment
paths of SeaTunnel on different nodes are consistent.");
+ }
+ }
+ } else {
+ log.debug("Run the test class without file checking");
+ }
ClassLoader classLoader = new SeaTunnelChildFirstClassLoader(jars);
log.info("Create classloader for job {} with jars {}", jobId,
jars);
classLoaderMap.put(key, classLoader);
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/AbstractClassLoaderServiceTest.java
b/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/AbstractClassLoaderServiceTest.java
index 779ab63c5a..b7ee590967 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/AbstractClassLoaderServiceTest.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/AbstractClassLoaderServiceTest.java
@@ -38,7 +38,7 @@ public abstract class AbstractClassLoaderServiceTest {
@BeforeEach
void setUp() {
- classLoaderService = new DefaultClassLoaderService(cacheMode());
+ classLoaderService = new DefaultClassLoaderService(cacheMode(), null);
}
@Test
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderServiceTest.java
b/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderServiceTest.java
index 00a2445e58..716b95a96a 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderServiceTest.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderServiceTest.java
@@ -17,11 +17,19 @@
package org.apache.seatunnel.engine.core.classloader;
+import org.apache.seatunnel.engine.common.exception.ClassLoaderException;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import com.google.common.collect.Lists;
+import com.hazelcast.cluster.Address;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import java.io.File;
+import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
@@ -100,4 +108,45 @@ public class ClassLoaderServiceTest extends
AbstractClassLoaderServiceTest {
Thread.sleep(2000);
Assertions.assertFalse(thread.isAlive());
}
+
+ @Test
+ void testPreCheckJar() throws IOException {
+
+ // Mocking Node and NodeEngineImpl for testing
+ Node mockNode = Mockito.mock(Node.class);
+ Mockito.when(mockNode.getThisAddress()).thenReturn(new
Address("localhost", 5801));
+ NodeEngineImpl mockNodeEngine = Mockito.mock(NodeEngineImpl.class);
+ Mockito.when(mockNodeEngine.getNode()).thenReturn(mockNode);
+ // Creating DefaultClassLoaderService object for testing
+ DefaultClassLoaderService defaultClassLoaderService =
+ new DefaultClassLoaderService(cacheMode(), mockNodeEngine);
+ // Test case to check ClassLoaderException when file is not found
+ Assertions.assertThrows(
+ ClassLoaderException.class,
+ () -> {
+ try {
+ defaultClassLoaderService.getClassLoader(
+ 3L, Lists.newArrayList(new
URL("file:/fake.jar")));
+ } catch (ClassLoaderException e) {
+ Assertions.assertTrue(
+ e.getMessage()
+ .contains(
+ "The jar file file:/fake.jar
can not be found in node localhost, please ensure that the deployment paths of
SeaTunnel on different nodes are consistent."));
+ throw e;
+ }
+ });
+
+ // Creating a temporary jar file for testing
+ File tempJar = File.createTempFile("console", ".jar");
+ String tempJarPath = tempJar.toURI().toURL().toString();
+
+ // Test case to check successful class loader creation with existing
jar file
+ Assertions.assertDoesNotThrow(
+ () ->
+ defaultClassLoaderService.getClassLoader(
+ 3L, Lists.newArrayList(new URL(tempJarPath))));
+
+ // Deleting the temporary jar file after test
+ tempJar.delete();
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 5721a7db30..e639681e5c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -122,7 +122,7 @@ public class SeaTunnelServer
classLoaderService =
new DefaultClassLoaderService(
-
seaTunnelConfig.getEngineConfig().isClassloaderCacheMode());
+
seaTunnelConfig.getEngineConfig().isClassloaderCacheMode(), nodeEngine);
eventService = new EventService(nodeEngine);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 6b701dd38b..e06762996a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -43,8 +43,8 @@ import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.internal.serialization.Data;
import lombok.NonNull;
+import java.io.File;
import java.io.IOException;
-import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
@@ -167,7 +167,12 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
}
@Test
- public void testClassloaderSplit() throws MalformedURLException {
+ public void testClassloaderSplit() throws IOException {
+ File console = File.createTempFile("console", ".jar");
+ File fake = File.createTempFile("fake", ".jar");
+ String consoleFile = console.toURI().toURL().toString();
+ String fakeFile = fake.toURI().toURL().toString();
+
TaskExecutionService taskExecutionService =
server.getTaskExecutionService();
long sleepTime = 300;
@@ -190,8 +195,8 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
nodeEngine.getSerializationService().toData(testTask1),
nodeEngine.getSerializationService().toData(testTask2)),
Arrays.asList(
- Collections.singleton(new
URL("file://fake.jar")),
- Collections.singleton(new
URL("file://console.jar"))),
+ Collections.singleton(new URL(fakeFile)),
+ Collections.singleton(new URL(consoleFile))),
Arrays.asList(emptySet(), emptySet()));
Data data =
nodeEngine.getSerializationService().toData(taskGroupImmutableInformation);
@@ -203,24 +208,27 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
TaskGroupContext taskGroupContext =
taskExecutionService.getActiveExecutionContext(location);
Assertions.assertIterableEquals(
- Collections.singleton(new URL("file://fake.jar")),
+ Collections.singleton(new URL(fakeFile)),
taskGroupContext.getJars().get(testTask1.getTaskID()));
Assertions.assertIterableEquals(
- Collections.singleton(new URL("file://console.jar")),
+ Collections.singleton(new URL(consoleFile)),
taskGroupContext.getJars().get(testTask2.getTaskID()));
Assertions.assertIterableEquals(
- Collections.singletonList(new URL("file://fake.jar")),
+ Collections.singletonList(new URL(fakeFile)),
Arrays.asList(
((URLClassLoader)
taskGroupContext.getClassLoader(testTask1.getTaskID()))
.getURLs()));
Assertions.assertIterableEquals(
- Collections.singletonList(new URL("file://console.jar")),
+ Collections.singletonList(new URL(consoleFile)),
Arrays.asList(
((URLClassLoader)
taskGroupContext.getClassLoader(testTask2.getTaskID()))
.getURLs()));
taskExecutionService.cancelTaskGroup(location);
+
+ fake.delete();
+ console.delete();
}
/** Test task execution time is the same as the timer timeout */