Copilot commented on code in PR #10075: URL: https://github.com/apache/seatunnel/pull/10075#discussion_r2587193383
##########
docs/en/seatunnel-engine/rest-api-v1.md:
##########
@@ -573,7 +573,8 @@ When we can't get the job info, the response will be:
```json
{
"jobId": 733584788375666689,
- "isStopWithSavePoint": false # if job is stopped with save point
+ "isStopWithSavePoint": false # If the job is stopped with a savepoint.
+ "force": false # If true, the job is force-stopped (ignores
isStopWithSavePoint).
Review Comment:
The JSON example is missing a comma between `isStopWithSavePoint` and
`force` properties. The correct format should be:
```json
{
"jobId": 733584788375666689,
"isStopWithSavePoint": false, # If the job is stopped with a savepoint.
"force": false # If true, the job is force-stopped (ignores
isStopWithSavePoint).
}
```
Note the comma after the comment on the `isStopWithSavePoint` line.
```suggestion
"isStopWithSavePoint": false,
"force": false
```
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java:
##########
@@ -503,6 +509,17 @@ public void restorePipeline() {
}
}
+ public void finishPipelineIfCheckpointCompleted() {
+ if (jobMaster.getCheckpointManager().isCompletedPipeline(pipelineId)) {
+ forcePipelineFinish();
+ } else {
+ throw new SeaTunnelEngineException(
+ "Cannot finish pipeline "
+ + pipelineFullName
+ + " because the checkpoint is not complete.");
+ }
+ }
Review Comment:
The method `finishPipelineIfCheckpointCompleted` calls
`jobMaster.getCheckpointManager().isCompletedPipeline(pipelineId)` without
checking if `getCheckpointManager()` returns null. This could cause a
NullPointerException. Other methods in this class (e.g.,
`notifyCheckpointManagerPipelineEnd` at line 298) check for null before calling
methods on the checkpoint manager. Consider adding a null check here as well.
##########
docs/zh/seatunnel-engine/rest-api-v1.md:
##########
@@ -574,7 +574,8 @@ network:
```json
{
"jobId": 733584788375666689,
- "isStopWithSavePoint": false # if job is stopped with save point
+ "isStopWithSavePoint": false # If the job is stopped with a savepoint.
+ "force": false # If true, the job is force-stopped (ignores
isStopWithSavePoint).
Review Comment:
The JSON example is missing a comma between `isStopWithSavePoint` and
`force` properties. The correct format should be:
```json
{
"jobId": 733584788375666689,
"isStopWithSavePoint": false, # If the job is stopped with a savepoint.
"force": false # If true, the job is force-stopped (ignores
isStopWithSavePoint).
}
```
Note the comma after the comment on the `isStopWithSavePoint` line.
```suggestion
"isStopWithSavePoint": false,
"force": false
```
##########
docs/zh/seatunnel-engine/rest-api-v2.md:
##########
@@ -660,7 +660,8 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload'
--form 'config_file=@"
```json
{
"jobId": 733584788375666689,
- "isStopWithSavePoint": false # if job is stopped with save point
+ "isStopWithSavePoint": false # If the job is stopped with a savepoint.
+ "force": false # If true, the job is force-stopped (ignores
isStopWithSavePoint).
Review Comment:
The JSON example is missing a comma between `isStopWithSavePoint` and
`force` properties. The correct format should be:
```json
{
"jobId": 733584788375666689,
"isStopWithSavePoint": false, # If the job is stopped with a savepoint.
"force": false # If true, the job is force-stopped (ignores
isStopWithSavePoint).
}
```
Note the comma after the comment on the `isStopWithSavePoint` line.
```suggestion
"isStopWithSavePoint": false,
"force": false
```
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java:
##########
@@ -1163,6 +1163,154 @@ public void testHoconStopJobV2() {
});
}
+ @Test
+ public void testForceStopJob() {
+ AtomicInteger i = new AtomicInteger();
+
+ Arrays.asList(server, secondServer)
+ .forEach(
+ container -> {
+ Tuple3<Integer, String, Long> task = tasks.get(0);
+ String jobId =
+ submitJob(
+ container,
+ task._1(),
+ task._2(),
+ "STREAMING",
+ jobName,
+ paramJobName)
+ .getBody()
+ .jsonPath()
+ .getString("jobId");
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
task._1()
+ +
task._2()
+ +
RestConstant
+
.REST_URL_RUNNING_JOB
+ +
"/"
+ +
jobId)
+ .then()
+ .statusCode(200)
+ .body("jobStatus",
equalTo("RUNNING")));
+
+ String parameters =
+ "{" + "\"jobId\":" + jobId + "," +
"\"force\":true}";
+
+ given().body(parameters)
+ .post(
+ http
+ + container.getHost()
+ + colon
+ + task._1()
+ + task._2()
+ +
RestConstant.REST_URL_STOP_JOB)
+ .then()
+ .statusCode(200)
+ .body("jobId", equalTo(jobId));
+
+ Awaitility.await()
+ .atMost(6, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
task._1()
+ +
task._2()
+ +
RestConstant
+
.REST_URL_FINISHED_JOBS
+ +
"/CANCELED")
+ .then()
+ .statusCode(200)
+ .body(
+ "[" +
i.get() + "].jobId",
+
equalTo(jobId)));
Review Comment:
The `AtomicInteger i` is initialized but never incremented within the
forEach loop. This means that for both server instances (server and
secondServer), the test will check `[0].jobId` in the assertion. The second
server's job verification will likely fail or be checking the wrong job.
Consider incrementing `i` after each container iteration, similar to how it's
done in `testHoconStopJobV2` at line 1162.
```suggestion
equalTo(jobId)));
i.incrementAndGet();
```
##########
docs/en/seatunnel-engine/rest-api-v2.md:
##########
@@ -676,7 +676,8 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload'
--form 'config_file=@"
```json
{
"jobId": 733584788375666689,
- "isStopWithSavePoint": false # if job is stopped with save point
+ "isStopWithSavePoint": false # If the job is stopped with a savepoint.
Review Comment:
The JSON example is missing a comma between `isStopWithSavePoint` and
`force` properties. The correct format should be:
```json
{
"jobId": 733584788375666689,
"isStopWithSavePoint": false, # If the job is stopped with a savepoint.
"force": false # If true, the job is force-stopped (ignores
isStopWithSavePoint).
}
```
Note the comma after the comment on the `isStopWithSavePoint` line.
```suggestion
"isStopWithSavePoint": false, # If the job is stopped with a savepoint.
```
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java:
##########
@@ -1163,6 +1163,154 @@ public void testHoconStopJobV2() {
});
}
+ @Test
+ public void testForceStopJob() {
+ AtomicInteger i = new AtomicInteger();
+
+ Arrays.asList(server, secondServer)
+ .forEach(
+ container -> {
+ Tuple3<Integer, String, Long> task = tasks.get(0);
+ String jobId =
+ submitJob(
+ container,
+ task._1(),
+ task._2(),
+ "STREAMING",
+ jobName,
+ paramJobName)
+ .getBody()
+ .jsonPath()
+ .getString("jobId");
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
task._1()
+ +
task._2()
+ +
RestConstant
+
.REST_URL_RUNNING_JOB
+ +
"/"
+ +
jobId)
+ .then()
+ .statusCode(200)
+ .body("jobStatus",
equalTo("RUNNING")));
+
+ String parameters =
+ "{" + "\"jobId\":" + jobId + "," +
"\"force\":true}";
+
+ given().body(parameters)
+ .post(
+ http
+ + container.getHost()
+ + colon
+ + task._1()
+ + task._2()
+ +
RestConstant.REST_URL_STOP_JOB)
+ .then()
+ .statusCode(200)
+ .body("jobId", equalTo(jobId));
+
+ Awaitility.await()
+ .atMost(6, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
task._1()
+ +
task._2()
+ +
RestConstant
+
.REST_URL_FINISHED_JOBS
+ +
"/CANCELED")
+ .then()
+ .statusCode(200)
+ .body(
+ "[" +
i.get() + "].jobId",
+
equalTo(jobId)));
+ });
+ }
+
+ @Test
+ public void testForceStopJobV2() {
+ AtomicInteger i = new AtomicInteger();
+
+ Arrays.asList(server, secondServer)
+ .forEach(
+ container -> {
+ Tuple3<Integer, String, Long> task = tasks.get(1);
+ String jobId =
+ submitJob(
+ container,
+ task._1(),
+ task._2(),
+ "STREAMING",
+ jobName,
+ paramJobName)
+ .getBody()
+ .jsonPath()
+ .getString("jobId");
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
task._1()
+ +
task._2()
+ +
RestConstant
+
.REST_URL_RUNNING_JOB
+ +
"/"
+ +
jobId)
+ .then()
+ .statusCode(200)
+ .body("jobStatus",
equalTo("RUNNING")));
+
+ String parameters =
+ "{" + "\"jobId\":" + jobId + "," +
"\"force\":true}";
+
+ given().body(parameters)
+ .post(
+ http
+ + container.getHost()
+ + colon
+ + task._1()
+ + task._2()
+ +
RestConstant.REST_URL_STOP_JOB)
+ .then()
+ .statusCode(200)
+ .body("jobId", equalTo(jobId));
+
+ Awaitility.await()
+ .atMost(6, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
task._1()
+ +
task._2()
+ +
RestConstant
+
.REST_URL_FINISHED_JOBS
+ +
"/CANCELED")
+ .then()
+ .statusCode(200)
+ .body(
+ "[" +
i.get() + "].jobId",
+
equalTo(jobId)));
Review Comment:
The `AtomicInteger i` is initialized but never incremented within the
forEach loop. This means that for both server instances (server and
secondServer), the test will check `[0].jobId` in the assertion. The second
server's job verification will likely fail or be checking the wrong job.
Consider incrementing `i` after each container iteration, similar to how it's
done in `testHoconStopJobV2` at line 1162.
```suggestion
equalTo(jobId)));
i.incrementAndGet();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
