This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fe0c477b74 [Hotfix][Zeta] Fix taskgroup failed log lost (#7241)
fe0c477b74 is described below
commit fe0c477b743e1d2b4c1dbc80b71e83b091306677
Author: Eric <[email protected]>
AuthorDate: Tue Jul 23 08:36:58 2024 +0800
[Hotfix][Zeta] Fix taskgroup failed log lost (#7241)
---
.../e2e/sink/inmemory/InMemorySinkFactory.java | 5 ++
.../e2e/sink/inmemory/InMemorySinkWriter.java | 8 +++
.../org/apache/seatunnel/engine/e2e/ClusterIT.java | 73 ++++++++++++++++++++++
.../stream_fake_to_inmemory_with_runtime_list.conf | 51 +++++++++++++++
.../engine/server/dag/physical/PhysicalVertex.java | 4 +-
5 files changed, 140 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
index 1c5b9fe398..9ba1956dbe 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
@@ -28,6 +28,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import com.google.auto.service.AutoService;
+import java.util.List;
+
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
@AutoService(Factory.class)
@@ -50,6 +52,9 @@ public class InMemorySinkFactory
public static final Option<String> ASSERT_OPTIONS_VALUE =
Options.key("assert_options_value").stringType().noDefaultValue();
+ public static final Option<List<String>> THROW_RUNTIME_EXCEPTION_LIST =
+ Options.key("throw_runtime_exception_list").listType().noDefaultValue();
+
@Override
public String factoryIdentifier() {
return "InMemory";
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java
index a12b2ca5b9..81c8cf0af5 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java
@@ -39,6 +39,8 @@ public class InMemorySinkWriter
// use a daemon thread to test classloader leak
private static final Thread THREAD;
+ private static int restoreCount = -1;
+
static {
// use the daemon thread to always hold the classloader
ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
@@ -80,6 +82,12 @@ public class InMemorySinkWriter
if (config.get(InMemorySinkFactory.THROW_OUT_OF_MEMORY)) {
throw new OutOfMemoryError();
}
+
+ if (config.getOptional(InMemorySinkFactory.THROW_RUNTIME_EXCEPTION_LIST).isPresent()) {
+ restoreCount++;
+ throw new RuntimeException(
+ config.get(InMemorySinkFactory.THROW_RUNTIME_EXCEPTION_LIST).get(restoreCount));
+ }
}
@Override
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
index ced1065731..76b4f6fc82 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
@@ -18,8 +18,14 @@
package org.apache.seatunnel.engine.e2e;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobResult;
+import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.awaitility.Awaitility;
@@ -31,6 +37,7 @@ import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Slf4j
@@ -89,4 +96,70 @@ public class ClusterIT {
}
}
}
+
+ @Test
+ public void testTaskGroupErrorMsgLost() throws Exception {
+ HazelcastInstanceImpl node1 = null;
+ SeaTunnelClient engineClient = null;
+
+ String testClusterName = "Test_TaskGroupErrorMsgLost";
+
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+ seaTunnelConfig
+ .getHazelcastConfig()
+ .setClusterName(TestUtils.getClusterName(testClusterName));
+ seaTunnelConfig.getEngineConfig().setClassloaderCacheMode(true);
+
+ try {
+ node1 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+ HazelcastInstanceImpl finalNode = node1;
+ Awaitility.await()
+ .atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ 1,
finalNode.getCluster().getMembers().size()));
+
+ ClientConfig clientConfig =
ConfigProvider.locateAndGetClientConfig();
+
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
+ engineClient = new SeaTunnelClient(clientConfig);
+
+ String filePath =
+
TestUtils.getResource("stream_fake_to_inmemory_with_runtime_list.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName(testClusterName);
+ ClientJobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(filePath, jobConfig,
seaTunnelConfig);
+
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+ CompletableFuture<PassiveCompletableFuture<JobResult>>
objectCompletableFuture =
+
CompletableFuture.supplyAsync(clientJobProxy::doWaitForJobComplete);
+
+ Awaitility.await()
+ .atMost(120000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Thread.sleep(2000);
+
Assertions.assertTrue(objectCompletableFuture.isDone());
+
+ PassiveCompletableFuture<JobResult>
+ jobResultPassiveCompletableFuture =
+ objectCompletableFuture.get();
+ JobResult jobResult =
jobResultPassiveCompletableFuture.get();
+ Assertions.assertEquals(JobStatus.FAILED,
jobResult.getStatus());
+ Assertions.assertTrue(
+ jobResult.getError().contains("runtime error 4"));
+ });
+
+ } finally {
+ if (engineClient != null) {
+ engineClient.close();
+ }
+
+ if (node1 != null) {
+ node1.shutdown();
+ }
+ }
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fake_to_inmemory_with_runtime_list.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fake_to_inmemory_with_runtime_list.conf
new file mode 100644
index 0000000000..b3a93adcc8
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fake_to_inmemory_with_runtime_list.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ row.num = 100
+ split.num = 5
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 1
+ }
+}
+
+transform {
+}
+
+sink {
+ InMemory {
+ source_table_name="fake"
+ throw_runtime_exception_list=["runtime error1", "runtime error 2", "runtime error 3", "runtime error 4"]
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 4fbcfa4fa3..b6ec234bf2 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -214,7 +214,7 @@ public class PhysicalVertex {
}
} else if (ExecutionState.DEPLOYING.equals(currExecutionState)) {
if (!checkTaskGroupIsExecuting(taskGroupLocation)) {
- updateTaskState(ExecutionState.RUNNING);
+ updateTaskState(ExecutionState.FAILING);
}
}
return new PassiveCompletableFuture<>(this.taskFuture);
@@ -485,6 +485,8 @@ public class PhysicalVertex {
() -> {
updateStateTimestamps(ExecutionState.CREATED);
runningJobStateIMap.set(taskGroupLocation,
ExecutionState.CREATED);
+ // reset the errorByPhysicalVertex
+ errorByPhysicalVertex = new AtomicReference<>();
return null;
},
new RetryUtils.RetryMaterial(