This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch state in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 46bd2a9a865eea5b9e9f05b9394f1dac3f133cd6 Author: benjobs <[email protected]> AuthorDate: Wed Sep 20 15:00:48 2023 +0800 [Improve] FlinkAppHttpWatcher improvement --- .../streampark/console/core/enums/StopFrom.java | 6 +- .../console/core/task/FlinkAppHttpWatcher.java | 279 +++++++++++---------- 2 files changed, 147 insertions(+), 138 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/StopFrom.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/StopFrom.java index 7962b8de7..45e99a7b7 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/StopFrom.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/StopFrom.java @@ -21,5 +21,9 @@ public enum StopFrom { /** None */ NONE, /** StreamPark */ - STREAMPARK + STREAMPARK; + + public boolean isNone() { + return this.equals(NONE); + } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java index 4bfb75127..ce034f404 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java @@ -78,7 +78,9 @@ import java.util.stream.Collectors; public class FlinkAppHttpWatcher { @Autowired private ApplicationManageService applicationManageService; + @Autowired private ApplicationActionService applicationActionService; + @Autowired private ApplicationInfoService applicationInfoService; @Autowired private AlertService alertService; @@ -123,11 +125,11 @@ public class FlinkAppHttpWatcher { * * * <pre> - * StopFrom: marked a task stopped from the stream-park web or other ways. - * If stop from stream-park web, you can know whether to make a savepoint when you stop the task, and if you make a savepoint, - * you can set the savepoint as the last effect savepoint, and the next time start, will be automatically choose to start. - * In other words, if stop from other ways, there is no way to know the savepoint has been done, directly set all the savepoint - * to expire, and needs to be manually specified when started again. + * StopFrom: marked a flink job canceling from the StreamPark or other ways: + * 1) If stop from streampark, We can know whether to make a savepoint when flink job canceling, and if We make a savepoint, + * We can set the savepoint as the latest savepoint, and the next time start, will be automatically choose to start. + * 2) if stop from other ways, there is no way to know the savepoint has been done, directly set all the savepoint to expire, + * and needs to be manually specified when started again. * </pre> */ private static final Map<Long, StopFrom> STOP_FROM_MAP = new ConcurrentHashMap<>(0); @@ -215,74 +217,37 @@ public class FlinkAppHttpWatcher { private void watch(Long id, Application application) { EXECUTOR.execute( () -> { - final StopFrom stopFrom = - STOP_FROM_MAP.getOrDefault(id, null) == null ? StopFrom.NONE : STOP_FROM_MAP.get(id); - final OptionState optionState = OPTIONING.get(id); try { // query status from flink rest api - getFromFlinkRestApi(application, stopFrom); + getStateFromFlink(application); } catch (Exception flinkException) { // query status from yarn rest api try { - getFromYarnRestApi(application, stopFrom); - } catch (Exception yarnException) { - /* - Query from flink's restAPI and yarn's restAPI both failed. - In this case, it is necessary to decide whether to return to the final state depending on the state being operated - */ - if (optionState == null || OptionState.STARTING != optionState) { - // non-mapping - if (application.getState() != FlinkAppState.MAPPING.getValue()) { - log.error( - "FlinkAppHttpWatcher getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!"); - if (StopFrom.NONE == stopFrom) { - savePointService.expire(application.getId()); - application.setState(FlinkAppState.LOST.getValue()); - doAlert(application, FlinkAppState.LOST); - } else { - application.setState(FlinkAppState.CANCELED.getValue()); - } - } - /* - This step means that the above two ways to get information have failed, and this step is the last step, - which will directly identify the mission as cancelled or lost. - Need clean savepoint. - */ - application.setEndTime(new Date()); - cleanSavepoint(application); - cleanOptioning(optionState, id); - doPersistMetrics(application, true); - FlinkAppState appState = application.getStateEnum(); - if (FlinkAppState.FAILED == appState || FlinkAppState.LOST == appState) { - doAlert(application, application.getStateEnum()); - if (FlinkAppState.FAILED == appState) { - try { - applicationActionService.start(application, true); - } catch (Exception e) { - log.error(e.getMessage(), e); - } - } - } - } + getStateFromYarn(application); + } catch (Exception e) { + doStateFailed(application); } } }); } + private StopFrom getAppStopFrom(Long appId) { + return STOP_FROM_MAP.getOrDefault(appId, StopFrom.NONE); + } + /** * Get the current task running status information from Flink rest api. * * @param application The application for which to retrieve the information - * @param stopFrom The stop source from which the method was called * @throws Exception if an error occurs while retrieving the information from the Flink REST API */ - private void getFromFlinkRestApi(Application application, StopFrom stopFrom) throws Exception { + private void getStateFromFlink(Application application) throws Exception { JobsOverview jobsOverview = httpJobsOverview(application); Optional<JobsOverview.Job> optional; ExecutionMode execMode = application.getExecutionModeEnum(); if (ExecutionMode.YARN_APPLICATION == execMode || ExecutionMode.YARN_PER_JOB == execMode) { optional = - jobsOverview.getJobs().size() > 1 + !jobsOverview.getJobs().isEmpty() ? jobsOverview.getJobs().stream() .filter(a -> StringUtils.equals(application.getJobId(), a.getId())) .findFirst() @@ -316,7 +281,126 @@ public class FlinkAppHttpWatcher { if (FlinkAppState.RUNNING == currentState) { handleRunningState(application, optionState, currentState); } else { - handleNotRunState(application, optionState, currentState, stopFrom); + handleNotRunState(application, optionState, currentState); + } + } + } + } + + /** + * <strong>Query the job history in yarn, indicating that the task has stopped, and the final + * status of the task is CANCELED</strong> + * + * @param application application + */ + private void getStateFromYarn(Application application) throws Exception { + OptionState optionState = OPTIONING.get(application.getId()); + /* + If the status of the last time is CANCELING (flink rest server is not closed at the time of getting information) + and the status is not obtained this time (flink rest server is closed), + the task is considered CANCELED + */ + Byte flag = CANCELING_CACHE.getIfPresent(application.getId()); + StopFrom stopFrom = getAppStopFrom(application.getId()); + + if (flag != null) { + log.info("FlinkAppHttpWatcher previous state: canceling."); + if (stopFrom.isNone()) { + log.error( + "FlinkAppHttpWatcher query previous state was canceling and stopFrom NotFound,savePoint expired!"); + savePointService.expire(application.getId()); + } + application.setState(FlinkAppState.CANCELED.getValue()); + cleanSavepoint(application); + cleanOptioning(optionState, application.getId()); + doPersistMetrics(application, true); + } else { + // query the status from the yarn rest Api + YarnAppInfo yarnAppInfo = httpYarnAppInfo(application); + if (yarnAppInfo == null) { + if (ExecutionMode.REMOTE != application.getExecutionModeEnum()) { + throw new RuntimeException("FlinkAppHttpWatcher getStateFromYarn failed "); + } + } else { + try { + String state = yarnAppInfo.getApp().getFinalStatus(); + FlinkAppState flinkAppState = FlinkAppState.of(state); + if (FlinkAppState.OTHER == flinkAppState) { + return; + } + if (FlinkAppState.KILLED == flinkAppState) { + if (stopFrom.isNone()) { + log.error( + "FlinkAppHttpWatcher getStateFromYarn,job was killed and stopFrom NotFound,savePoint expired!"); + savePointService.expire(application.getId()); + } + flinkAppState = FlinkAppState.CANCELED; + cleanSavepoint(application); + application.setEndTime(new Date()); + } + if (FlinkAppState.SUCCEEDED == flinkAppState) { + flinkAppState = FlinkAppState.FINISHED; + } + application.setState(flinkAppState.getValue()); + cleanOptioning(optionState, application.getId()); + doPersistMetrics(application, true); + if (FlinkAppState.FAILED == flinkAppState + || FlinkAppState.LOST == flinkAppState + || (FlinkAppState.CANCELED == flinkAppState && stopFrom.isNone()) + || applicationInfoService.checkAlter(application)) { + doAlert(application, flinkAppState); + stopCanceledJob(application.getId()); + if (FlinkAppState.FAILED == flinkAppState) { + applicationActionService.start(application, true); + } + } + } catch (Exception e) { + if (ExecutionMode.REMOTE != application.getExecutionModeEnum()) { + throw new RuntimeException("FlinkAppHttpWatcher getStateFromYarn error,", e); + } + } + } + } + } + + private void doStateFailed(Application application) { + /* + Query from flink's restAPI and yarn's restAPI both failed. + In this case, it is necessary to decide whether to return to the final state depending on the state being operated + */ + final OptionState optionState = OPTIONING.get(application.getId()); + if (OptionState.STARTING != optionState) { + // non-mapping + if (application.getStateEnum() != FlinkAppState.MAPPING) { + log.error( + "FlinkAppHttpWatcher getStateFromFlink and getStateFromYARN error,job failed, savePoint expired!"); + StopFrom stopFrom = getAppStopFrom(application.getId()); + if (stopFrom.isNone()) { + savePointService.expire(application.getId()); + application.setState(FlinkAppState.LOST.getValue()); + doAlert(application, FlinkAppState.LOST); + } else { + application.setState(FlinkAppState.CANCELED.getValue()); + } + } + /* + This step means that the above two ways to get information have failed, and this step is the last step, + which will directly identify the mission as cancelled or lost. + Need clean savepoint. + */ + application.setEndTime(new Date()); + cleanSavepoint(application); + cleanOptioning(optionState, application.getId()); + doPersistMetrics(application, true); + FlinkAppState appState = application.getStateEnum(); + if (FlinkAppState.FAILED == appState || FlinkAppState.LOST == appState) { + doAlert(application, application.getStateEnum()); + if (FlinkAppState.FAILED == appState) { + try { + applicationActionService.start(application, true); + } catch (Exception e) { + log.error(e.getMessage(), e); + } } } } @@ -438,14 +522,11 @@ public class FlinkAppHttpWatcher { * @param application application * @param optionState optionState * @param currentState currentState - * @param stopFrom stopFrom */ private void handleNotRunState( - Application application, - OptionState optionState, - FlinkAppState currentState, - StopFrom stopFrom) + Application application, OptionState optionState, FlinkAppState currentState) throws Exception { + switch (currentState) { case CANCELLING: CANCELING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE); @@ -459,8 +540,9 @@ public class FlinkAppHttpWatcher { currentState.name()); cleanSavepoint(application); application.setState(currentState.getValue()); - if (StopFrom.NONE == stopFrom || applicationInfoService.checkAlter(application)) { - if (StopFrom.NONE == stopFrom) { + StopFrom stopFrom = getAppStopFrom(application.getId()); + if (stopFrom.isNone() || applicationInfoService.checkAlter(application)) { + if (stopFrom.isNone()) { log.info( "FlinkAppHttpWatcher getFromFlinkRestApi, job cancel is not form StreamPark,savePoint expired!"); savePointService.expire(application.getId()); @@ -492,83 +574,6 @@ public class FlinkAppHttpWatcher { } } - /** - * <strong>Query the job history in yarn, indicating that the task has stopped, and the final - * status of the task is CANCELED</strong> - * - * @param application application - * @param stopFrom stopFrom - */ - private void getFromYarnRestApi(Application application, StopFrom stopFrom) throws Exception { - log.debug("FlinkAppHttpWatcher getFromYarnRestApi starting..."); - OptionState optionState = OPTIONING.get(application.getId()); - - /* - If the status of the last time is CANCELING (flink rest server is not closed at the time of getting information) - and the status is not obtained this time (flink rest server is closed), - the task is considered CANCELED - */ - Byte flag = CANCELING_CACHE.getIfPresent(application.getId()); - if (flag != null) { - log.info("FlinkAppHttpWatcher previous state: canceling."); - if (StopFrom.NONE == stopFrom) { - log.error( - "FlinkAppHttpWatcher query previous state was canceling and stopFrom NotFound,savePoint expired!"); - savePointService.expire(application.getId()); - } - application.setState(FlinkAppState.CANCELED.getValue()); - cleanSavepoint(application); - cleanOptioning(optionState, application.getId()); - doPersistMetrics(application, true); - } else { - // query the status from the yarn rest Api - YarnAppInfo yarnAppInfo = httpYarnAppInfo(application); - if (yarnAppInfo == null) { - if (ExecutionMode.REMOTE != application.getExecutionModeEnum()) { - throw new RuntimeException("FlinkAppHttpWatcher getFromYarnRestApi failed "); - } - } else { - try { - String state = yarnAppInfo.getApp().getFinalStatus(); - FlinkAppState flinkAppState = FlinkAppState.of(state); - if (FlinkAppState.OTHER == flinkAppState) { - return; - } - if (FlinkAppState.KILLED == flinkAppState) { - if (StopFrom.NONE == stopFrom) { - log.error( - "FlinkAppHttpWatcher getFromYarnRestApi,job was killed and stopFrom NotFound,savePoint expired!"); - savePointService.expire(application.getId()); - } - flinkAppState = FlinkAppState.CANCELED; - cleanSavepoint(application); - application.setEndTime(new Date()); - } - if (FlinkAppState.SUCCEEDED == flinkAppState) { - flinkAppState = FlinkAppState.FINISHED; - } - application.setState(flinkAppState.getValue()); - cleanOptioning(optionState, application.getId()); - doPersistMetrics(application, true); - if (FlinkAppState.FAILED == flinkAppState - || FlinkAppState.LOST == flinkAppState - || (FlinkAppState.CANCELED == flinkAppState && StopFrom.NONE == stopFrom) - || applicationInfoService.checkAlter(application)) { - doAlert(application, flinkAppState); - stopCanceledJob(application.getId()); - if (FlinkAppState.FAILED == flinkAppState) { - applicationActionService.start(application, true); - } - } - } catch (Exception e) { - if (ExecutionMode.REMOTE != application.getExecutionModeEnum()) { - throw new RuntimeException("FlinkAppHttpWatcher getFromYarnRestApi error,", e); - } - } - } - } - } - private void cleanOptioning(OptionState optionState, Long key) { if (optionState != null) { lastOptionTime = System.currentTimeMillis();
