This is an automated email from the ASF dual-hosted git repository.

davidzollo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a65cbfe56b [Feature][Zeta] Report non-terminal job states (#10133)
a65cbfe56b is described below

commit a65cbfe56b2c53b11e574f9fefabd0f34bcdd3c0
Author: Doyeon Kim <[email protected]>
AuthorDate: Sat Jun 6 00:18:25 2026 +0900

    [Feature][Zeta] Report non-terminal job states (#10133)
---
 docs/en/engines/event-listener.md                  |  25 ++++
 docs/zh/engines/event-listener.md                  |  27 ++++
 .../engine/common/config/EngineConfig.java         |   5 +
 .../config/YamlSeaTunnelDomConfigProcessor.java    |   9 ++
 .../common/config/server/ServerConfigOptions.java  |   1 +
 .../engine/server/CoordinatorService.java          |   4 +
 .../engine/server/dag/physical/PhysicalPlan.java   |  25 +++-
 .../engine/server/event/JobStateEventTest.java     | 164 +++++++++++++++++++--
 8 files changed, 244 insertions(+), 16 deletions(-)

diff --git a/docs/en/engines/event-listener.md 
b/docs/en/engines/event-listener.md
index b567b8152c..686a931f56 100644
--- a/docs/en/engines/event-listener.md
+++ b/docs/en/engines/event-listener.md
@@ -118,8 +118,33 @@ seatunnel:
       url: "http://example.com:1024/event/report";
       headers:
         Content-Type: application/json
+      report-non-terminal-job-state: false
 ```
 
+#### report-non-terminal-job-state
+
+- Type: boolean
+
+- Default: false
+
+Whether to report state change events for non-terminal job states to the HTTP endpoint configured by `url` (the events are also delivered to any other registered event handlers).
+
+When set to `true`, the engine reports state change events for non-terminal job states in addition to terminal ones. The reported non-terminal states include:
+
+- `PENDING`
+
+- `SCHEDULED`
+
+- `RUNNING`
+
+- `FAILING`
+
+- `CANCELING`
+
+- `DOING_SAVEPOINT`
+
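+When set to `false` (the default), only transitions into terminal job states (such as `FINISHED`, `FAILED`, `CANCELED`, `SAVEPOINT_DONE`) are reported; transitions into non-terminal states are not reported. Keep the default if you only care about the final result of a job and want to reduce the number of reported events.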
+
 ### Flink Engine
 
 You can define the implementation class of 
`org.apache.seatunnel.api.event.EventHandler` interface and add to the 
classpath to automatically load it through SPI.
diff --git a/docs/zh/engines/event-listener.md 
b/docs/zh/engines/event-listener.md
index 7f33736ab0..d5bf27b7cb 100644
--- a/docs/zh/engines/event-listener.md
+++ b/docs/zh/engines/event-listener.md
@@ -116,8 +116,35 @@ seatunnel:
       url: "http://example.com:1024/event/report";
       headers:
         Content-Type: application/json
+      report-non-terminal-job-state: false
 ```
 
+#### report-non-terminal-job-state
+
+- 类型:boolean
+
+- 默认值:false
+
+是否将作业进入非终态（Non-terminal）时的状态变更事件上报到 `url` 配置的 HTTP 接口（其他已注册的事件处理器同样会收到这些事件）。
+
+当该配置为 `true` 时，除终态事件外，Engine 还会上报作业进入以下非终态时的状态变更事件：
+
+- `PENDING`
+
+- `SCHEDULED`
+
+- `RUNNING`
+
+- `FAILING`
+
+- `CANCELING`
+
+- `DOING_SAVEPOINT`
+
+当该配置为 `false`（默认值）时，仅上报作业进入终态（Terminal）时的状态变更事件（例如 `FINISHED`、`FAILED`、`CANCELED`、`SAVEPOINT_DONE`），进入非终态的变更不会上报。
+
+若仅关注作业最终结果、希望减少事件上报数量，保持默认值 `false` 即可。
+
 ### Flink 引擎
 
 您可以定义 `org.apache.seatunnel.api.event.EventHandler` 接口并添加到类路径,SPI会自动加载。
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index c92afdf6df..f7beb90d08 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -93,6 +93,7 @@ public class EngineConfig {
 
     private String eventReportHttpApi;
     private Map<String, String> eventReportHttpHeaders = 
Collections.emptyMap();
+    private boolean reportNonTerminalJobState = false;
 
     private ExecutionMode mode = ExecutionMode.CLUSTER;
 
@@ -171,6 +172,10 @@ public class EngineConfig {
         this.jobMetricsPartitionCount = jobMetricsPartitionCount;
     }
 
+    public void setReportNonTerminalJobState(boolean 
reportNonTerminalJobState) {
+        this.reportNonTerminalJobState = reportNonTerminalJobState;
+    }
+
     public void setTaskExecutionThreadShareMode(ThreadShareMode 
taskExecutionThreadShareMode) {
         checkNotNull(queueType);
         this.taskExecutionThreadShareMode = taskExecutionThreadShareMode;
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index c7274e6598..c71901263d 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -301,6 +301,15 @@ public class YamlSeaTunnelDomConfigProcessor extends 
AbstractDomConfigProcessor
                         }
                         engineConfig.setEventReportHttpHeaders(headers);
                     }
+
+                    Node reportNonTerminalJobStateNode =
+                            attributes.getNamedItem(
+                                    
ServerConfigOptions.MasterServerConfigOptions
+                                            .REPORT_NON_TERMINAL_JOB_STATE);
+                    if (reportNonTerminalJobStateNode != null) {
+                        engineConfig.setReportNonTerminalJobState(
+                                
getBooleanValue(getTextContent(reportNonTerminalJobStateNode)));
+                    }
                 }
             } else if (ServerConfigOptions.TELEMETRY.key().equals(name)) {
                 engineConfig.setTelemetryConfig(parseTelemetryConfig(node));
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 7bc5b3c06e..a9d145daf9 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -374,6 +374,7 @@ public class ServerConfigOptions {
         public static final String EVENT_REPORT_HTTP = "event-report-http";
         public static final String EVENT_REPORT_HTTP_URL = "url";
         public static final String EVENT_REPORT_HTTP_HEADERS = "headers";
+        public static final String REPORT_NON_TERMINAL_JOB_STATE = 
"report-non-terminal-job-state";
 
         // The options for http server end
         /////////////////////////////////////////////////////
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index ed8f3f68dd..71b232d82d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -1876,6 +1876,10 @@ public class CoordinatorService {
         return pendingJobQueue.getJobIdMap().size();
     }
 
+    public EngineConfig getEngineConfig() {
+        return engineConfig;
+    }
+
     @VisibleForTesting
     protected IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> 
getMetricsImap() {
         return metricsImap;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index a0d2f36a8e..1c2bb5936b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.dag.physical;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.EngineConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.job.JobResult;
 import org.apache.seatunnel.engine.common.job.JobStateEvent;
@@ -80,6 +81,8 @@ public class PhysicalPlan {
 
     private JobMaster jobMaster;
 
+    private EngineConfig engineConfig;
+
     private Map<TaskGroupLocation, CompletableFuture<SlotProfile>> 
preApplyResourceFutures =
             new HashMap<>();
 
@@ -133,6 +136,7 @@ public class PhysicalPlan {
     public void setJobMaster(JobMaster jobMaster) {
         this.jobMaster = jobMaster;
         pipelineList.forEach(pipeline -> pipeline.setJobMaster(jobMaster));
+        this.engineConfig = jobMaster.getEngineConfig();
     }
 
     public PassiveCompletableFuture<JobResult> initStateFuture() {
@@ -314,6 +318,8 @@ public class PhysicalPlan {
             // Now do the actual state transition, we must update 
runningJobStateTimestampsIMap
             // first and then can update runningJobStateIMap
             updateStateInfo(current, targetState);
+            reportJobStateEvent(targetState);
+
             stateProcess();
         } catch (Exception e) {
             log.error(ExceptionUtils.getMessage(e));
@@ -388,17 +394,28 @@ public class PhysicalPlan {
             case FINISHED:
                 stopJobStateProcess();
                 jobEndFuture.complete(new JobResult(jobStatus, 
errorBySubPlan.get()));
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown Job State: " + 
jobStatus);
+        }
+    }
+
+    private void reportJobStateEvent(JobStatus jobStatus) {
+        try {
+            if (jobStatus.isEndState()
+                    || (this.engineConfig != null
+                            && 
this.engineConfig.isReportNonTerminalJobState())) {
                 jobMaster
                         .getCoordinatorService()
                         .getEventProcessor()
                         .process(
                                 new JobStateEvent(
-                                        jobImmutableInformation.getJobId(),
+                                        jobId,
                                         
jobImmutableInformation.getJobConfig().getName(),
                                         jobStatus));
-                return;
-            default:
-                throw new IllegalArgumentException("Unknown Job State: " + 
jobStatus);
+            }
+        } catch (Exception e) {
+            log.warn("Failed to report job {} state event", jobId, e);
         }
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobStateEventTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobStateEventTest.java
index 000f2c8d63..d028b8e578 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobStateEventTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobStateEventTest.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -35,10 +36,10 @@ import java.util.concurrent.atomic.AtomicReference;
 import static 
org.apache.seatunnel.engine.server.checkpoint.CheckpointErrorRestoreEndTest.STREAM_CONF_WITH_ERROR_PATH;
 import static org.awaitility.Awaitility.await;
 
-public class JobStateEventTest extends AbstractSeaTunnelServerTest {
+class JobStateEventTest extends AbstractSeaTunnelServerTest {
 
     @Test
-    public void testJobStateEvent() throws InterruptedException {
+    void testJobStateEvent() {
 
         JobEventProcessor eventProcessor =
                 (JobEventProcessor) 
server.getCoordinatorService().getEventProcessor();
@@ -68,40 +69,179 @@ public class JobStateEventTest extends 
AbstractSeaTunnelServerTest {
         List<EventHandler> handlers =
                 (List<EventHandler>) ReflectionUtils.getField(eventProcessor, 
"handlers").get();
         handlers.add(eventHandler);
-        long jobId_finished = System.currentTimeMillis();
+        long jobIdFinished = System.currentTimeMillis();
         long currentTimeMillis = System.currentTimeMillis();
-        startJob(jobId_finished, "fake_to_console.conf", false);
+        startJob(jobIdFinished, "fake_to_console.conf", false);
         await().atMost(60, TimeUnit.SECONDS)
                 .untilAsserted(
                         () ->
                                 Assertions.assertEquals(
                                         JobStatus.FINISHED,
                                         server.getCoordinatorService()
-                                                
.getJobStatus(jobId_finished)));
+                                                .getJobStatus(jobIdFinished)));
         // check whether the event handler is executed
         await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> Assertions.assertEquals(1, 
accessCounter.get()));
         JobStateEvent jobStateEventFinished = jobStateEventReference.get();
-        Assertions.assertEquals(String.valueOf(jobId_finished), 
jobStateEventFinished.getJobId());
+        Assertions.assertEquals(String.valueOf(jobIdFinished), 
jobStateEventFinished.getJobId());
         Assertions.assertEquals(JobStatus.FINISHED, 
jobStateEventFinished.getJobStatus());
         Assertions.assertTrue(jobStateEventFinished.getCreatedTime() > 
currentTimeMillis);
-        Assertions.assertEquals(String.valueOf(jobId_finished), 
jobStateEventFinished.getJobName());
+        Assertions.assertEquals(String.valueOf(jobIdFinished), 
jobStateEventFinished.getJobName());
 
-        long jobId_failed = System.currentTimeMillis();
-        startJob(jobId_failed, STREAM_CONF_WITH_ERROR_PATH, false);
+        long jobIdFailed = System.currentTimeMillis();
+        startJob(jobIdFailed, STREAM_CONF_WITH_ERROR_PATH, false);
         await().atMost(60, TimeUnit.SECONDS)
                 .untilAsserted(
                         () ->
                                 Assertions.assertEquals(
                                         JobStatus.FAILED,
-                                        
server.getCoordinatorService().getJobStatus(jobId_failed)));
+                                        
server.getCoordinatorService().getJobStatus(jobIdFailed)));
 
         await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> Assertions.assertEquals(2, 
accessCounter.get()));
         JobStateEvent jobStateEventFailed = jobStateEventReference.get();
-        Assertions.assertEquals(String.valueOf(jobId_failed), 
jobStateEventFailed.getJobId());
+        Assertions.assertEquals(String.valueOf(jobIdFailed), 
jobStateEventFailed.getJobId());
         Assertions.assertEquals(JobStatus.FAILED, 
jobStateEventFailed.getJobStatus());
         Assertions.assertTrue(jobStateEventFailed.getCreatedTime() > 
currentTimeMillis);
-        Assertions.assertEquals(String.valueOf(jobId_failed), 
jobStateEventFailed.getJobName());
+        Assertions.assertEquals(String.valueOf(jobIdFailed), 
jobStateEventFailed.getJobName());
+    }
+
+    @Test
+    void testNotEndJobStateEvent() {
+        // Enabling non-terminal reporting mutates the coordinator's EngineConfig. Remember the
+        // previous value and restore it in the finally block so the flag cannot leak into other
+        // tests that may share this server (testJobStateEvent asserts exact event counts).
+        boolean originalReportNonTerminalJobState =
+                server.getCoordinatorService().getEngineConfig().isReportNonTerminalJobState();
+        server.getCoordinatorService().getEngineConfig().setReportNonTerminalJobState(true);
+        try {
+            JobEventProcessor eventProcessor =
+                    (JobEventProcessor) server.getCoordinatorService().getEventProcessor();
+
+            AtomicInteger accessCounter = new AtomicInteger(0);
+            AtomicReference<JobStateEvent> jobStateEventReference = new AtomicReference<>();
+            // Counts only non-terminal JOB_STATUS events and remembers the latest one.
+            EventHandler eventHandler =
+                    event -> {
+                        if (event.getEventType() != EventType.JOB_STATUS) {
+                            return;
+                        }
+                        JobStateEvent jobStateEvent = (JobStateEvent) event;
+                        JobStatus status = jobStateEvent.getJobStatus();
+                        switch (status) {
+                            case PENDING:
+                            case SCHEDULED:
+                            case RUNNING:
+                            case DOING_SAVEPOINT:
+                            case FAILING:
+                            case CANCELING:
+                                // Publish the event before bumping the counter: once the test
+                                // thread observes the new count it is guaranteed to also see
+                                // this event in the reference (no stale read after the await).
+                                jobStateEventReference.set(jobStateEvent);
+                                accessCounter.incrementAndGet();
+                                break;
+                            default:
+                                break;
+                        }
+                    };
+            // register the event handler
+            List<EventHandler> handlers =
+                    (List<EventHandler>)
+                            ReflectionUtils.getField(eventProcessor, "handlers").get();
+            handlers.add(eventHandler);
+            long jobIdFinished = System.currentTimeMillis();
+            long currentTimeMillis = System.currentTimeMillis();
+            startJob(jobIdFinished, "fake_to_console.conf", false);
+            await().atMost(60, TimeUnit.SECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            JobStatus.FINISHED,
+                                            server.getCoordinatorService()
+                                                    .getJobStatus(jobIdFinished)));
+            // the finished job is expected to report three non-terminal states, ending with RUNNING
+            await().atMost(10, TimeUnit.SECONDS)
+                    .untilAsserted(() -> Assertions.assertEquals(3, accessCounter.get()));
+            JobStateEvent jobStateEventFinished = jobStateEventReference.get();
+            Assertions.assertEquals(
+                    String.valueOf(jobIdFinished), jobStateEventFinished.getJobId());
+            Assertions.assertEquals(JobStatus.RUNNING, jobStateEventFinished.getJobStatus());
+            Assertions.assertTrue(jobStateEventFinished.getCreatedTime() > currentTimeMillis);
+            Assertions.assertEquals(
+                    String.valueOf(jobIdFinished), jobStateEventFinished.getJobName());
+
+            long jobIdFailed = System.currentTimeMillis();
+            startJob(jobIdFailed, STREAM_CONF_WITH_ERROR_PATH, false);
+            await().atMost(60, TimeUnit.SECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            JobStatus.FAILED,
+                                            server.getCoordinatorService()
+                                                    .getJobStatus(jobIdFailed)));
+
+            // the failing job is expected to add four more non-terminal states, ending with FAILING
+            await().atMost(10, TimeUnit.SECONDS)
+                    .untilAsserted(() -> Assertions.assertEquals(7, accessCounter.get()));
+            JobStateEvent jobStateEventFailed = jobStateEventReference.get();
+            Assertions.assertEquals(String.valueOf(jobIdFailed), jobStateEventFailed.getJobId());
+            Assertions.assertEquals(JobStatus.FAILING, jobStateEventFailed.getJobStatus());
+            Assertions.assertTrue(jobStateEventFailed.getCreatedTime() > currentTimeMillis);
+            Assertions.assertEquals(
+                    String.valueOf(jobIdFailed), jobStateEventFailed.getJobName());
+        } finally {
+            server.getCoordinatorService()
+                    .getEngineConfig()
+                    .setReportNonTerminalJobState(originalReportNonTerminalJobState);
+        }
+    }
+
+    @Test
+    void testJobStateEventOrderWhenReportNonTerminalJobStateEnabled() {
+        // Enabling non-terminal reporting mutates the coordinator's EngineConfig. Remember the
+        // previous value and restore it in the finally block so the flag cannot leak into other
+        // tests that may share this server (testJobStateEvent asserts exact event counts).
+        boolean originalReportNonTerminalJobState =
+                server.getCoordinatorService().getEngineConfig().isReportNonTerminalJobState();
+        server.getCoordinatorService().getEngineConfig().setReportNonTerminalJobState(true);
+        try {
+            JobEventProcessor eventProcessor =
+                    (JobEventProcessor) server.getCoordinatorService().getEventProcessor();
+
+            long jobId = System.currentTimeMillis();
+            long currentTimeMillis = System.currentTimeMillis();
+            String expectedJobId = String.valueOf(jobId);
+
+            List<JobStatus> reportedStatuses = new CopyOnWriteArrayList<>();
+            AtomicReference<JobStateEvent> lastJobStateEventReference = new AtomicReference<>();
+
+            // Records, in arrival order, the statuses of the job started by this test only, so
+            // events of other jobs cannot disturb the ordering assertions below.
+            EventHandler eventHandler =
+                    event -> {
+                        if (event.getEventType() != EventType.JOB_STATUS) {
+                            return;
+                        }
+                        JobStateEvent jobStateEvent = (JobStateEvent) event;
+                        if (!expectedJobId.equals(jobStateEvent.getJobId())) {
+                            return;
+                        }
+                        // Publish the event before appending its status: once the test thread
+                        // sees FINISHED in the list it is guaranteed to also see the FINISHED
+                        // event in the reference (no stale read after the await).
+                        lastJobStateEventReference.set(jobStateEvent);
+                        reportedStatuses.add(jobStateEvent.getJobStatus());
+                    };
+
+            List<EventHandler> handlers =
+                    (List<EventHandler>)
+                            ReflectionUtils.getField(eventProcessor, "handlers").get();
+            handlers.add(eventHandler);
+
+            startJob(jobId, "fake_to_console.conf", false);
+
+            await().atMost(60, TimeUnit.SECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            JobStatus.FINISHED,
+                                            server.getCoordinatorService().getJobStatus(jobId)));
+
+            // The job status can reach FINISHED before the event handler has run.
+            await().atMost(10, TimeUnit.SECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertTrue(
+                                            reportedStatuses.contains(JobStatus.FINISHED),
+                                            "FINISHED event should be reported"));
+
+            Assertions.assertTrue(
+                    reportedStatuses.contains(JobStatus.SCHEDULED),
+                    "SCHEDULED event should be reported");
+            Assertions.assertTrue(
+                    reportedStatuses.contains(JobStatus.RUNNING),
+                    "RUNNING event should be reported");
+
+            Assertions.assertTrue(
+                    reportedStatuses.indexOf(JobStatus.SCHEDULED)
+                            < reportedStatuses.indexOf(JobStatus.RUNNING),
+                    "SCHEDULED should be reported before RUNNING. Actual events: "
+                            + reportedStatuses);
+
+            Assertions.assertTrue(
+                    reportedStatuses.indexOf(JobStatus.RUNNING)
+                            < reportedStatuses.indexOf(JobStatus.FINISHED),
+                    "RUNNING should be reported before FINISHED. Actual events: "
+                            + reportedStatuses);
+
+            JobStateEvent lastJobStateEvent = lastJobStateEventReference.get();
+            Assertions.assertEquals(expectedJobId, lastJobStateEvent.getJobId());
+            Assertions.assertEquals(JobStatus.FINISHED, lastJobStateEvent.getJobStatus());
+            Assertions.assertTrue(lastJobStateEvent.getCreatedTime() > currentTimeMillis);
+            Assertions.assertEquals(expectedJobId, lastJobStateEvent.getJobName());
+        } finally {
+            server.getCoordinatorService()
+                    .getEngineConfig()
+                    .setReportNonTerminalJobState(originalReportNonTerminalJobState);
+        }
+    }
 }

Reply via email to