Copilot commented on code in PR #10075:
URL: https://github.com/apache/seatunnel/pull/10075#discussion_r2601431377


##########
docs/en/seatunnel-engine/rest-api-v2.md:
##########
@@ -853,12 +853,22 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload' 
--form 'config_file=@"
 <details>
 <summary><code>POST</code> <code><b>/stop-job</b></code> <code>(Returns jobId 
if job stoped successfully.)</code></summary>

Review Comment:
   Typo: "stoped" should be "stopped".
   ```suggestion
   <summary><code>POST</code> <code><b>/stop-job</b></code> <code>(Returns 
jobId if job stopped successfully.)</code></summary>
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java:
##########
@@ -220,10 +220,45 @@ public void savepointJob() {
         updateJobState(JobStatus.DOING_SAVEPOINT);
     }
 
+    public void stopJob() {
+        JobStatus jobStatus = getJobStatus();
+        if (jobStatus.isEndState()) {
+            log.warn("{} is in end state {}, can not be stop", jobFullName, 
jobStatus);
+            return;
+        }
+
+        if (jobStatus.ordinal() <= JobStatus.PENDING.ordinal()) {
+            // Tasks with the status 'INITIALIZING', 'CREATED', 'PENDING' need 
to be set directly to
+            // the 'CANCELLED' state because it has not yet started running
+            updateJobState(JobStatus.CANCELED);
+        } else if (jobStatus == JobStatus.DOING_SAVEPOINT) {
+            
this.pipelineList.forEach(SubPlan::finishPipelineWithCheckpointFallback);
+        } else {
+            updateJobState(JobStatus.CANCELING);

Review Comment:
   When force stopping a job that is not in DOING_SAVEPOINT state (line 
237-238), the checkpoint coordinator should be cancelled before calling 
forceStopPipeline. In the DOING_SAVEPOINT branch (line 235), 
finishPipelineWithCheckpointFallback properly cancels the checkpoint 
coordinator via cancelCheckpointCoordinator(), but this step is missing in the 
else branch. Consider extracting the checkpoint cancellation logic similar to 
the cancelJob() method which calls cancelPipeline() that handles this 
internally through cancelCheckpointCoordinator().
   ```suggestion
               updateJobState(JobStatus.CANCELING);
               this.pipelineList.forEach(SubPlan::cancelCheckpointCoordinator);
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java:
##########
@@ -161,6 +161,82 @@ public void 
testInvocationFutureUseCompletableFutureExecutor() {
         instance.shutdown();
     }
 
+    @Test
+    void testForceStopRunningJob() {
+        JobInformation jobInformation =
+                submitJob(
+                        "CoordinatorServiceTest_testStopRunningJob",
+                        "stream_fake_to_console.conf",
+                        "test_stop_running_job");
+        CoordinatorService coordinatorService = 
jobInformation.coordinatorService;
+
+        await().atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertEquals(
+                                    JobStatus.RUNNING,
+                                    
coordinatorService.getJobStatus(jobInformation.jobId));
+                            JobMaster jobMaster =
+                                    
coordinatorService.getJobMaster(jobInformation.jobId);
+                            Assertions.assertNotNull(jobMaster);
+                            Assertions.assertTrue(
+                                    jobMaster
+                                            .getRunningJobStateIMap()
+                                            
.containsKey(jobInformation.jobId));
+                        });
+
+        coordinatorService.stopJob(jobInformation.jobId).join();
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertEquals(
+                                    JobStatus.CANCELED,
+                                    
coordinatorService.getJobStatus(jobInformation.jobId));
+                        });
+        jobInformation.coordinatorService.clearCoordinatorService();
+        jobInformation.coordinatorServiceTest.shutdown();
+    }
+
+    @Test
+    void testForceStopAbnormalSavepointJob() {
+        JobInformation jobInformation =
+                submitJob(
+                        "CoordinatorServiceTest_testStopRunningJob",

Review Comment:
   The test method name indicates testing "testStopRunningJob" in line 204, but 
the actual test method is named "testForceStopAbnormalSavepointJob". The string 
should be updated to match the test method name for consistency and clarity.
   ```suggestion
                           
"CoordinatorServiceTest_testForceStopAbnormalSavepointJob",
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java:
##########
@@ -503,6 +509,21 @@ public void restorePipeline() {
         }
     }
 
+    public void finishPipelineWithCheckpointFallback() {
+        if (jobMaster.getCheckpointManager() == null) {

Review Comment:
   When checkpoint manager is null (line 513), the method returns early without 
actually stopping the pipeline. This means the force stop operation will be 
incomplete. Consider calling `forceStopPipeline()` before the early return to 
ensure the pipeline is properly stopped even when checkpoints are disabled.
   ```suggestion
           if (jobMaster.getCheckpointManager() == null) {
               forceStopPipeline();
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java:
##########
@@ -161,6 +161,82 @@ public void 
testInvocationFutureUseCompletableFutureExecutor() {
         instance.shutdown();
     }
 
+    @Test
+    void testForceStopRunningJob() {
+        JobInformation jobInformation =
+                submitJob(
+                        "CoordinatorServiceTest_testStopRunningJob",

Review Comment:
   The test method name indicates testing "testStopRunningJob" in line 168, but 
the actual test method is named "testForceStopRunningJob". The string should be 
updated to match the test method name for consistency and clarity.
   ```suggestion
                           "CoordinatorServiceTest_testForceStopRunningJob",
   ```



##########
docs/en/seatunnel-engine/rest-api-v1.md:
##########
@@ -596,12 +596,22 @@ When we can't get the job info, the response will be:
 <details>
 <summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-job</b></code> 
<code>(Returns jobId if job stoped successfully.)</code></summary>

Review Comment:
   Typo: "stoped" should be "stopped".
   ```suggestion
   <summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-job</b></code> 
<code>(Returns jobId if job stopped successfully.)</code></summary>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to