This is an automated email from the ASF dual-hosted git repository.
davidzollo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a65cbfe56b [Feature][Zeta] Report non-terminal job states (#10133)
a65cbfe56b is described below
commit a65cbfe56b2c53b11e574f9fefabd0f34bcdd3c0
Author: Doyeon Kim <[email protected]>
AuthorDate: Sat Jun 6 00:18:25 2026 +0900
[Feature][Zeta] Report non-terminal job states (#10133)
---
docs/en/engines/event-listener.md | 25 ++++
docs/zh/engines/event-listener.md | 27 ++++
.../engine/common/config/EngineConfig.java | 5 +
.../config/YamlSeaTunnelDomConfigProcessor.java | 9 ++
.../common/config/server/ServerConfigOptions.java | 1 +
.../engine/server/CoordinatorService.java | 4 +
.../engine/server/dag/physical/PhysicalPlan.java | 25 +++-
.../engine/server/event/JobStateEventTest.java | 164 +++++++++++++++++++--
8 files changed, 244 insertions(+), 16 deletions(-)
diff --git a/docs/en/engines/event-listener.md
b/docs/en/engines/event-listener.md
index b567b8152c..686a931f56 100644
--- a/docs/en/engines/event-listener.md
+++ b/docs/en/engines/event-listener.md
@@ -118,8 +118,33 @@ seatunnel:
url: "http://example.com:1024/event/report"
headers:
Content-Type: application/json
+ report-non-terminal-job-state: false
```
+#### report-non-terminal-job-state
+
+- Type: boolean
+
+- Default: false
+
+Whether to report non-terminal job state events to the configured HTTP endpoint.
+
+When set to `true`, the engine will report job state change events for non-terminal states, including:
+
+- `PENDING`
+
+- `SCHEDULED`
+
+- `RUNNING`
+
+- `FAILING`
+
+- `CANCELING`
+
+- `DOING_SAVEPOINT`
+
+When set to `false`, only terminal job states (such as `FINISHED`, `FAILED`, `CANCELED`, `SAVEPOINT_DONE`) will be reported, and transitions into non-terminal states will be ignored.
+
### Flink Engine
You can define the implementation class of
`org.apache.seatunnel.api.event.EventHandler` interface and add to the
classpath to automatically load it through SPI.
diff --git a/docs/zh/engines/event-listener.md
b/docs/zh/engines/event-listener.md
index 7f33736ab0..d5bf27b7cb 100644
--- a/docs/zh/engines/event-listener.md
+++ b/docs/zh/engines/event-listener.md
@@ -116,8 +116,35 @@ seatunnel:
url: "http://example.com:1024/event/report"
headers:
Content-Type: application/json
+ report-non-terminal-job-state: false
```
+#### report-non-terminal-job-state
+
+- 类型:boolean
+
+- 默认值:false
+
+是否将非终态(Non-terminal)作业状态的状态变更事件上报到配置的 HTTP 接口。
+
+当该配置为 `true` 时,Engine 会上报以下非终态作业状态的变更事件:
+
+- `PENDING`
+
+- `SCHEDULED`
+
+- `RUNNING`
+
+- `FAILING`
+
+- `CANCELING`
+
+- `DOING_SAVEPOINT`
+
+当该配置为 `false` 时,仅会上报终态(Terminal)作业状态(例如 `FINISHED`,`FAILED`,`CANCELED`,`SAVEPOINT_DONE`),非终态状态的变更将被忽略。
+
+将该配置项保持为 `false`(默认值)适用于仅关注作业最终结果、希望减少事件上报数量的场景。
+
### Flink 引擎
您可以定义 `org.apache.seatunnel.api.event.EventHandler` 接口并添加到类路径,SPI会自动加载。
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index c92afdf6df..f7beb90d08 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -93,6 +93,7 @@ public class EngineConfig {
private String eventReportHttpApi;
private Map<String, String> eventReportHttpHeaders =
Collections.emptyMap();
+ private boolean reportNonTerminalJobState = false;
private ExecutionMode mode = ExecutionMode.CLUSTER;
@@ -171,6 +172,10 @@ public class EngineConfig {
this.jobMetricsPartitionCount = jobMetricsPartitionCount;
}
+    /**
+     * Sets the flag that controls whether non-terminal job state events are reported.
+     *
+     * @param reportNonTerminalJobState the new value of the flag
+     */
+    public void setReportNonTerminalJobState(boolean reportNonTerminalJobState) {
+        this.reportNonTerminalJobState = reportNonTerminalJobState;
+    }
+
public void setTaskExecutionThreadShareMode(ThreadShareMode
taskExecutionThreadShareMode) {
checkNotNull(queueType);
this.taskExecutionThreadShareMode = taskExecutionThreadShareMode;
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index c7274e6598..c71901263d 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -301,6 +301,15 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
}
engineConfig.setEventReportHttpHeaders(headers);
}
+
+ Node reportNonTerminalJobStateNode =
+ attributes.getNamedItem(
+
ServerConfigOptions.MasterServerConfigOptions
+ .REPORT_NON_TERMINAL_JOB_STATE);
+ if (reportNonTerminalJobStateNode != null) {
+ engineConfig.setReportNonTerminalJobState(
+
getBooleanValue(getTextContent(reportNonTerminalJobStateNode)));
+ }
}
} else if (ServerConfigOptions.TELEMETRY.key().equals(name)) {
engineConfig.setTelemetryConfig(parseTelemetryConfig(node));
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 7bc5b3c06e..a9d145daf9 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -374,6 +374,7 @@ public class ServerConfigOptions {
public static final String EVENT_REPORT_HTTP = "event-report-http";
public static final String EVENT_REPORT_HTTP_URL = "url";
public static final String EVENT_REPORT_HTTP_HEADERS = "headers";
+ public static final String REPORT_NON_TERMINAL_JOB_STATE =
"report-non-terminal-job-state";
// The options for http server end
/////////////////////////////////////////////////////
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index ed8f3f68dd..71b232d82d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -1876,6 +1876,10 @@ public class CoordinatorService {
return pendingJobQueue.getJobIdMap().size();
}
+    /** Returns the engine configuration held by this coordinator service. */
+    public EngineConfig getEngineConfig() {
+        return engineConfig;
+    }
+
@VisibleForTesting
protected IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>>
getMetricsImap() {
return metricsImap;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index a0d2f36a8e..1c2bb5936b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.job.JobResult;
import org.apache.seatunnel.engine.common.job.JobStateEvent;
@@ -80,6 +81,8 @@ public class PhysicalPlan {
private JobMaster jobMaster;
+ private EngineConfig engineConfig;
+
private Map<TaskGroupLocation, CompletableFuture<SlotProfile>>
preApplyResourceFutures =
new HashMap<>();
@@ -133,6 +136,7 @@ public class PhysicalPlan {
public void setJobMaster(JobMaster jobMaster) {
this.jobMaster = jobMaster;
pipelineList.forEach(pipeline -> pipeline.setJobMaster(jobMaster));
+ this.engineConfig = jobMaster.getEngineConfig();
}
public PassiveCompletableFuture<JobResult> initStateFuture() {
@@ -314,6 +318,8 @@ public class PhysicalPlan {
// Now do the actual state transition, we must update
runningJobStateTimestampsIMap
// first and then can update runningJobStateIMap
updateStateInfo(current, targetState);
+ reportJobStateEvent(targetState);
+
stateProcess();
} catch (Exception e) {
log.error(ExceptionUtils.getMessage(e));
@@ -388,17 +394,28 @@ public class PhysicalPlan {
case FINISHED:
stopJobStateProcess();
jobEndFuture.complete(new JobResult(jobStatus,
errorBySubPlan.get()));
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown Job State: " +
jobStatus);
+ }
+ }
+
+ private void reportJobStateEvent(JobStatus jobStatus) {
+ try {
+ if (jobStatus.isEndState()
+ || (this.engineConfig != null
+ &&
this.engineConfig.isReportNonTerminalJobState())) {
jobMaster
.getCoordinatorService()
.getEventProcessor()
.process(
new JobStateEvent(
- jobImmutableInformation.getJobId(),
+ jobId,
jobImmutableInformation.getJobConfig().getName(),
jobStatus));
- return;
- default:
- throw new IllegalArgumentException("Unknown Job State: " +
jobStatus);
+ }
+ } catch (Exception e) {
+ log.warn("Failed to report job {} state event", jobId, e);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobStateEventTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobStateEventTest.java
index 000f2c8d63..d028b8e578 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobStateEventTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/event/JobStateEventTest.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -35,10 +36,10 @@ import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.seatunnel.engine.server.checkpoint.CheckpointErrorRestoreEndTest.STREAM_CONF_WITH_ERROR_PATH;
import static org.awaitility.Awaitility.await;
-public class JobStateEventTest extends AbstractSeaTunnelServerTest {
+class JobStateEventTest extends AbstractSeaTunnelServerTest {
@Test
- public void testJobStateEvent() throws InterruptedException {
+ void testJobStateEvent() {
JobEventProcessor eventProcessor =
(JobEventProcessor)
server.getCoordinatorService().getEventProcessor();
@@ -68,40 +69,179 @@ public class JobStateEventTest extends
AbstractSeaTunnelServerTest {
List<EventHandler> handlers =
(List<EventHandler>) ReflectionUtils.getField(eventProcessor,
"handlers").get();
handlers.add(eventHandler);
- long jobId_finished = System.currentTimeMillis();
+ long jobIdFinished = System.currentTimeMillis();
long currentTimeMillis = System.currentTimeMillis();
- startJob(jobId_finished, "fake_to_console.conf", false);
+ startJob(jobIdFinished, "fake_to_console.conf", false);
await().atMost(60, TimeUnit.SECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FINISHED,
server.getCoordinatorService()
-
.getJobStatus(jobId_finished)));
+ .getJobStatus(jobIdFinished)));
// check whether the event handler is executed
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertEquals(1,
accessCounter.get()));
JobStateEvent jobStateEventFinished = jobStateEventReference.get();
- Assertions.assertEquals(String.valueOf(jobId_finished),
jobStateEventFinished.getJobId());
+ Assertions.assertEquals(String.valueOf(jobIdFinished),
jobStateEventFinished.getJobId());
Assertions.assertEquals(JobStatus.FINISHED,
jobStateEventFinished.getJobStatus());
Assertions.assertTrue(jobStateEventFinished.getCreatedTime() >
currentTimeMillis);
- Assertions.assertEquals(String.valueOf(jobId_finished),
jobStateEventFinished.getJobName());
+ Assertions.assertEquals(String.valueOf(jobIdFinished),
jobStateEventFinished.getJobName());
- long jobId_failed = System.currentTimeMillis();
- startJob(jobId_failed, STREAM_CONF_WITH_ERROR_PATH, false);
+ long jobIdFailed = System.currentTimeMillis();
+ startJob(jobIdFailed, STREAM_CONF_WITH_ERROR_PATH, false);
await().atMost(60, TimeUnit.SECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FAILED,
-
server.getCoordinatorService().getJobStatus(jobId_failed)));
+
server.getCoordinatorService().getJobStatus(jobIdFailed)));
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertEquals(2,
accessCounter.get()));
JobStateEvent jobStateEventFailed = jobStateEventReference.get();
- Assertions.assertEquals(String.valueOf(jobId_failed),
jobStateEventFailed.getJobId());
+ Assertions.assertEquals(String.valueOf(jobIdFailed),
jobStateEventFailed.getJobId());
Assertions.assertEquals(JobStatus.FAILED,
jobStateEventFailed.getJobStatus());
Assertions.assertTrue(jobStateEventFailed.getCreatedTime() >
currentTimeMillis);
- Assertions.assertEquals(String.valueOf(jobId_failed),
jobStateEventFailed.getJobName());
+ Assertions.assertEquals(String.valueOf(jobIdFailed),
jobStateEventFailed.getJobName());
+ }
+
+    /**
+     * Verifies that non-terminal job states are delivered to registered event handlers when
+     * reporting of non-terminal job states is enabled in the engine config.
+     *
+     * <p>The handler only counts non-terminal states; terminal states are ignored here.
+     */
+    @Test
+    void testNotEndJobStateEvent() {
+        JobEventProcessor eventProcessor =
+                (JobEventProcessor) server.getCoordinatorService().getEventProcessor();
+
+        AtomicInteger accessCounter = new AtomicInteger(0);
+        AtomicReference<JobStateEvent> jobStateEventReference = new AtomicReference<>();
+        EventHandler eventHandler =
+                event -> {
+                    if (event.getEventType() != EventType.JOB_STATUS) {
+                        return;
+                    }
+                    JobStateEvent jobStateEvent = (JobStateEvent) event;
+                    switch (jobStateEvent.getJobStatus()) {
+                        case PENDING:
+                        case SCHEDULED:
+                        case RUNNING:
+                        case DOING_SAVEPOINT:
+                        case FAILING:
+                        case CANCELING:
+                            // Publish the event BEFORE bumping the counter: the test thread reads
+                            // the reference as soon as it observes the expected count, so the
+                            // reference must already point at the event that produced that count.
+                            jobStateEventReference.set(jobStateEvent);
+                            accessCounter.incrementAndGet();
+                            break;
+                        default:
+                            break;
+                    }
+                };
+        // register the event handler
+        @SuppressWarnings("unchecked")
+        List<EventHandler> handlers =
+                (List<EventHandler>) ReflectionUtils.getField(eventProcessor, "handlers").get();
+        handlers.add(eventHandler);
+
+        boolean previousReportNonTerminalJobState =
+                server.getCoordinatorService().getEngineConfig().isReportNonTerminalJobState();
+        server.getCoordinatorService().getEngineConfig().setReportNonTerminalJobState(true);
+        try {
+            long jobIdFinished = System.currentTimeMillis();
+            long currentTimeMillis = System.currentTimeMillis();
+            startJob(jobIdFinished, "fake_to_console.conf", false);
+            await().atMost(60, TimeUnit.SECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            JobStatus.FINISHED,
+                                            server.getCoordinatorService()
+                                                    .getJobStatus(jobIdFinished)));
+            // check whether the event handler is executed
+            // expected non-terminal events for the finished job: PENDING, SCHEDULED, RUNNING
+            await().atMost(10, TimeUnit.SECONDS)
+                    .untilAsserted(() -> Assertions.assertEquals(3, accessCounter.get()));
+            JobStateEvent jobStateEventFinished = jobStateEventReference.get();
+            Assertions.assertEquals(
+                    String.valueOf(jobIdFinished), jobStateEventFinished.getJobId());
+            Assertions.assertEquals(JobStatus.RUNNING, jobStateEventFinished.getJobStatus());
+            Assertions.assertTrue(jobStateEventFinished.getCreatedTime() > currentTimeMillis);
+            Assertions.assertEquals(
+                    String.valueOf(jobIdFinished), jobStateEventFinished.getJobName());
+
+            long jobIdFailed = System.currentTimeMillis();
+            startJob(jobIdFailed, STREAM_CONF_WITH_ERROR_PATH, false);
+            await().atMost(60, TimeUnit.SECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            JobStatus.FAILED,
+                                            server.getCoordinatorService()
+                                                    .getJobStatus(jobIdFailed)));
+
+            // the failing job adds four more non-terminal events (3 + 4 = 7), ending in FAILING
+            await().atMost(10, TimeUnit.SECONDS)
+                    .untilAsserted(() -> Assertions.assertEquals(7, accessCounter.get()));
+            JobStateEvent jobStateEventFailed = jobStateEventReference.get();
+            Assertions.assertEquals(String.valueOf(jobIdFailed), jobStateEventFailed.getJobId());
+            Assertions.assertEquals(JobStatus.FAILING, jobStateEventFailed.getJobStatus());
+            Assertions.assertTrue(jobStateEventFailed.getCreatedTime() > currentTimeMillis);
+            Assertions.assertEquals(
+                    String.valueOf(jobIdFailed), jobStateEventFailed.getJobName());
+        } finally {
+            // Restore the flag so that other tests reading the same engine config are not
+            // affected by this test having enabled non-terminal reporting.
+            server.getCoordinatorService()
+                    .getEngineConfig()
+                    .setReportNonTerminalJobState(previousReportNonTerminalJobState);
+        }
+    }
+
+    /**
+     * Verifies that, with non-terminal reporting enabled, a successful job reports SCHEDULED,
+     * RUNNING and FINISHED in that relative order and that the last reported event is FINISHED.
+     */
+    @Test
+    void testJobStateEventOrderWhenReportNonTerminalJobStateEnabled() {
+        JobEventProcessor eventProcessor =
+                (JobEventProcessor) server.getCoordinatorService().getEventProcessor();
+
+        List<JobStatus> reportedStatuses = new CopyOnWriteArrayList<>();
+        AtomicReference<JobStateEvent> lastJobStateEventReference = new AtomicReference<>();
+
+        EventHandler eventHandler =
+                event -> {
+                    if (event.getEventType() != EventType.JOB_STATUS) {
+                        return;
+                    }
+
+                    JobStateEvent jobStateEvent = (JobStateEvent) event;
+                    // Publish the event BEFORE recording its status: the test thread reads the
+                    // reference as soon as it sees FINISHED in the list, so the reference must
+                    // already hold that event.
+                    lastJobStateEventReference.set(jobStateEvent);
+                    reportedStatuses.add(jobStateEvent.getJobStatus());
+                };
+
+        @SuppressWarnings("unchecked")
+        List<EventHandler> handlers =
+                (List<EventHandler>) ReflectionUtils.getField(eventProcessor, "handlers").get();
+        handlers.add(eventHandler);
+
+        boolean previousReportNonTerminalJobState =
+                server.getCoordinatorService().getEngineConfig().isReportNonTerminalJobState();
+        server.getCoordinatorService().getEngineConfig().setReportNonTerminalJobState(true);
+        try {
+            long jobId = System.currentTimeMillis();
+            long currentTimeMillis = System.currentTimeMillis();
+
+            startJob(jobId, "fake_to_console.conf", false);
+
+            await().atMost(60, TimeUnit.SECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            JobStatus.FINISHED,
+                                            server.getCoordinatorService().getJobStatus(jobId)));
+
+            await().atMost(10, TimeUnit.SECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertTrue(
+                                            reportedStatuses.contains(JobStatus.FINISHED),
+                                            "FINISHED event should be reported"));
+
+            Assertions.assertTrue(
+                    reportedStatuses.contains(JobStatus.SCHEDULED),
+                    "SCHEDULED event should be reported");
+            Assertions.assertTrue(
+                    reportedStatuses.contains(JobStatus.RUNNING),
+                    "RUNNING event should be reported");
+
+            Assertions.assertTrue(
+                    reportedStatuses.indexOf(JobStatus.SCHEDULED)
+                            < reportedStatuses.indexOf(JobStatus.RUNNING),
+                    "SCHEDULED should be reported before RUNNING. Actual events: "
+                            + reportedStatuses);
+
+            Assertions.assertTrue(
+                    reportedStatuses.indexOf(JobStatus.RUNNING)
+                            < reportedStatuses.indexOf(JobStatus.FINISHED),
+                    "RUNNING should be reported before FINISHED. Actual events: "
+                            + reportedStatuses);
+
+            JobStateEvent lastJobStateEvent = lastJobStateEventReference.get();
+            Assertions.assertEquals(String.valueOf(jobId), lastJobStateEvent.getJobId());
+            Assertions.assertEquals(JobStatus.FINISHED, lastJobStateEvent.getJobStatus());
+            Assertions.assertTrue(lastJobStateEvent.getCreatedTime() > currentTimeMillis);
+            Assertions.assertEquals(String.valueOf(jobId), lastJobStateEvent.getJobName());
+        } finally {
+            // Restore the flag so that other tests reading the same engine config are not
+            // affected by this test having enabled non-terminal reporting.
+            server.getCoordinatorService()
+                    .getEngineConfig()
+                    .setReportNonTerminalJobState(previousReportNonTerminalJobState);
+        }
     }
}