This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
     new 9d58f3d64 [Improve] k8s job state improve
9d58f3d64 is described below

commit 9d58f3d642f8e8ea5ed8f1c5d4c34889a8cd6217
Author: benjobs <[email protected]>
AuthorDate: Sat Apr 13 16:01:43 2024 +0800

    [Improve] k8s job state improve
---
 .../core/task/FlinkK8sChangeEventListener.java     | 25 ++++++++++++----------
 1 file changed, 14 insertions(+), 11 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 20bea91da..e211e57f5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -74,12 +74,19 @@ public class FlinkK8sChangeEventListener {
     if (app == null) {
       return;
     }
-    if (FlinkAppState.isEndState(app.getState())) {
-      log.info("job [{}] is EndState, but also update it.", app.getJobName());
+
+    Enumeration.Value inferState =
+        FlinkJobStatusWatcher.inferFlinkJobStateFromPersist(
+            jobStatus.jobState(), toK8sFlinkJobState(app.getFlinkAppStateEnum()));
+
+    FlinkAppState appState = fromK8sFlinkJobState(inferState);
+    if (app.getFlinkAppStateEnum() == appState) {
+      return;
     }
 
     // update application record
-    setByJobStatusCV(app, jobStatus);
+    setByJobStatusCV(app, jobStatus, inferState);
+
     applicationService.persistMetrics(app);
 
     FlinkAppState state = app.getFlinkAppStateEnum();
@@ -142,12 +149,8 @@ public class FlinkK8sChangeEventListener {
     checkpointProcessor.process(applicationService.getById(event.trackId().appId()), checkPoint);
   }
 
-  private void setByJobStatusCV(Application app, JobStatusCV jobStatus) {
-    // infer the final flink job state
-    Enumeration.Value state =
-        FlinkJobStatusWatcher.inferFlinkJobStateFromPersist(
-            jobStatus.jobState(), toK8sFlinkJobState(app.getFlinkAppStateEnum()));
-
+  private void setByJobStatusCV(
+      Application app, JobStatusCV jobStatus, Enumeration.Value inferState) {
     // corrective start-time / end-time / duration
     long preStartTime = app.getStartTime() != null ? app.getStartTime().getTime() : 0;
     long startTime = Math.max(jobStatus.jobStartTime(), preStartTime);
@@ -155,7 +158,7 @@ public class FlinkK8sChangeEventListener {
     long endTime = Math.max(jobStatus.jobEndTime(), preEndTime);
     long duration = jobStatus.duration();
 
-    if (FlinkJobState.isEndState(state)) {
+    if (FlinkJobState.isEndState(inferState)) {
       if (endTime < startTime) {
         endTime = System.currentTimeMillis();
       }
@@ -164,7 +167,7 @@ public class FlinkK8sChangeEventListener {
       }
     }
 
-    app.setState(fromK8sFlinkJobState(state).getValue());
+    app.setState(fromK8sFlinkJobState(inferState).getValue());
     app.setJobId(jobStatus.jobId());
     app.setTotalTask(jobStatus.taskTotal());
 

Reply via email to