This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch state
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 46bd2a9a865eea5b9e9f05b9394f1dac3f133cd6
Author: benjobs <[email protected]>
AuthorDate: Wed Sep 20 15:00:48 2023 +0800

    [Improve] FlinkAppHttpWatcher improvement
---
 .../streampark/console/core/enums/StopFrom.java    |   6 +-
 .../console/core/task/FlinkAppHttpWatcher.java     | 279 +++++++++++----------
 2 files changed, 147 insertions(+), 138 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/StopFrom.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/StopFrom.java
index 7962b8de7..45e99a7b7 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/StopFrom.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/StopFrom.java
@@ -21,5 +21,9 @@ public enum StopFrom {
   /** None */
   NONE,
   /** StreamPark */
-  STREAMPARK
+  STREAMPARK;
+
+  public boolean isNone() {
+    return this.equals(NONE);
+  }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index 4bfb75127..ce034f404 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -78,7 +78,9 @@ import java.util.stream.Collectors;
 public class FlinkAppHttpWatcher {
 
   @Autowired private ApplicationManageService applicationManageService;
+
   @Autowired private ApplicationActionService applicationActionService;
+
   @Autowired private ApplicationInfoService applicationInfoService;
 
   @Autowired private AlertService alertService;
@@ -123,11 +125,11 @@ public class FlinkAppHttpWatcher {
    *
    *
    * <pre>
-   * StopFrom: marked a task stopped from the stream-park web or other ways.
-   * If stop from stream-park web, you can know whether to make a savepoint 
when you stop the task, and if you make a savepoint,
-   * you can set the savepoint as the last effect savepoint, and the next time 
start, will be automatically choose to start.
-   * In other words, if stop from other ways, there is no way to know the 
savepoint has been done, directly set all the savepoint
-   * to expire, and needs to be manually specified when started again.
+   * StopFrom: marks whether a Flink job was canceled from StreamPark or by other means:
+   *    1) If it was stopped from StreamPark, we know whether a savepoint was taken while the job was canceling; if so,
+   *    that savepoint is set as the latest savepoint and is chosen automatically the next time the job starts.
+   *    2) If it was stopped by other means, there is no way to know whether a savepoint was taken, so all savepoints
+   *    are expired and one must be specified manually when the job is started again.
    * </pre>
    */
   private static final Map<Long, StopFrom> STOP_FROM_MAP = new 
ConcurrentHashMap<>(0);
@@ -215,74 +217,37 @@ public class FlinkAppHttpWatcher {
   private void watch(Long id, Application application) {
     EXECUTOR.execute(
         () -> {
-          final StopFrom stopFrom =
-              STOP_FROM_MAP.getOrDefault(id, null) == null ? StopFrom.NONE : 
STOP_FROM_MAP.get(id);
-          final OptionState optionState = OPTIONING.get(id);
           try {
             // query status from flink rest api
-            getFromFlinkRestApi(application, stopFrom);
+            getStateFromFlink(application);
           } catch (Exception flinkException) {
             // query status from yarn rest api
             try {
-              getFromYarnRestApi(application, stopFrom);
-            } catch (Exception yarnException) {
-              /*
-               Query from flink's restAPI and yarn's restAPI both failed.
-               In this case, it is necessary to decide whether to return to 
the final state depending on the state being operated
-              */
-              if (optionState == null || OptionState.STARTING != optionState) {
-                // non-mapping
-                if (application.getState() != 
FlinkAppState.MAPPING.getValue()) {
-                  log.error(
-                      "FlinkAppHttpWatcher getFromFlinkRestApi and 
getFromYarnRestApi error,job failed,savePoint expired!");
-                  if (StopFrom.NONE == stopFrom) {
-                    savePointService.expire(application.getId());
-                    application.setState(FlinkAppState.LOST.getValue());
-                    doAlert(application, FlinkAppState.LOST);
-                  } else {
-                    application.setState(FlinkAppState.CANCELED.getValue());
-                  }
-                }
-                /*
-                 This step means that the above two ways to get information 
have failed, and this step is the last step,
-                 which will directly identify the mission as cancelled or lost.
-                 Need clean savepoint.
-                */
-                application.setEndTime(new Date());
-                cleanSavepoint(application);
-                cleanOptioning(optionState, id);
-                doPersistMetrics(application, true);
-                FlinkAppState appState = application.getStateEnum();
-                if (FlinkAppState.FAILED == appState || FlinkAppState.LOST == 
appState) {
-                  doAlert(application, application.getStateEnum());
-                  if (FlinkAppState.FAILED == appState) {
-                    try {
-                      applicationActionService.start(application, true);
-                    } catch (Exception e) {
-                      log.error(e.getMessage(), e);
-                    }
-                  }
-                }
-              }
+              getStateFromYarn(application);
+            } catch (Exception e) {
+              doStateFailed(application);
             }
           }
         });
   }
 
+  private StopFrom getAppStopFrom(Long appId) {
+    return STOP_FROM_MAP.getOrDefault(appId, StopFrom.NONE);
+  }
+
   /**
    * Get the current task running status information from Flink rest api.
    *
    * @param application The application for which to retrieve the information
-   * @param stopFrom The stop source from which the method was called
    * @throws Exception if an error occurs while retrieving the information 
from the Flink REST API
    */
-  private void getFromFlinkRestApi(Application application, StopFrom stopFrom) 
throws Exception {
+  private void getStateFromFlink(Application application) throws Exception {
     JobsOverview jobsOverview = httpJobsOverview(application);
     Optional<JobsOverview.Job> optional;
     ExecutionMode execMode = application.getExecutionModeEnum();
     if (ExecutionMode.YARN_APPLICATION == execMode || 
ExecutionMode.YARN_PER_JOB == execMode) {
       optional =
-          jobsOverview.getJobs().size() > 1
+          !jobsOverview.getJobs().isEmpty()
               ? jobsOverview.getJobs().stream()
                   .filter(a -> StringUtils.equals(application.getJobId(), 
a.getId()))
                   .findFirst()
@@ -316,7 +281,126 @@ public class FlinkAppHttpWatcher {
         if (FlinkAppState.RUNNING == currentState) {
           handleRunningState(application, optionState, currentState);
         } else {
-          handleNotRunState(application, optionState, currentState, stopFrom);
+          handleNotRunState(application, optionState, currentState);
+        }
+      }
+    }
+  }
+
+  /**
+   * <strong>Resolve the job state after the Flink REST API query failed: a job last seen as
+   * CANCELLING is marked CANCELED; otherwise the YARN application's final status is mapped to a job state</strong>
+   *
+   * @param application application
+   */
+  private void getStateFromYarn(Application application) throws Exception {
+    OptionState optionState = OPTIONING.get(application.getId());
+    /*
+     If the previous poll saw the job as CANCELLING (the Flink REST server was still reachable then)
+     and this poll could not obtain the state (the Flink REST server is now closed),
+     the job is considered CANCELED.
+    */
+    Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
+    StopFrom stopFrom = getAppStopFrom(application.getId());
+
+    if (flag != null) {
+      log.info("FlinkAppHttpWatcher previous state: canceling.");
+      if (stopFrom.isNone()) {
+        log.error(
+            "FlinkAppHttpWatcher query previous state was canceling and 
stopFrom NotFound,savePoint expired!");
+        savePointService.expire(application.getId());
+      }
+      application.setState(FlinkAppState.CANCELED.getValue());
+      cleanSavepoint(application);
+      cleanOptioning(optionState, application.getId());
+      doPersistMetrics(application, true);
+    } else {
+      // query the status from the yarn rest Api
+      YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
+      if (yarnAppInfo == null) {
+        if (ExecutionMode.REMOTE != application.getExecutionModeEnum()) {
+          throw new RuntimeException("FlinkAppHttpWatcher getStateFromYarn 
failed ");
+        }
+      } else {
+        try {
+          String state = yarnAppInfo.getApp().getFinalStatus();
+          FlinkAppState flinkAppState = FlinkAppState.of(state);
+          if (FlinkAppState.OTHER == flinkAppState) {
+            return;
+          }
+          if (FlinkAppState.KILLED == flinkAppState) {
+            if (stopFrom.isNone()) {
+              log.error(
+                  "FlinkAppHttpWatcher getStateFromYarn,job was killed and 
stopFrom NotFound,savePoint expired!");
+              savePointService.expire(application.getId());
+            }
+            flinkAppState = FlinkAppState.CANCELED;
+            cleanSavepoint(application);
+            application.setEndTime(new Date());
+          }
+          if (FlinkAppState.SUCCEEDED == flinkAppState) {
+            flinkAppState = FlinkAppState.FINISHED;
+          }
+          application.setState(flinkAppState.getValue());
+          cleanOptioning(optionState, application.getId());
+          doPersistMetrics(application, true);
+          if (FlinkAppState.FAILED == flinkAppState
+              || FlinkAppState.LOST == flinkAppState
+              || (FlinkAppState.CANCELED == flinkAppState && stopFrom.isNone())
+              || applicationInfoService.checkAlter(application)) {
+            doAlert(application, flinkAppState);
+            stopCanceledJob(application.getId());
+            if (FlinkAppState.FAILED == flinkAppState) {
+              applicationActionService.start(application, true);
+            }
+          }
+        } catch (Exception e) {
+          if (ExecutionMode.REMOTE != application.getExecutionModeEnum()) {
+            throw new RuntimeException("FlinkAppHttpWatcher getStateFromYarn 
error,", e);
+          }
+        }
+      }
+    }
+  }
+
+  private void doStateFailed(Application application) {
+    /*
+     Querying both Flink's REST API and YARN's REST API failed.
+     Whether to move the job to a final state depends on the operation in progress (nothing is changed while STARTING).
+    */
+    final OptionState optionState = OPTIONING.get(application.getId());
+    if (OptionState.STARTING != optionState) {
+      // non-mapping
+      if (application.getStateEnum() != FlinkAppState.MAPPING) {
+        log.error(
+            "FlinkAppHttpWatcher getStateFromFlink and getStateFromYARN 
error,job failed, savePoint expired!");
+        StopFrom stopFrom = getAppStopFrom(application.getId());
+        if (stopFrom.isNone()) {
+          savePointService.expire(application.getId());
+          application.setState(FlinkAppState.LOST.getValue());
+          doAlert(application, FlinkAppState.LOST);
+        } else {
+          application.setState(FlinkAppState.CANCELED.getValue());
+        }
+      }
+      /*
+       Both ways of fetching the state have failed, so this is the last step:
+       the job is directly marked as canceled or lost (unless it is still MAPPING).
+       The savepoint needs to be cleaned.
+      */
+      application.setEndTime(new Date());
+      cleanSavepoint(application);
+      cleanOptioning(optionState, application.getId());
+      doPersistMetrics(application, true);
+      FlinkAppState appState = application.getStateEnum();
+      if (FlinkAppState.FAILED == appState || FlinkAppState.LOST == appState) {
+        doAlert(application, application.getStateEnum());
+        if (FlinkAppState.FAILED == appState) {
+          try {
+            applicationActionService.start(application, true);
+          } catch (Exception e) {
+            log.error(e.getMessage(), e);
+          }
         }
       }
     }
@@ -438,14 +522,11 @@ public class FlinkAppHttpWatcher {
    * @param application application
    * @param optionState optionState
    * @param currentState currentState
-   * @param stopFrom stopFrom
    */
   private void handleNotRunState(
-      Application application,
-      OptionState optionState,
-      FlinkAppState currentState,
-      StopFrom stopFrom)
+      Application application, OptionState optionState, FlinkAppState 
currentState)
       throws Exception {
+
     switch (currentState) {
       case CANCELLING:
         CANCELING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
@@ -459,8 +540,9 @@ public class FlinkAppHttpWatcher {
             currentState.name());
         cleanSavepoint(application);
         application.setState(currentState.getValue());
-        if (StopFrom.NONE == stopFrom || 
applicationInfoService.checkAlter(application)) {
-          if (StopFrom.NONE == stopFrom) {
+        StopFrom stopFrom = getAppStopFrom(application.getId());
+        if (stopFrom.isNone() || 
applicationInfoService.checkAlter(application)) {
+          if (stopFrom.isNone()) {
             log.info(
                 "FlinkAppHttpWatcher getFromFlinkRestApi, job cancel is not 
form StreamPark,savePoint expired!");
             savePointService.expire(application.getId());
@@ -492,83 +574,6 @@ public class FlinkAppHttpWatcher {
     }
   }
 
-  /**
-   * <strong>Query the job history in yarn, indicating that the task has 
stopped, and the final
-   * status of the task is CANCELED</strong>
-   *
-   * @param application application
-   * @param stopFrom stopFrom
-   */
-  private void getFromYarnRestApi(Application application, StopFrom stopFrom) 
throws Exception {
-    log.debug("FlinkAppHttpWatcher getFromYarnRestApi starting...");
-    OptionState optionState = OPTIONING.get(application.getId());
-
-    /*
-     If the status of the last time is CANCELING (flink rest server is not 
closed at the time of getting information)
-     and the status is not obtained this time (flink rest server is closed),
-     the task is considered CANCELED
-    */
-    Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
-    if (flag != null) {
-      log.info("FlinkAppHttpWatcher previous state: canceling.");
-      if (StopFrom.NONE == stopFrom) {
-        log.error(
-            "FlinkAppHttpWatcher query previous state was canceling and 
stopFrom NotFound,savePoint expired!");
-        savePointService.expire(application.getId());
-      }
-      application.setState(FlinkAppState.CANCELED.getValue());
-      cleanSavepoint(application);
-      cleanOptioning(optionState, application.getId());
-      doPersistMetrics(application, true);
-    } else {
-      // query the status from the yarn rest Api
-      YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
-      if (yarnAppInfo == null) {
-        if (ExecutionMode.REMOTE != application.getExecutionModeEnum()) {
-          throw new RuntimeException("FlinkAppHttpWatcher getFromYarnRestApi 
failed ");
-        }
-      } else {
-        try {
-          String state = yarnAppInfo.getApp().getFinalStatus();
-          FlinkAppState flinkAppState = FlinkAppState.of(state);
-          if (FlinkAppState.OTHER == flinkAppState) {
-            return;
-          }
-          if (FlinkAppState.KILLED == flinkAppState) {
-            if (StopFrom.NONE == stopFrom) {
-              log.error(
-                  "FlinkAppHttpWatcher getFromYarnRestApi,job was killed and 
stopFrom NotFound,savePoint expired!");
-              savePointService.expire(application.getId());
-            }
-            flinkAppState = FlinkAppState.CANCELED;
-            cleanSavepoint(application);
-            application.setEndTime(new Date());
-          }
-          if (FlinkAppState.SUCCEEDED == flinkAppState) {
-            flinkAppState = FlinkAppState.FINISHED;
-          }
-          application.setState(flinkAppState.getValue());
-          cleanOptioning(optionState, application.getId());
-          doPersistMetrics(application, true);
-          if (FlinkAppState.FAILED == flinkAppState
-              || FlinkAppState.LOST == flinkAppState
-              || (FlinkAppState.CANCELED == flinkAppState && StopFrom.NONE == 
stopFrom)
-              || applicationInfoService.checkAlter(application)) {
-            doAlert(application, flinkAppState);
-            stopCanceledJob(application.getId());
-            if (FlinkAppState.FAILED == flinkAppState) {
-              applicationActionService.start(application, true);
-            }
-          }
-        } catch (Exception e) {
-          if (ExecutionMode.REMOTE != application.getExecutionModeEnum()) {
-            throw new RuntimeException("FlinkAppHttpWatcher getFromYarnRestApi 
error,", e);
-          }
-        }
-      }
-    }
-  }
-
   private void cleanOptioning(OptionState optionState, Long key) {
     if (optionState != null) {
       lastOptionTime = System.currentTimeMillis();

Reply via email to