Copilot commented on code in PR #10075:
URL: https://github.com/apache/seatunnel/pull/10075#discussion_r2633595213


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java:
##########
@@ -504,21 +504,34 @@ protected void handleStopJob(
             isStopWithSavePoint =
                     Boolean.parseBoolean(map.get(RestConstant.IS_STOP_WITH_SAVE_POINT).toString());
         }
+        boolean forceStop = false;
+        if (map.get(RestConstant.FORCE) != null) {
+            forceStop = Boolean.parseBoolean(map.get(RestConstant.FORCE).toString());
+        }
 
         if (!seaTunnelServer.isMasterNode()) {
+            if (forceStop) {
+                NodeEngineUtil.sendOperationToMasterNode(
+                                node.nodeEngine, new CancelJobOperation(jobId, true))
+                        .join();
+                return;
+            }
             if (isStopWithSavePoint) {
                 NodeEngineUtil.sendOperationToMasterNode(
                                 node.nodeEngine, new SavePointJobOperation(jobId))
                         .join();
             } else {
                 NodeEngineUtil.sendOperationToMasterNode(
-                                node.nodeEngine, new CancelJobOperation(jobId))
+                                node.nodeEngine, new CancelJobOperation(jobId, false))
                         .join();
             }
 
         } else {
             CoordinatorService coordinatorService = seaTunnelServer.getCoordinatorService();
-
+            if (forceStop) {
+                coordinatorService.stopJob(jobId);
+                return;
+            }

Review Comment:
   When the force parameter is true, the method returns early without calling 
the standard cancelJob flow. This means that any future enhancements or side 
effects in the cancelJob method won't be applied to force-stopped jobs. 
Consider whether stopJob should be a completely separate operation or if 
there's shared logic that both paths should execute.
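
As a rough illustration of the concern above, the sketch below routes both the forced and the standard path through one entry point so that shared steps always run. Every name here (`StopCoordinatorSketch`, `stop`, `recordStopRequest`) is hypothetical and is not part of the SeaTunnel code base; the real `stopJob` and `cancelJob` may have no logic worth sharing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the coordinator; none of these names are SeaTunnel APIs.
class StopCoordinatorSketch {
    private final List<String> log = new ArrayList<>();

    // Single entry point: shared steps run first, then the path-specific step.
    void stop(long jobId, boolean force) {
        recordStopRequest(jobId); // shared by both paths
        if (force) {
            log.add("stopJob:" + jobId); // stands in for the force-stop path
        } else {
            log.add("cancelJob:" + jobId); // stands in for the standard cancel path
        }
    }

    // Assumed shared side effect, e.g. auditing or metrics.
    private void recordStopRequest(long jobId) {
        log.add("requested:" + jobId);
    }

    // Returns a copy so callers cannot mutate the internal log.
    List<String> log() {
        return new ArrayList<>(log);
    }
}
```

With this shape, a later change to the shared step applies to force-stopped jobs without touching either branch.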



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java:
##########
@@ -220,10 +220,45 @@ public void savepointJob() {
         updateJobState(JobStatus.DOING_SAVEPOINT);
     }
 
+    public void stopJob() {
+        JobStatus jobStatus = getJobStatus();
+        if (jobStatus.isEndState()) {
+            log.warn("{} is in end state {}, can not be stop", jobFullName, jobStatus);
+            return;
+        }
+
+        if (jobStatus.ordinal() <= JobStatus.PENDING.ordinal()) {
+            // Tasks with the status 'INITIALIZING', 'CREATED', 'PENDING' need to be set directly to
+            // the 'CANCELLED' state because it has not yet started running
+            updateJobState(JobStatus.CANCELED);
+        } else if (jobStatus == JobStatus.DOING_SAVEPOINT) {
+            this.pipelineList.forEach(SubPlan::stopPipelineWithCheckpointFallback);
+        } else {
+            updateJobState(JobStatus.CANCELING);
+            this.pipelineList.forEach(SubPlan::forceStopPipeline);

Review Comment:
   The stopJob method in PhysicalPlan calls forEach on pipelineList without 
proper synchronization, while the list might be modified by other threads. 
Although the method itself is synchronized via updateJobState, the forEach 
operation on line 238 could be problematic if the pipelineList is modified 
elsewhere concurrently. Consider using a thread-safe iteration approach or 
ensuring proper synchronization.
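
One thread-safe iteration approach, sketched under assumptions: if the list can change while a stop is in progress, a `java.util.concurrent.CopyOnWriteArrayList` lets `forEach` walk a snapshot. The class `PipelinePlanSketch` and its methods are hypothetical stand-ins; whether `pipelineList` is ever mutated after construction is not visible in this diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for PhysicalPlan; pipelines are represented by their names.
class PipelinePlanSketch {
    // forEach on a CopyOnWriteArrayList iterates over the array as it was when
    // iteration started, so concurrent add/remove cannot throw
    // ConcurrentModificationException.
    private final List<String> pipelineList = new CopyOnWriteArrayList<>();

    void addPipeline(String name) {
        pipelineList.add(name);
    }

    // Applies stopAction to every pipeline in the snapshot and returns the visited names.
    List<String> stopAll(Consumer<String> stopAction) {
        List<String> visited = new ArrayList<>();
        pipelineList.forEach(name -> {
            stopAction.accept(name);
            visited.add(name);
        });
        return visited;
    }

    int size() {
        return pipelineList.size();
    }
}
```

The trade-off is that every write copies the backing array, which suits lists that are read far more often than they are modified.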



##########
seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java:
##########
@@ -69,11 +69,17 @@ public class ClientCommandArgs extends AbstractCommandArgs {
     private String jobId;
 
     @Parameter(
-            names = {"-can", "--cancel-job"},
+            names = {"-can", "--cancel", "--cancel-job"},
             variableArity = true,
             description = "Cancel job by JobId")
     private List<String> cancelJobId;
 
+    @Parameter(
+            names = {"-f", "--force-cancel", "--force-cancel-job"},
+            variableArity = true,
+            description = "Force cancel job by JobId")

Review Comment:
   The parameter description format is inconsistent. Line 72 says "Cancel job 
by JobId" while line 80 says "Force cancel job by JobId". For consistency with 
the updated line 24 in the documentation, both should say "job(s)" to indicate 
multiple jobs can be canceled. Also, "cancel" should be capitalized to "Cancel" 
to match the pattern on line 72.
   ```suggestion
               description = "Cancel job(s) by JobId")
       private List<String> cancelJobId;
   
       @Parameter(
               names = {"-f", "--force-cancel", "--force-cancel-job"},
               variableArity = true,
               description = "Force Cancel job(s) by JobId")
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java:
##########
@@ -517,6 +517,16 @@ public void updateStateByExecutionService(TaskExecutionState taskExecutionState)
         updateTaskState(taskExecutionState.getExecutionState());
     }
 
+    public void forceStop() {
+        if (getExecutionState().isEndState()) {

Review Comment:
   The forceStop method should check if getExecutionState() returns null before 
calling isEndState(). If the execution state is null, calling isEndState() 
could result in a NullPointerException. Add a null check before the 
isEndState() call.
   ```suggestion
           ExecutionState executionState = getExecutionState();
           if (executionState == null || executionState.isEndState()) {
   ```



##########
docs/en/seatunnel-engine/user-command.md:
##########
@@ -18,24 +18,25 @@ The output is as follows:
 
 Usage: seatunnel.sh [options]
   Options:
-    --async                         Run the job asynchronously. When the job is submitted, the client will exit (default: false).
-    -can, --cancel-job              Cancel the job by JobId.
-    --check                         Whether to check the config (default: false).
-    -cj, --close-job                Close the client and the task will also be closed (default: true).
-    -cn, --cluster                  The name of the cluster.
-    -c, --config                    Config file.
-    --decrypt                       Decrypt the config file. When both --decrypt and --encrypt are specified, only --encrypt will take effect (default: false).
-    -m, --master, -e, --deploy-mode SeaTunnel job submit master, support [local, cluster] (default: cluster).
-    --encrypt                       Encrypt the config file. When both --decrypt and --encrypt are specified, only --encrypt will take effect (default: false).
-    --get_running_job_metrics       Get metrics for running jobs (default: false).
-    -h, --help                      Show the usage message.
-    -j, --job-id                    Get the job status by JobId.
-    -l, --list                      List the job status (default: false).
-    --metrics                       Get the job metrics by JobId.
-    -n, --name                      The SeaTunnel job name (default: SeaTunnel).
-    -r, --restore                   Restore with savepoint by jobId.
-    -s, --savepoint                 Savepoint the job by jobId.
-    -i, --variable                  Variable substitution, such as -i city=beijing, or -i date=20190318. We use ',' as a separator. When inside "", ',' are treated as normal characters instead of delimiters. (default: []).
+    --async                                     Run the job asynchronously. When the job is submitted, the client will exit (default: false).
+    -can, --cancel, --cancel-job                Cancel the job(s) by JobId.
+    -f, --force-cancel, --force-cancel-job      Force cancel job(s) by JobId.

Review Comment:
   The description already uses the plural form "job(s)", matching the 
"Cancel the job(s) by JobId" description on line 24, but its capitalization is 
inconsistent: in "Force cancel job(s) by JobId" on line 25, "cancel" should be 
capitalized to "Cancel" to match the pattern.
   ```suggestion
       -f, --force-cancel, --force-cancel-job      Force Cancel job(s) by JobId.
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java:
##########
@@ -503,6 +509,22 @@ public void restorePipeline() {
         }
     }
 
+    public void stopPipelineWithCheckpointFallback() {
+        if (jobMaster.getCheckpointManager() == null) {
+            forceStopPipeline();
+            return;
+        }
+        if (jobMaster.getCheckpointManager().isCompletedPipeline(pipelineId)) {
+            forcePipelineFinish();
+        } else {
+            log.warn(
+                    "Failed to stop the pipeline gracefully. Falling back to forced stop: {}",
+                    pipelineFullName);
+            cancelCheckpointCoordinator();
+            forceStopPipeline();
+        }
+    }

Review Comment:
   The stopPipelineWithCheckpointFallback method also lacks synchronization 
despite modifying pipeline state. Since this method calls forceStopPipeline and 
forcePipelineFinish which modify vertex states, it should be synchronized to 
prevent concurrent modifications that could lead to inconsistent pipeline state.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java:
##########
@@ -396,6 +396,12 @@ public synchronized void cancelPipeline() {
         }
     }
 
+    public void forceStopPipeline() {
+        jobMaster.neverNeedRestore();
+        coordinatorVertexList.forEach(PhysicalVertex::forceStop);
+        physicalVertexList.forEach(PhysicalVertex::forceStop);
+    }

Review Comment:
   The forceStopPipeline method lacks synchronization, which could lead to race 
conditions when called concurrently with other pipeline state management 
methods. The cancelPipeline method on line 392 is synchronized, but 
forceStopPipeline is not. This inconsistency could cause thread safety issues 
when multiple threads attempt to stop or manage the pipeline state 
simultaneously.
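
A minimal sketch of the consistency being asked for, assuming both stop paths should be mutually exclusive: marking both methods `synchronized` makes them share the instance monitor, so only the first caller changes state. `PipelineSketch` and its fields are hypothetical and only mirror the method names from the diff; whether holding the monitor while calling into vertices is safe depends on lock ordering that this diff does not show.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SubPlan; only the locking pattern is illustrated.
class PipelineSketch {
    private final List<String> events = new ArrayList<>();
    private boolean stopped;

    // Both methods lock on `this`, so they can never interleave.
    public synchronized void cancelPipeline() {
        if (stopped) {
            return; // another stop path already ran
        }
        stopped = true;
        events.add("cancel");
    }

    public synchronized void forceStopPipeline() {
        if (stopped) {
            return; // another stop path already ran
        }
        stopped = true;
        events.add("forceStop");
    }

    // Returns a copy of the recorded transitions.
    public synchronized List<String> events() {
        return new ArrayList<>(events);
    }
}
```

Calling `forceStopPipeline()` and then `cancelPipeline()` on the same instance records a single transition, because the second call sees `stopped` already set.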



##########
seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java:
##########
@@ -69,11 +69,17 @@ public class ClientCommandArgs extends AbstractCommandArgs {
     private String jobId;
 
     @Parameter(
-            names = {"-can", "--cancel-job"},
+            names = {"-can", "--cancel", "--cancel-job"},
             variableArity = true,
             description = "Cancel job by JobId")

Review Comment:
   Similarly, the description on line 74 should also use "job(s)" to indicate 
multiple jobs can be canceled, matching the pattern established in the 
documentation updates. Change "Cancel job by JobId" to "Cancel job(s) by JobId".
   ```suggestion
               description = "Cancel job(s) by JobId")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
