This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new af38fac149 [Improve][Zeta] Remove misleading exception log when job be
canceled (#8988)
af38fac149 is described below
commit af38fac149be4acabb2f63f4565b4442f6308b7d
Author: Jia Fan <[email protected]>
AuthorDate: Fri Mar 21 00:46:40 2025 +0800
[Improve][Zeta] Remove misleading exception log when job be canceled (#8988)
---
.../container/seatunnel/SeaTunnelContainer.java | 4 +-
.../e2e/sink/inmemory/InMemorySinkFactory.java | 4 ++
.../e2e/sink/inmemory/InMemorySinkWriter.java | 8 ++++
.../seatunnel/engine/e2e/JobClientJobProxyIT.java | 44 ++++++++++++++++++++
.../test/resources/seatunnel_fixed_slot_num.yaml | 3 ++
...kesource_to_inmemory_pending_row_in_queue.conf} | 48 ++++++++++++++--------
.../engine/server/TaskExecutionService.java | 6 ++-
7 files changed, 96 insertions(+), 21 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 254519a813..1be4cb4ca6 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -159,8 +159,8 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
"seatunnel-engine:" +
JDK_DOCKER_IMAGE)))
.waitingFor(Wait.forLogMessage(".*received new worker
register:.*", 1));
copySeaTunnelStarterToContainer(server);
- server.setPortBindings(Collections.singletonList("5801:5801"));
- server.setExposedPorts(Collections.singletonList(5801));
+ server.setPortBindings(Arrays.asList("5801:5801", "8080:8080"));
+ server.setExposedPorts(Arrays.asList(5801, 8080));
server.withCopyFileToContainer(
MountableFile.forHostPath(
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
index 9ba1956dbe..e8722bb74b 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
@@ -40,6 +40,9 @@ public class InMemorySinkFactory
public static final Option<Boolean> THROW_EXCEPTION =
Options.key("throw_exception").booleanType().defaultValue(false);
+ public static final Option<Boolean> WRITER_SLEEP =
+ Options.key("writer_sleep").booleanType().defaultValue(false);
+
public static final Option<Boolean> THROW_OUT_OF_MEMORY =
Options.key("throw_out_of_memory").booleanType().defaultValue(false);
public static final Option<Boolean> CHECKPOINT_SLEEP =
@@ -66,6 +69,7 @@ public class InMemorySinkFactory
.optional(
THROW_EXCEPTION,
THROW_OUT_OF_MEMORY,
+ WRITER_SLEEP,
CHECKPOINT_SLEEP,
THROW_EXCEPTION_OF_COMMITTER,
ASSERT_OPTIONS_KEY,
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java
index 81c8cf0af5..b5dc29e41c 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java
@@ -79,6 +79,14 @@ public class InMemorySinkWriter
@Override
public void write(SeaTunnelRow element) throws IOException {
+ if (config.get(InMemorySinkFactory.WRITER_SLEEP)) {
+ try {
+ Thread.sleep(999999999L);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
if (config.get(InMemorySinkFactory.THROW_OUT_OF_MEMORY)) {
throw new OutOfMemoryError();
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
index b7f2ea53f8..aa4d959383 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
@@ -24,10 +24,16 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
+import lombok.extern.slf4j.Slf4j;
+
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+@Slf4j
public class JobClientJobProxyIT extends SeaTunnelEngineContainer {
@Override
@@ -73,6 +79,44 @@ public class JobClientJobProxyIT extends
SeaTunnelEngineContainer {
server.getLogs().contains("wrong target release operation with
job"));
}
+ @Test
+ public void testNoExceptionLogWhenCancelJob() throws IOException,
InterruptedException {
+ String jobId = String.valueOf(System.currentTimeMillis());
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ executeJob(
+
"/stream_fakesource_to_inmemory_pending_row_in_queue.conf", jobId);
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException();
+ }
+ });
+
+ given().pollDelay(10, TimeUnit.SECONDS)
+ .await()
+ .pollDelay(5000L, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertEquals("RUNNING",
this.getJobStatus(jobId));
+ });
+
+ String logBeforeCancel = this.getServerLogs();
+ cancelJob(jobId);
+ given().pollDelay(10, TimeUnit.SECONDS)
+ .await()
+ .pollDelay(5000L, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertEquals("CANCELED",
this.getJobStatus(jobId));
+ });
+ String logAfterCancel =
this.getServerLogs().substring(logBeforeCancel.length());
+ // "Exception in" is what the catch (Throwable) in TaskExecutionService.BlockingWorker::run logs
+ Assertions.assertFalse(logAfterCancel.contains("Exception in"),
logAfterCancel);
+ Assertions.assertEquals(
+ 4, StringUtils.countMatches(logAfterCancel, "Interrupted
task"), logAfterCancel);
+ }
+
@Test
public void testMultiTableSinkFailedWithThrowable() throws IOException,
InterruptedException {
Container.ExecResult execResult =
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
index 4be4ec075c..146ffc0379 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
@@ -33,3 +33,6 @@ seatunnel:
max-retained: 3
plugin-config:
namespace: /tmp/seatunnel/checkpoint_snapshot/
+ http:
+ enable-http: true
+ port: 8080
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_inmemory_pending_row_in_queue.conf
similarity index 59%
copy from
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
copy to
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_inmemory_pending_row_in_queue.conf
index 4be4ec075c..df3d922937 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_inmemory_pending_row_in_queue.conf
@@ -14,22 +14,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
-seatunnel:
- engine:
- history-job-expire-minutes: 1
- classloader-cache-mode: false
- backup-count: 2
- queue-type: blockingqueue
- print-execution-info-interval: 10
- slot-service:
- dynamic-slot: false
- slot-num: 3
- checkpoint:
- interval: 300000
- timeout: 100000
- storage:
- type: localfile
- max-retained: 3
- plugin-config:
- namespace: /tmp/seatunnel/checkpoint_snapshot/
+env {
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
feature source plugin**
+ FakeSource {
+ # row.num is set larger than TaskGroupWithIntermediateBlockingQueue::QUEUE_SIZE
+ row.num = 9999
+ parallelism = 1
+ schema = {
+ fields {
+ c_int = int
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ InMemory {
+ writer_sleep = true
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index d42e95d480..afd12cbc7b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -702,7 +702,11 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
taskGroupExecutionTracker.exception(e);
}
} catch (Throwable e) {
- logger.warning("Exception in " + t, e);
+ if (taskGroupExecutionTracker.isCancel.get()) {
+ logger.warning(String.format("Interrupted task %d - %s",
t.getTaskID(), t));
+ } else {
+ logger.warning("Exception in " + t, e);
+ }
taskGroupExecutionTracker.exception(e);
} finally {
taskGroupExecutionTracker.taskDone(t);