This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch lost
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 375ef6f0df32e2f3d9232d55684726881af29821
Author: benjobs <[email protected]>
AuthorDate: Wed Feb 7 18:22:32 2024 +0800

    [Improve] job state lost improvement
---
 .../apache/streampark/common/util/DateUtils.scala  | 11 ++++++++--
 .../console/core/bean/AlertTemplate.java           |  4 ++--
 .../console/core/entity/Application.java           |  5 ++---
 .../core/service/impl/ApplicationServiceImpl.java  |  4 ++--
 .../console/core/task/FlinkAppHttpWatcher.java     | 24 ++++++++++++++++++----
 .../core/task/FlinkK8sChangeEventListener.java     |  4 ++--
 .../core/service/alert/AlertServiceTest.java       |  4 ++--
 7 files changed, 39 insertions(+), 17 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
index 8c62eebe1..96a282513 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
@@ -17,7 +17,7 @@
 package org.apache.streampark.common.util
 
 import java.text.{ParseException, SimpleDateFormat}
-import java.time.{Duration, LocalDateTime}
+import java.time.{Duration, LocalDateTime, ZoneId}
 import java.time.format.DateTimeFormatter
 import java.util._
 import java.util.concurrent.TimeUnit
@@ -154,7 +154,7 @@ object DateUtils {
    * @param milliseconds
    * @return
    */
-  def toDuration(milliseconds: Long): String = {
+  def toStringDuration(milliseconds: Long): String = {
     val duration = Duration.ofMillis(milliseconds)
     val days = duration.toDays
 
@@ -175,6 +175,13 @@ object DateUtils {
     builder.toString
   }
 
+  /** Whole seconds from `time1` to `time2` (default: now), truncated toward zero. */
+  def toSecondDuration(time1: Date, time2: Date = new Date()): Long = {
+    // Diff absolute instants: LocalDateTime drops the zone offset, so a DST shift
+    // inside the interval would otherwise skew the result by up to the offset change.
+    Duration.between(time1.toInstant, time2.toInstant).toMillis / 1000
+  }
+
   def getTimeUnit(
       time: String,
       default: (Int, TimeUnit) = (5, TimeUnit.SECONDS)): (Int, TimeUnit) = {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index b102c0e11..8dd5b4d7f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -75,7 +75,7 @@ public class AlertTemplate implements Serializable {
           template.setEndTime(
               DateUtils.format(
                   application.getEndTime(), DateUtils.fullFormat(), 
TimeZone.getDefault()));
-          template.setDuration(DateUtils.toDuration(duration));
+          template.setDuration(DateUtils.toStringDuration(duration));
         } else {
           template.setStartTime("-");
           template.setEndTime("-");
@@ -113,7 +113,7 @@ public class AlertTemplate implements Serializable {
     AlertTemplate template = of(application);
     template.setType(2);
     template.setCpFailureRateInterval(
-        DateUtils.toDuration(application.getCpFailureRateInterval() * 1000 * 
60));
+        DateUtils.toStringDuration(application.getCpFailureRateInterval() * 
1000 * 60));
     template.setCpMaxFailureInterval(application.getCpMaxFailureInterval());
     template.setTitle(String.format("Notify: %s checkpoint FAILED", 
application.getJobName()));
     template.setSubject(
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index ab2f69d33..0808afd57 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -326,8 +326,7 @@ public class Application implements Serializable {
    * @return true: can start | false: can not start.
    */
   public boolean isCanBeStart() {
-    FlinkAppState state = FlinkAppState.of(getState());
-    switch (state) {
+    switch (getFlinkAppStateEnum()) {
       case ADDED:
       case CREATED:
       case FAILED:
@@ -345,7 +344,7 @@ public class Application implements Serializable {
   }
 
   public boolean shouldBeTrack() {
-    return shouldTracking(FlinkAppState.of(getState())) == 1;
+    return shouldTracking(getFlinkAppStateEnum()) == 1;
   }
 
   @JsonIgnore
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 8360f7439..2dd1681db 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -453,7 +453,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
   @Override
   public boolean checkAlter(Application application) {
     Long appId = application.getId();
-    FlinkAppState state = FlinkAppState.of(application.getState());
+    FlinkAppState state = application.getFlinkAppStateEnum();
     if (!FlinkAppState.CANCELED.equals(state)) {
       return false;
     }
@@ -700,7 +700,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         return AppExistsState.IN_DB;
       }
 
-      FlinkAppState state = FlinkAppState.of(app.getState());
+      FlinkAppState state = app.getFlinkAppStateEnum();
       // has stopped status
       if (state.equals(FlinkAppState.ADDED)
           || state.equals(FlinkAppState.CREATED)
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index 309f573fa..63f5c18c0 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -18,6 +18,7 @@
 package org.apache.streampark.console.core.task;
 
 import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.DateUtils;
 import org.apache.streampark.common.util.HttpClientUtils;
 import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.YarnUtils;
@@ -106,6 +107,9 @@ public class FlinkAppHttpWatcher {
   private static final Cache<Long, Byte> STARTING_CACHE =
       Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
 
+  private static final Cache<Long, Date> LOST_CACHE =
+      Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
+
   /** tracking task list */
   private static final Map<Long, Application> WATCHING_APPS = new 
ConcurrentHashMap<>(0);
 
@@ -205,10 +209,12 @@ public class FlinkAppHttpWatcher {
             try {
               // query status from flink rest api
               getFromFlinkRestApi(application);
+              cleanupLost(application);
             } catch (Exception flinkException) {
               // query status from yarn rest api
               try {
                 getFromYarnRestApi(application);
+                cleanupLost(application);
               } catch (Exception yarnException) {
                 doStateFailed(application);
               }
@@ -217,6 +223,10 @@ public class FlinkAppHttpWatcher {
     }
   }
 
+  private void cleanupLost(Application app) {
+    LOST_CACHE.invalidate(app.getId()); // app answered again: reset its LOST grace window
+  }
+
   private void doStateFailed(Application application) {
     /*
      Query from flink's restAPI and yarn's restAPI both failed.
@@ -230,8 +240,14 @@ public class FlinkAppHttpWatcher {
         log.error(
             "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi and 
getFromYarnRestApi error,job failed,savePoint expired!");
         if (StopFrom.NONE.equals(stopFrom)) {
-          savePointService.expire(application.getId());
-          application.setState(FlinkAppState.LOST.getValue());
+          Date lostTime = LOST_CACHE.getIfPresent(application.getId());
+          if (lostTime == null) {
+            LOST_CACHE.put(application.getId(), new Date());
+          } else if (DateUtils.toSecondDuration(lostTime, new Date()) >= 30) {
+            savePointService.expire(application.getId());
+            application.setState(FlinkAppState.LOST.getValue());
+            LOST_CACHE.invalidate(application.getId());
+          }
         } else {
           application.setState(FlinkAppState.CANCELED.getValue());
         }
@@ -245,9 +261,9 @@ public class FlinkAppHttpWatcher {
       cleanSavepoint(application);
       cleanOptioning(optionState, application.getId());
       doPersistMetrics(application, true);
-      FlinkAppState appState = FlinkAppState.of(application.getState());
+      FlinkAppState appState = application.getFlinkAppStateEnum();
       if (appState.equals(FlinkAppState.FAILED) || 
appState.equals(FlinkAppState.LOST)) {
-        alertService.alert(application, 
FlinkAppState.of(application.getState()));
+        alertService.alert(application, application.getFlinkAppStateEnum());
         if (appState.equals(FlinkAppState.FAILED)) {
           try {
             applicationService.start(application, true);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 55053f1f0..b0f3f5f14 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -79,7 +79,7 @@ public class FlinkK8sChangeEventListener {
     setByJobStatusCV(app, jobStatus);
     applicationService.persistMetrics(app);
 
-    FlinkAppState state = FlinkAppState.of(app.getState());
+    FlinkAppState state = app.getFlinkAppStateEnum();
     // email alerts when necessary
     if (FlinkAppState.FAILED.equals(state)
         || FlinkAppState.LOST.equals(state)
@@ -143,7 +143,7 @@ public class FlinkK8sChangeEventListener {
     // infer the final flink job state
     Enumeration.Value state =
         FlinkJobStatusWatcher.inferFlinkJobStateFromPersist(
-            jobStatus.jobState(), 
toK8sFlinkJobState(FlinkAppState.of(app.getState())));
+            jobStatus.jobState(), 
toK8sFlinkJobState(app.getFlinkAppStateEnum()));
 
     // corrective start-time / end-time / duration
     long preStartTime = app.getStartTime() != null ? 
app.getStartTime().getTime() : 0;
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
index 77a2f3df6..4bc4762cd 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
@@ -212,7 +212,7 @@ class AlertServiceTest {
     template.setStartTime(
         DateUtils.format(
             application.getStartTime(), DateUtils.fullFormat(), 
TimeZone.getDefault()));
-    template.setDuration(DateUtils.toDuration(duration));
+    template.setDuration(DateUtils.toStringDuration(duration));
     template.setLink(url);
     template.setEndTime(
         DateUtils.format(
@@ -223,7 +223,7 @@ class AlertServiceTest {
     template.setRestartIndex(application.getRestartCount());
     template.setTotalRestart(application.getRestartSize());
     template.setCpFailureRateInterval(
-        DateUtils.toDuration(application.getCpFailureRateInterval() * 1000 * 
60));
+        DateUtils.toStringDuration(application.getCpFailureRateInterval() * 
1000 * 60));
     template.setCpMaxFailureInterval(application.getCpMaxFailureInterval());
 
     return template;

Reply via email to