Copilot commented on code in PR #10075:
URL: https://github.com/apache/seatunnel/pull/10075#discussion_r2633595213
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -504,21 +504,34 @@ protected void handleStopJob(
isStopWithSavePoint =
Boolean.parseBoolean(map.get(RestConstant.IS_STOP_WITH_SAVE_POINT).toString());
}
+ boolean forceStop = false;
+ if (map.get(RestConstant.FORCE) != null) {
+ forceStop = Boolean.parseBoolean(map.get(RestConstant.FORCE).toString());
+ }
if (!seaTunnelServer.isMasterNode()) {
+ if (forceStop) {
+ NodeEngineUtil.sendOperationToMasterNode(
+ node.nodeEngine, new CancelJobOperation(jobId, true))
+ .join();
+ return;
+ }
if (isStopWithSavePoint) {
NodeEngineUtil.sendOperationToMasterNode(
node.nodeEngine, new SavePointJobOperation(jobId))
.join();
} else {
NodeEngineUtil.sendOperationToMasterNode(
- node.nodeEngine, new CancelJobOperation(jobId))
+ node.nodeEngine, new CancelJobOperation(jobId, false))
.join();
}
} else {
CoordinatorService coordinatorService = seaTunnelServer.getCoordinatorService();
-
+ if (forceStop) {
+ coordinatorService.stopJob(jobId);
+ return;
+ }
Review Comment:
When the force parameter is true, the method returns early without calling
the standard cancelJob flow. This means that any future enhancements or side
effects in the cancelJob method won't be applied to force-stopped jobs.
Consider whether stopJob should be a completely separate operation or if
there's shared logic that both paths should execute.
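One way to keep shared logic from being skipped is to route both modes through a single entry point and branch only for the mode-specific step. This is a hypothetical sketch: JobStopDispatcher, stop, beforeAnyStop and afterAnyStop are illustrative names, not SeaTunnel APIs, and the log list stands in for real side effects such as auditing or cleanup. parseFlag mirrors how handleStopJob reads the force value from the request map.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch, not SeaTunnel code: a single stop entry point so that
 * logic shared by the graceful and forced paths cannot be skipped by an early
 * return. The log list stands in for real side effects.
 */
class JobStopDispatcher {
    final List<String> log = new ArrayList<>();

    /** Reads an optional boolean flag the way handleStopJob reads "force". */
    static boolean parseFlag(Object raw) {
        return raw != null && Boolean.parseBoolean(raw.toString());
    }

    void stop(long jobId, boolean force) {
        beforeAnyStop(jobId); // shared step, runs for both modes
        if (force) {
            log.add("forceStop:" + jobId); // mode-specific step
        } else {
            log.add("cancel:" + jobId); // mode-specific step
        }
        afterAnyStop(jobId); // shared step, runs for both modes
    }

    private void beforeAnyStop(long jobId) {
        log.add("before:" + jobId);
    }

    private void afterAnyStop(long jobId) {
        log.add("after:" + jobId);
    }
}
```

For example, stop(1L, parseFlag("true")) records before:1, forceStop:1, after:1, while stop(2L, parseFlag(null)) records before:2, cancel:2, after:2, so a hook added to the shared steps applies to both modes.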
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java:
##########
@@ -220,10 +220,45 @@ public void savepointJob() {
updateJobState(JobStatus.DOING_SAVEPOINT);
}
+ public void stopJob() {
+ JobStatus jobStatus = getJobStatus();
+ if (jobStatus.isEndState()) {
+ log.warn("{} is in end state {}, can not be stop", jobFullName, jobStatus);
+ return;
+ }
+
+ if (jobStatus.ordinal() <= JobStatus.PENDING.ordinal()) {
+ // Tasks with the status 'INITIALIZING', 'CREATED', 'PENDING' need to be set directly to
+ // the 'CANCELLED' state because it has not yet started running
+ updateJobState(JobStatus.CANCELED);
+ } else if (jobStatus == JobStatus.DOING_SAVEPOINT) {
+ this.pipelineList.forEach(SubPlan::stopPipelineWithCheckpointFallback);
+ } else {
+ updateJobState(JobStatus.CANCELING);
+ this.pipelineList.forEach(SubPlan::forceStopPipeline);
Review Comment:
The stopJob method in PhysicalPlan is not synchronized, yet it iterates
pipelineList with forEach while other threads might modify the list. Calling
updateJobState (which may synchronize internally) does not make stopJob itself
synchronized, so the forEach on line 238 could fail or observe an inconsistent
list if pipelineList is modified concurrently. Consider iterating over a
thread-safe collection or a snapshot, or synchronizing stopJob.
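A minimal sketch of the suggested fix, under two assumptions: stopJob holds the plan's monitor for the whole check-then-act sequence, and the pipeline list is a CopyOnWriteArrayList, whose forEach walks a snapshot of the backing array. PlanStatus and PlanSketch are hypothetical stand-ins for JobStatus and PhysicalPlan; the state branches mirror the diff, and the actions list replaces the real SubPlan calls. SnapshotDemo mutates the list from inside the loop on a single thread, as a deterministic stand-in for a concurrent modification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for JobStatus; ordinal order matters, as in the diff. */
enum PlanStatus {
    CREATED, PENDING, RUNNING, DOING_SAVEPOINT, CANCELING, CANCELED;

    boolean isEndState() {
        return this == CANCELED;
    }
}

/** Hypothetical stand-in for PhysicalPlan; not SeaTunnel code. */
class PlanSketch {
    private final List<String> pipelineList = new CopyOnWriteArrayList<>();
    private final List<String> actions = new ArrayList<>(); // guarded by this
    private PlanStatus status; // guarded by this

    PlanSketch(PlanStatus initial, List<String> pipelines) {
        status = initial;
        pipelineList.addAll(pipelines);
    }

    // The state check and the fan-out run under one lock.
    synchronized void stopJob() {
        if (status.isEndState()) {
            return; // already ended, nothing to stop
        }
        if (status.ordinal() <= PlanStatus.PENDING.ordinal()) {
            status = PlanStatus.CANCELED; // never started running
        } else if (status == PlanStatus.DOING_SAVEPOINT) {
            pipelineList.forEach(p -> actions.add("fallback:" + p));
        } else {
            status = PlanStatus.CANCELING;
            pipelineList.forEach(p -> actions.add("forceStop:" + p));
        }
    }

    synchronized PlanStatus status() {
        return status;
    }

    synchronized List<String> actions() {
        return new ArrayList<>(actions); // defensive copy
    }
}

/** Shows why the list type matters when it changes during iteration. */
class SnapshotDemo {
    /** Adds an element after visiting the first one; returns what the loop saw. */
    static List<String> iterateWhileAdding(List<String> list) {
        List<String> seen = new ArrayList<>();
        list.forEach(p -> {
            seen.add(p);
            if (seen.size() == 1) {
                list.add("late"); // structural change mid-iteration
            }
        });
        return seen;
    }
}
```

With a CopyOnWriteArrayList holding p1 and p2, iterateWhileAdding returns [p1, p2] and the late element only shows up in the list afterwards; with an ArrayList, the same call throws ConcurrentModificationException. Copy-on-write copies the array on every add, which suits a list that is written rarely and iterated often, as a pipeline list presumably is.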
##########
seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java:
##########
@@ -69,11 +69,17 @@ public class ClientCommandArgs extends AbstractCommandArgs {
private String jobId;
@Parameter(
- names = {"-can", "--cancel-job"},
+ names = {"-can", "--cancel", "--cancel-job"},
variableArity = true,
description = "Cancel job by JobId")
private List<String> cancelJobId;
+ @Parameter(
+ names = {"-f", "--force-cancel", "--force-cancel-job"},
+ variableArity = true,
+ description = "Force cancel job by JobId")
Review Comment:
The parameter description format is inconsistent. Line 72 says "Cancel job
by JobId" while line 80 says "Force cancel job by JobId". For consistency with
the updated line 24 in the documentation, both should say "job(s)" to indicate
multiple jobs can be canceled. Also, "cancel" should be capitalized to "Cancel"
to match the pattern on line 72.
```suggestion
description = "Cancel job(s) by JobId")
private List<String> cancelJobId;
@Parameter(
names = {"-f", "--force-cancel", "--force-cancel-job"},
variableArity = true,
description = "Force Cancel job(s) by JobId")
```
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java:
##########
@@ -517,6 +517,16 @@ public void updateStateByExecutionService(TaskExecutionState taskExecutionState)
updateTaskState(taskExecutionState.getExecutionState());
}
+ public void forceStop() {
+ if (getExecutionState().isEndState()) {
Review Comment:
The forceStop method should check if getExecutionState() returns null before
calling isEndState(). If the execution state is null, calling isEndState()
could result in a NullPointerException. Add a null check before the
isEndState() call.
```suggestion
ExecutionState executionState = getExecutionState();
if (executionState == null || executionState.isEndState()) {
```
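The suggested guard can be sketched in isolation. VertexState and VertexSketch are hypothetical stand-ins for ExecutionState and PhysicalVertex, and assigning CANCELING is a placeholder for the real stop logic. Reading the state into a local once, as the suggestion does, also keeps the null check and isEndState() looking at the same value, assuming the underlying field can be updated by another thread between two reads.

```java
/** Hypothetical stand-in for ExecutionState; not SeaTunnel's enum. */
enum VertexState {
    CREATED, RUNNING, CANCELING, CANCELED, FINISHED, FAILED;

    boolean isEndState() {
        return this == CANCELED || this == FINISHED || this == FAILED;
    }
}

/** Hypothetical stand-in for PhysicalVertex, reduced to the guard. */
class VertexSketch {
    private volatile VertexState state; // may be null before initialization

    VertexSketch(VertexState initial) {
        state = initial;
    }

    /** Returns true only if a stop was actually issued. */
    boolean forceStop() {
        VertexState executionState = state; // read once, check and use the same value
        if (executionState == null || executionState.isEndState()) {
            return false; // unknown or already ended: nothing to stop
        }
        state = VertexState.CANCELING; // placeholder for the real stop logic
        return true;
    }

    VertexState state() {
        return state;
    }
}
```

Whether a null state should be skipped silently, as here, or logged as unexpected is a separate design decision the sketch does not settle.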
##########
docs/en/seatunnel-engine/user-command.md:
##########
@@ -18,24 +18,25 @@ The output is as follows:
Usage: seatunnel.sh [options]
Options:
- --async Run the job asynchronously. When the job is submitted, the client will exit (default: false).
- -can, --cancel-job Cancel the job by JobId.
- --check Whether to check the config (default: false).
- -cj, --close-job Close the client and the task will also be closed (default: true).
- -cn, --cluster The name of the cluster.
- -c, --config Config file.
- --decrypt Decrypt the config file. When both --decrypt and --encrypt are specified, only --encrypt will take effect (default: false).
- -m, --master, -e, --deploy-mode SeaTunnel job submit master, support [local, cluster] (default: cluster).
- --encrypt Encrypt the config file. When both --decrypt and --encrypt are specified, only --encrypt will take effect (default: false).
- --get_running_job_metrics Get metrics for running jobs (default: false).
- -h, --help Show the usage message.
- -j, --job-id Get the job status by JobId.
- -l, --list List the job status (default: false).
- --metrics Get the job metrics by JobId.
- -n, --name The SeaTunnel job name (default: SeaTunnel).
- -r, --restore Restore with savepoint by jobId.
- -s, --savepoint Savepoint the job by jobId.
- -i, --variable Variable substitution, such as -i city=beijing, or -i date=20190318. We use ',' as a separator. When inside "", ',' are treated as normal characters instead of delimiters. (default: []).
+ --async Run the job asynchronously. When the job is submitted, the client will exit (default: false).
+ -can, --cancel, --cancel-job Cancel the job(s) by JobId.
+ -f, --force-cancel, --force-cancel-job Force cancel job(s) by JobId.
Review Comment:
Line 25 already uses the plural "job(s)", matching the "Cancel job(s) by JobId"
description on line 24, but its capitalization is inconsistent: "cancel" should
be capitalized to "Cancel" to match that pattern.
```suggestion
-f, --force-cancel, --force-cancel-job Force Cancel job(s) by JobId.
```
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java:
##########
@@ -503,6 +509,22 @@ public void restorePipeline() {
}
}
+ public void stopPipelineWithCheckpointFallback() {
+ if (jobMaster.getCheckpointManager() == null) {
+ forceStopPipeline();
+ return;
+ }
+ if (jobMaster.getCheckpointManager().isCompletedPipeline(pipelineId)) {
+ forcePipelineFinish();
+ } else {
+ log.warn(
+ "Failed to stop the pipeline gracefully. Falling back to forced stop: {}",
+ pipelineFullName);
+ cancelCheckpointCoordinator();
+ forceStopPipeline();
+ }
+ }
Review Comment:
The stopPipelineWithCheckpointFallback method also lacks synchronization
despite modifying pipeline state. Since this method calls forceStopPipeline and
forcePipelineFinish which modify vertex states, it should be synchronized to
prevent concurrent modifications that could lead to inconsistent pipeline state.
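A sketch of the synchronized variant, assuming SubPlan's own monitor is the lock that guards pipeline state. CheckpointView and PipelineSketch are hypothetical; the branches follow the diff (no checkpoint manager: force stop; pipeline's checkpoints completed: finish; otherwise cancel the checkpoint coordinator and force stop), and the actions list replaces the real calls.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the checkpoint manager; not SeaTunnel's API. */
interface CheckpointView {
    boolean isCompletedPipeline(int pipelineId);
}

/** Hypothetical stand-in for SubPlan, reduced to the fallback decision. */
class PipelineSketch {
    private final int pipelineId;
    private final CheckpointView checkpoints; // may be null, as the diff allows
    private final List<String> actions = new ArrayList<>(); // guarded by this

    PipelineSketch(int pipelineId, CheckpointView checkpoints) {
        this.pipelineId = pipelineId;
        this.checkpoints = checkpoints;
    }

    // Check and act under one lock, so no other state change can interleave.
    synchronized void stopPipelineWithCheckpointFallback() {
        if (checkpoints == null) {
            forceStopPipeline();
            return;
        }
        if (checkpoints.isCompletedPipeline(pipelineId)) {
            actions.add("finish"); // placeholder for forcePipelineFinish()
        } else {
            actions.add("cancelCheckpointCoordinator");
            forceStopPipeline();
        }
    }

    // Intrinsic locks are reentrant, so this nested call does not deadlock.
    synchronized void forceStopPipeline() {
        actions.add("forceStop");
    }

    synchronized List<String> actions() {
        return new ArrayList<>(actions); // defensive copy
    }
}
```

Because Java's intrinsic locks are reentrant, the synchronized fallback can call a synchronized forceStopPipeline on the same object without blocking itself. The trade-off is that the lock is held while calling into the checkpoint manager; if that code takes other locks, the lock ordering would need checking.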
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java:
##########
@@ -396,6 +396,12 @@ public synchronized void cancelPipeline() {
}
}
+ public void forceStopPipeline() {
+ jobMaster.neverNeedRestore();
+ coordinatorVertexList.forEach(PhysicalVertex::forceStop);
+ physicalVertexList.forEach(PhysicalVertex::forceStop);
+ }
Review Comment:
The forceStopPipeline method lacks synchronization, which could lead to race
conditions when called concurrently with other pipeline state management
methods. The cancelPipeline method on line 392 is synchronized, but
forceStopPipeline is not. This inconsistency could cause thread safety issues
when multiple threads attempt to stop or manage the pipeline state
simultaneously.
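The point about matching cancelPipeline can be illustrated with a small, hypothetical sketch: when both methods are synchronized on the same object, they never run at the same time. GuardedPipeline is not SeaTunnel code; the active and overlaps counters exist only to make an overlap observable, and stateChanges stands in for pipeline state.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: both stop methods share the object's monitor. */
class GuardedPipeline {
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicInteger overlaps = new AtomicInteger();
    private int stateChanges; // guarded by this

    synchronized void cancelPipeline() {
        mutate();
    }

    synchronized void forceStopPipeline() {
        mutate();
    }

    // Only ever called while holding this object's monitor.
    private void mutate() {
        if (active.incrementAndGet() > 1) {
            overlaps.incrementAndGet(); // another stop method is running right now
        }
        stateChanges++;
        Thread.yield(); // widen the window in which an overlap could occur
        active.decrementAndGet();
    }

    int overlaps() {
        return overlaps.get();
    }

    synchronized int stateChanges() {
        return stateChanges;
    }
}
```

Removing synchronized from forceStopPipeline would make overlaps possible and could lose increments of stateChanges, although such races appear nondeterministically rather than on every run.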
##########
seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java:
##########
@@ -69,11 +69,17 @@ public class ClientCommandArgs extends AbstractCommandArgs {
private String jobId;
@Parameter(
- names = {"-can", "--cancel-job"},
+ names = {"-can", "--cancel", "--cancel-job"},
variableArity = true,
description = "Cancel job by JobId")
Review Comment:
Similarly, the description on line 74 should also use "job(s)" to indicate
multiple jobs can be canceled, matching the pattern established in the
documentation updates. Change "Cancel job by JobId" to "Cancel job(s) by JobId".
```suggestion
description = "Cancel job(s) by JobId")
```
--
This is an automated message from the Apache Git Service.