This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d023cf2a34 [Bug][Zeta] SeaTunnelClient can not exit with error (#9281)
d023cf2a34 is described below

commit d023cf2a349816297009de248cd49201992463b8
Author: CosmosNi <[email protected]>
AuthorDate: Sat May 17 09:43:19 2025 +0800

    [Bug][Zeta] SeaTunnelClient can not exit with error (#9281)
---
 pom.xml                                            | 20 ++++++
 .../apache/seatunnel/core/starter/SeaTunnel.java   |  1 +
 .../core/starter/seatunnel/SeaTunnelClient.java    | 12 +++-
 .../starter/seatunnel/SeaTunnelClientOOMTest.java  | 76 ++++++++++++++++++++++
 .../resources/config/fake_to_inmemory_oom.json     | 30 +++++++++
 .../main/resources/examples/fake_to_console.conf   |  2 +-
 6 files changed, 139 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index 6c72cdab20..777267cd58 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,6 +66,8 @@
         <maven.compiler.source>${java.version}</maven.compiler.source>
         <maven.compiler.target>${java.version}</maven.compiler.target>
 
+        <system-rules.version>1.2.1</system-rules.version>
+        <powermock.version>2.0.9</powermock.version>
         <slf4j.version>1.7.36</slf4j.version>
         <log4j2.version>2.17.1</log4j2.version>
         <log4j2-disruptor.version>3.4.4</log4j2-disruptor.version>
@@ -574,6 +576,24 @@
             <version>${mockito.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.github.stefanbirkner</groupId>
+            <artifactId>system-lambda</artifactId>
+            <version>${system-rules.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito2</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
 
         <!-- The prometheus simpleclient -->
         <dependency>
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/SeaTunnel.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/SeaTunnel.java
index e003bf2d20..981f58689e 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/SeaTunnel.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/SeaTunnel.java
@@ -53,6 +53,7 @@ public class SeaTunnel {
         String errorMsg = throwable.getMessage();
         log.error("Config Error:\n");
         log.error("Reason: {} \n", errorMsg);
+        log.error("Exception StackTrace:{} ", ExceptionUtils.getStackTrace(throwable));
         log.error(
                 "\n===============================================================================\n\n\n");
     }
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java
index 308ef35905..f1abf36444 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java
@@ -23,6 +23,11 @@ import org.apache.seatunnel.core.starter.exception.CommandException;
 import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
 import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public class SeaTunnelClient {
     public static void main(String[] args) throws CommandException {
         ClientCommandArgs clientCommandArgs =
@@ -31,6 +36,11 @@ public class SeaTunnelClient {
                         new ClientCommandArgs(),
                         EngineType.SEATUNNEL.getStarterShellName(),
                         true);
-        SeaTunnel.run(clientCommandArgs.buildCommand());
+        try {
+            SeaTunnel.run(clientCommandArgs.buildCommand());
+        } catch (Error e) {
+            log.error("Exception StackTrace: {}", ExceptionUtils.getStackTrace(e));
+            System.exit(1);
+        }
     }
 }
diff --git a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClientOOMTest.java b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClientOOMTest.java
new file mode 100644
index 0000000000..89cbc0a229
--- /dev/null
+++ b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClientOOMTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.seatunnel;
+
+import org.apache.seatunnel.core.starter.SeaTunnel;
+import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import lombok.extern.slf4j.Slf4j;
+
+import static com.github.stefanbirkner.systemlambda.SystemLambda.catchSystemExit;
+
+@Slf4j
+public class SeaTunnelClientOOMTest {
+
+    @Test
+    public void testHazelcastOOMExitBehavior() throws Exception {
+        // Prepare command line arguments
+        String[] args = {"--config", "fake_config.conf"};
+        ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
+
+        // Mock CommandLineUtils.parse to return our clientCommandArgs
+        try (MockedStatic<CommandLineUtils> mockedCommandLineUtils =
+                Mockito.mockStatic(CommandLineUtils.class)) {
+            mockedCommandLineUtils
+                    .when(
+                            () ->
+                                    CommandLineUtils.parse(
+                                            Mockito.any(String[].class),
+                                            Mockito.any(ClientCommandArgs.class),
+                                            Mockito.anyString(),
+                                            Mockito.anyBoolean()))
+                    .thenReturn(clientCommandArgs);
+
+            // Mock SeaTunnel.run to throw OutOfMemoryError
+            try (MockedStatic<SeaTunnel> mockedSeaTunnel = Mockito.mockStatic(SeaTunnel.class)) {
+                // Simulate Hazelcast thread allocation OOM
+                OutOfMemoryError oomError =
+                        new OutOfMemoryError("Java heap space during Hazelcast thread allocation");
+
+                // Mock run to throw OOM
+                mockedSeaTunnel.when(() -> SeaTunnel.run(Mockito.any())).thenThrow(oomError);
+
+                // Test that System.exit(1) is called
+                int statusCode =
+                        catchSystemExit(
+                                () -> {
+                                    SeaTunnelClient.main(args);
+                                });
+
+                // Verify exit code is 1
+                Assertions.assertEquals(1, statusCode);
+            }
+        }
+    }
+}
diff --git a/seatunnel-core/seatunnel-starter/src/test/resources/config/fake_to_inmemory_oom.json b/seatunnel-core/seatunnel-starter/src/test/resources/config/fake_to_inmemory_oom.json
new file mode 100644
index 0000000000..a85488a9d8
--- /dev/null
+++ b/seatunnel-core/seatunnel-starter/src/test/resources/config/fake_to_inmemory_oom.json
@@ -0,0 +1,30 @@
+{
+  "env": {
+    "parallelism": 1,
+    "job.mode": "BATCH"
+  },
+  "source": [
+    {
+      "plugin_name": "FakeSource",
+      "plugin_output": "fake_oom_test",
+      "row.num": 100,
+      "split.num": 5,
+      "schema": {
+        "fields": {
+          "name": "string",
+          "age": "int"
+        }
+      },
+      "parallelism": 1
+    }
+  ],
+  "transform": [
+  ],
+  "sink": [
+    {
+      "plugin_name": "InMemory",
+      "plugin_input": "fake_oom_test",
+      "throw_out_of_memory": true
+    }
+  ]
+}
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
index f265e8a70e..9eb8a7a375 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
@@ -59,4 +59,4 @@ sink {
   }
   # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
   # please go to https://seatunnel.apache.org/docs/connector-v2/sink
-}
+}
\ No newline at end of file

Reply via email to