This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new fe0c477b74 [Hotfix][Zeta] Fix taskgroup failed log lost (#7241)
fe0c477b74 is described below

commit fe0c477b743e1d2b4c1dbc80b71e83b091306677
Author: Eric <[email protected]>
AuthorDate: Tue Jul 23 08:36:58 2024 +0800

    [Hotfix][Zeta] Fix taskgroup failed log lost (#7241)
---
 .../e2e/sink/inmemory/InMemorySinkFactory.java     |  5 ++
 .../e2e/sink/inmemory/InMemorySinkWriter.java      |  8 +++
 .../org/apache/seatunnel/engine/e2e/ClusterIT.java | 73 ++++++++++++++++++++++
 .../stream_fake_to_inmemory_with_runtime_list.conf | 51 +++++++++++++++
 .../engine/server/dag/physical/PhysicalVertex.java |  4 +-
 5 files changed, 140 insertions(+), 1 deletion(-)

diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
index 1c5b9fe398..9ba1956dbe 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
@@ -28,6 +28,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import com.google.auto.service.AutoService;
 
+import java.util.List;
+
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
 
 @AutoService(Factory.class)
@@ -50,6 +52,9 @@ public class InMemorySinkFactory
     public static final Option<String> ASSERT_OPTIONS_VALUE =
             Options.key("assert_options_value").stringType().noDefaultValue();
 
+    public static final Option<List<String>> THROW_RUNTIME_EXCEPTION_LIST =
+            
Options.key("throw_runtime_exception_list").listType().noDefaultValue();
+
     @Override
     public String factoryIdentifier() {
         return "InMemory";
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java
index a12b2ca5b9..81c8cf0af5 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java
@@ -39,6 +39,8 @@ public class InMemorySinkWriter
     // use a daemon thread to test classloader leak
     private static final Thread THREAD;
 
+    private static int restoreCount = -1;
+
     static {
         // use the daemon thread to always hold the classloader
         ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
@@ -80,6 +82,12 @@ public class InMemorySinkWriter
         if (config.get(InMemorySinkFactory.THROW_OUT_OF_MEMORY)) {
             throw new OutOfMemoryError();
         }
+
+        if 
(config.getOptional(InMemorySinkFactory.THROW_RUNTIME_EXCEPTION_LIST).isPresent())
 {
+            restoreCount++;
+            throw new RuntimeException(
+                    
config.get(InMemorySinkFactory.THROW_RUNTIME_EXCEPTION_LIST).get(restoreCount));
+        }
     }
 
     @Override
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
index ced1065731..76b4f6fc82 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
@@ -18,8 +18,14 @@
 package org.apache.seatunnel.engine.e2e;
 
 import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobResult;
+import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 
 import org.awaitility.Awaitility;
@@ -31,6 +37,7 @@ import com.hazelcast.instance.impl.HazelcastInstanceImpl;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
@@ -89,4 +96,70 @@ public class ClusterIT {
             }
         }
     }
+
+    @Test
+    public void testTaskGroupErrorMsgLost() throws Exception {
+        HazelcastInstanceImpl node1 = null;
+        SeaTunnelClient engineClient = null;
+
+        String testClusterName = "Test_TaskGroupErrorMsgLost";
+
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig
+                .getHazelcastConfig()
+                .setClusterName(TestUtils.getClusterName(testClusterName));
+        seaTunnelConfig.getEngineConfig().setClassloaderCacheMode(true);
+
+        try {
+            node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+            HazelcastInstanceImpl finalNode = node1;
+            Awaitility.await()
+                    .atMost(10000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            1, 
finalNode.getCluster().getMembers().size()));
+
+            ClientConfig clientConfig = 
ConfigProvider.locateAndGetClientConfig();
+            
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
+            engineClient = new SeaTunnelClient(clientConfig);
+
+            String filePath =
+                    
TestUtils.getResource("stream_fake_to_inmemory_with_runtime_list.conf");
+            JobConfig jobConfig = new JobConfig();
+            jobConfig.setName(testClusterName);
+            ClientJobExecutionEnvironment jobExecutionEnv =
+                    engineClient.createExecutionContext(filePath, jobConfig, 
seaTunnelConfig);
+
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+            CompletableFuture<PassiveCompletableFuture<JobResult>> 
objectCompletableFuture =
+                    
CompletableFuture.supplyAsync(clientJobProxy::doWaitForJobComplete);
+
+            Awaitility.await()
+                    .atMost(120000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () -> {
+                                Thread.sleep(2000);
+                                
Assertions.assertTrue(objectCompletableFuture.isDone());
+
+                                PassiveCompletableFuture<JobResult>
+                                        jobResultPassiveCompletableFuture =
+                                                objectCompletableFuture.get();
+                                JobResult jobResult = 
jobResultPassiveCompletableFuture.get();
+                                Assertions.assertEquals(JobStatus.FAILED, 
jobResult.getStatus());
+                                Assertions.assertTrue(
+                                        jobResult.getError().contains("runtime 
error 4"));
+                            });
+
+        } finally {
+            if (engineClient != null) {
+                engineClient.close();
+            }
+
+            if (node1 != null) {
+                node1.shutdown();
+            }
+        }
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fake_to_inmemory_with_runtime_list.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fake_to_inmemory_with_runtime_list.conf
new file mode 100644
index 0000000000..b3a93adcc8
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fake_to_inmemory_with_runtime_list.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+    FakeSource {
+      result_table_name = "fake"
+      row.num = 100
+      split.num = 5
+      schema = {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
+      parallelism = 1
+    }
+}
+
+transform {
+}
+
+sink {
+  InMemory {
+    source_table_name="fake"
+    throw_runtime_exception_list=["runtime error1", "runtime error 2", 
"runtime error 3", "runtime error 4"]
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 4fbcfa4fa3..b6ec234bf2 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -214,7 +214,7 @@ public class PhysicalVertex {
             }
         } else if (ExecutionState.DEPLOYING.equals(currExecutionState)) {
             if (!checkTaskGroupIsExecuting(taskGroupLocation)) {
-                updateTaskState(ExecutionState.RUNNING);
+                updateTaskState(ExecutionState.FAILING);
             }
         }
         return new PassiveCompletableFuture<>(this.taskFuture);
@@ -485,6 +485,8 @@ public class PhysicalVertex {
                         () -> {
                             updateStateTimestamps(ExecutionState.CREATED);
                             runningJobStateIMap.set(taskGroupLocation, 
ExecutionState.CREATED);
+                            // reset the errorByPhysicalVertex
+                            errorByPhysicalVertex = new AtomicReference<>();
                             return null;
                         },
                         new RetryUtils.RetryMaterial(

Reply via email to