This is an automated email from the ASF dual-hosted git repository.
linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new d29447884 [Improve] FlinkAppHttpWatcher improvement (#3150)
d29447884 is described below
commit d29447884ff59b09b526f267845617e37269033c
Author: benjobs <[email protected]>
AuthorDate: Wed Sep 20 02:27:52 2023 -0500
[Improve] FlinkAppHttpWatcher improvement (#3150)
---
.../streampark/console/core/enums/StopFrom.java | 6 +-
.../console/core/task/FlinkAppHttpWatcher.java | 279 +++++++++++----------
2 files changed, 147 insertions(+), 138 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/StopFrom.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/StopFrom.java
index 7962b8de7..45e99a7b7 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/StopFrom.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/StopFrom.java
@@ -21,5 +21,9 @@ public enum StopFrom {
/** None */
NONE,
/** StreamPark */
- STREAMPARK
+ STREAMPARK;
+
+ public boolean isNone() {
+ return this.equals(NONE);
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index 4bfb75127..ce034f404 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -78,7 +78,9 @@ import java.util.stream.Collectors;
public class FlinkAppHttpWatcher {
@Autowired private ApplicationManageService applicationManageService;
+
@Autowired private ApplicationActionService applicationActionService;
+
@Autowired private ApplicationInfoService applicationInfoService;
@Autowired private AlertService alertService;
@@ -123,11 +125,11 @@ public class FlinkAppHttpWatcher {
*
*
* <pre>
- * StopFrom: marked a task stopped from the stream-park web or other ways.
- * If stop from stream-park web, you can know whether to make a savepoint
when you stop the task, and if you make a savepoint,
- * you can set the savepoint as the last effect savepoint, and the next time
start, will be automatically choose to start.
- * In other words, if stop from other ways, there is no way to know the
savepoint has been done, directly set all the savepoint
- * to expire, and needs to be manually specified when started again.
+ * StopFrom: records whether a Flink job was canceled from StreamPark or by other means:
+ * 1) If it was stopped from StreamPark, we know whether a savepoint was taken
while the Flink job was canceling; if a savepoint was taken,
+ * it is set as the latest savepoint and is automatically chosen the next time
the job starts.
+ * 2) If it was stopped by other means, there is no way to know whether a savepoint has
been taken, so all savepoints are marked as expired,
+ * and a savepoint must be specified manually when the job is started again.
* </pre>
*/
private static final Map<Long, StopFrom> STOP_FROM_MAP = new
ConcurrentHashMap<>(0);
@@ -215,74 +217,37 @@ public class FlinkAppHttpWatcher {
private void watch(Long id, Application application) {
EXECUTOR.execute(
() -> {
- final StopFrom stopFrom =
- STOP_FROM_MAP.getOrDefault(id, null) == null ? StopFrom.NONE :
STOP_FROM_MAP.get(id);
- final OptionState optionState = OPTIONING.get(id);
try {
// query status from flink rest api
- getFromFlinkRestApi(application, stopFrom);
+ getStateFromFlink(application);
} catch (Exception flinkException) {
// query status from yarn rest api
try {
- getFromYarnRestApi(application, stopFrom);
- } catch (Exception yarnException) {
- /*
- Query from flink's restAPI and yarn's restAPI both failed.
- In this case, it is necessary to decide whether to return to
the final state depending on the state being operated
- */
- if (optionState == null || OptionState.STARTING != optionState) {
- // non-mapping
- if (application.getState() !=
FlinkAppState.MAPPING.getValue()) {
- log.error(
- "FlinkAppHttpWatcher getFromFlinkRestApi and
getFromYarnRestApi error,job failed,savePoint expired!");
- if (StopFrom.NONE == stopFrom) {
- savePointService.expire(application.getId());
- application.setState(FlinkAppState.LOST.getValue());
- doAlert(application, FlinkAppState.LOST);
- } else {
- application.setState(FlinkAppState.CANCELED.getValue());
- }
- }
- /*
- This step means that the above two ways to get information
have failed, and this step is the last step,
- which will directly identify the mission as cancelled or lost.
- Need clean savepoint.
- */
- application.setEndTime(new Date());
- cleanSavepoint(application);
- cleanOptioning(optionState, id);
- doPersistMetrics(application, true);
- FlinkAppState appState = application.getStateEnum();
- if (FlinkAppState.FAILED == appState || FlinkAppState.LOST ==
appState) {
- doAlert(application, application.getStateEnum());
- if (FlinkAppState.FAILED == appState) {
- try {
- applicationActionService.start(application, true);
- } catch (Exception e) {
- log.error(e.getMessage(), e);
- }
- }
- }
- }
+ getStateFromYarn(application);
+ } catch (Exception e) {
+ doStateFailed(application);
}
}
});
}
+ private StopFrom getAppStopFrom(Long appId) {
+ return STOP_FROM_MAP.getOrDefault(appId, StopFrom.NONE);
+ }
+
/**
* Get the current task running status information from Flink rest api.
*
* @param application The application for which to retrieve the information
- * @param stopFrom The stop source from which the method was called
* @throws Exception if an error occurs while retrieving the information
from the Flink REST API
*/
- private void getFromFlinkRestApi(Application application, StopFrom stopFrom)
throws Exception {
+ private void getStateFromFlink(Application application) throws Exception {
JobsOverview jobsOverview = httpJobsOverview(application);
Optional<JobsOverview.Job> optional;
ExecutionMode execMode = application.getExecutionModeEnum();
if (ExecutionMode.YARN_APPLICATION == execMode ||
ExecutionMode.YARN_PER_JOB == execMode) {
optional =
- jobsOverview.getJobs().size() > 1
+ !jobsOverview.getJobs().isEmpty()
? jobsOverview.getJobs().stream()
.filter(a -> StringUtils.equals(application.getJobId(),
a.getId()))
.findFirst()
@@ -316,7 +281,126 @@ public class FlinkAppHttpWatcher {
if (FlinkAppState.RUNNING == currentState) {
handleRunningState(application, optionState, currentState);
} else {
- handleNotRunState(application, optionState, currentState, stopFrom);
+ handleNotRunState(application, optionState, currentState);
+ }
+ }
+ }
+ }
+
+ /**
+ * <strong>Query the job history in yarn, indicating that the task has
stopped, and the final
+ * status of the task is CANCELED</strong>
+ *
+ * @param application application
+ */
+ private void getStateFromYarn(Application application) throws Exception {
+ OptionState optionState = OPTIONING.get(application.getId());
+ /*
+ If the status of the last time is CANCELING (flink rest server is not
closed at the time of getting information)
+ and the status is not obtained this time (flink rest server is closed),
+ the task is considered CANCELED
+ */
+ Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
+ StopFrom stopFrom = getAppStopFrom(application.getId());
+
+ if (flag != null) {
+ log.info("FlinkAppHttpWatcher previous state: canceling.");
+ if (stopFrom.isNone()) {
+ log.error(
+ "FlinkAppHttpWatcher query previous state was canceling and
stopFrom NotFound,savePoint expired!");
+ savePointService.expire(application.getId());
+ }
+ application.setState(FlinkAppState.CANCELED.getValue());
+ cleanSavepoint(application);
+ cleanOptioning(optionState, application.getId());
+ doPersistMetrics(application, true);
+ } else {
+ // query the status from the yarn rest Api
+ YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
+ if (yarnAppInfo == null) {
+ if (ExecutionMode.REMOTE != application.getExecutionModeEnum()) {
+ throw new RuntimeException("FlinkAppHttpWatcher getStateFromYarn
failed ");
+ }
+ } else {
+ try {
+ String state = yarnAppInfo.getApp().getFinalStatus();
+ FlinkAppState flinkAppState = FlinkAppState.of(state);
+ if (FlinkAppState.OTHER == flinkAppState) {
+ return;
+ }
+ if (FlinkAppState.KILLED == flinkAppState) {
+ if (stopFrom.isNone()) {
+ log.error(
+ "FlinkAppHttpWatcher getStateFromYarn,job was killed and
stopFrom NotFound,savePoint expired!");
+ savePointService.expire(application.getId());
+ }
+ flinkAppState = FlinkAppState.CANCELED;
+ cleanSavepoint(application);
+ application.setEndTime(new Date());
+ }
+ if (FlinkAppState.SUCCEEDED == flinkAppState) {
+ flinkAppState = FlinkAppState.FINISHED;
+ }
+ application.setState(flinkAppState.getValue());
+ cleanOptioning(optionState, application.getId());
+ doPersistMetrics(application, true);
+ if (FlinkAppState.FAILED == flinkAppState
+ || FlinkAppState.LOST == flinkAppState
+ || (FlinkAppState.CANCELED == flinkAppState && stopFrom.isNone())
+ || applicationInfoService.checkAlter(application)) {
+ doAlert(application, flinkAppState);
+ stopCanceledJob(application.getId());
+ if (FlinkAppState.FAILED == flinkAppState) {
+ applicationActionService.start(application, true);
+ }
+ }
+ } catch (Exception e) {
+ if (ExecutionMode.REMOTE != application.getExecutionModeEnum()) {
+ throw new RuntimeException("FlinkAppHttpWatcher getStateFromYarn
error,", e);
+ }
+ }
+ }
+ }
+ }
+
+ private void doStateFailed(Application application) {
+ /*
+ Querying both Flink's REST API and YARN's REST API failed.
+ In this case, whether to move the application to a final
state depends on the operation currently in progress.
+ */
+ final OptionState optionState = OPTIONING.get(application.getId());
+ if (OptionState.STARTING != optionState) {
+ // non-mapping
+ if (application.getStateEnum() != FlinkAppState.MAPPING) {
+ log.error(
+ "FlinkAppHttpWatcher getStateFromFlink and getStateFromYARN
error,job failed, savePoint expired!");
+ StopFrom stopFrom = getAppStopFrom(application.getId());
+ if (stopFrom.isNone()) {
+ savePointService.expire(application.getId());
+ application.setState(FlinkAppState.LOST.getValue());
+ doAlert(application, FlinkAppState.LOST);
+ } else {
+ application.setState(FlinkAppState.CANCELED.getValue());
+ }
+ }
+ /*
+ Reaching this step means both of the above ways to get information have failed,
so this is the last step,
+ which directly marks the job as canceled or lost.
+ The savepoint needs to be cleaned.
+ */
+ application.setEndTime(new Date());
+ cleanSavepoint(application);
+ cleanOptioning(optionState, application.getId());
+ doPersistMetrics(application, true);
+ FlinkAppState appState = application.getStateEnum();
+ if (FlinkAppState.FAILED == appState || FlinkAppState.LOST == appState) {
+ doAlert(application, application.getStateEnum());
+ if (FlinkAppState.FAILED == appState) {
+ try {
+ applicationActionService.start(application, true);
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
}
}
}
@@ -438,14 +522,11 @@ public class FlinkAppHttpWatcher {
* @param application application
* @param optionState optionState
* @param currentState currentState
- * @param stopFrom stopFrom
*/
private void handleNotRunState(
- Application application,
- OptionState optionState,
- FlinkAppState currentState,
- StopFrom stopFrom)
+ Application application, OptionState optionState, FlinkAppState
currentState)
throws Exception {
+
switch (currentState) {
case CANCELLING:
CANCELING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
@@ -459,8 +540,9 @@ public class FlinkAppHttpWatcher {
currentState.name());
cleanSavepoint(application);
application.setState(currentState.getValue());
- if (StopFrom.NONE == stopFrom ||
applicationInfoService.checkAlter(application)) {
- if (StopFrom.NONE == stopFrom) {
+ StopFrom stopFrom = getAppStopFrom(application.getId());
+ if (stopFrom.isNone() ||
applicationInfoService.checkAlter(application)) {
+ if (stopFrom.isNone()) {
log.info(
"FlinkAppHttpWatcher getFromFlinkRestApi, job cancel is not
form StreamPark,savePoint expired!");
savePointService.expire(application.getId());
@@ -492,83 +574,6 @@ public class FlinkAppHttpWatcher {
}
}
- /**
- * <strong>Query the job history in yarn, indicating that the task has
stopped, and the final
- * status of the task is CANCELED</strong>
- *
- * @param application application
- * @param stopFrom stopFrom
- */
- private void getFromYarnRestApi(Application application, StopFrom stopFrom)
throws Exception {
- log.debug("FlinkAppHttpWatcher getFromYarnRestApi starting...");
- OptionState optionState = OPTIONING.get(application.getId());
-
- /*
- If the status of the last time is CANCELING (flink rest server is not
closed at the time of getting information)
- and the status is not obtained this time (flink rest server is closed),
- the task is considered CANCELED
- */
- Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
- if (flag != null) {
- log.info("FlinkAppHttpWatcher previous state: canceling.");
- if (StopFrom.NONE == stopFrom) {
- log.error(
- "FlinkAppHttpWatcher query previous state was canceling and
stopFrom NotFound,savePoint expired!");
- savePointService.expire(application.getId());
- }
- application.setState(FlinkAppState.CANCELED.getValue());
- cleanSavepoint(application);
- cleanOptioning(optionState, application.getId());
- doPersistMetrics(application, true);
- } else {
- // query the status from the yarn rest Api
- YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
- if (yarnAppInfo == null) {
- if (ExecutionMode.REMOTE != application.getExecutionModeEnum()) {
- throw new RuntimeException("FlinkAppHttpWatcher getFromYarnRestApi
failed ");
- }
- } else {
- try {
- String state = yarnAppInfo.getApp().getFinalStatus();
- FlinkAppState flinkAppState = FlinkAppState.of(state);
- if (FlinkAppState.OTHER == flinkAppState) {
- return;
- }
- if (FlinkAppState.KILLED == flinkAppState) {
- if (StopFrom.NONE == stopFrom) {
- log.error(
- "FlinkAppHttpWatcher getFromYarnRestApi,job was killed and
stopFrom NotFound,savePoint expired!");
- savePointService.expire(application.getId());
- }
- flinkAppState = FlinkAppState.CANCELED;
- cleanSavepoint(application);
- application.setEndTime(new Date());
- }
- if (FlinkAppState.SUCCEEDED == flinkAppState) {
- flinkAppState = FlinkAppState.FINISHED;
- }
- application.setState(flinkAppState.getValue());
- cleanOptioning(optionState, application.getId());
- doPersistMetrics(application, true);
- if (FlinkAppState.FAILED == flinkAppState
- || FlinkAppState.LOST == flinkAppState
- || (FlinkAppState.CANCELED == flinkAppState && StopFrom.NONE ==
stopFrom)
- || applicationInfoService.checkAlter(application)) {
- doAlert(application, flinkAppState);
- stopCanceledJob(application.getId());
- if (FlinkAppState.FAILED == flinkAppState) {
- applicationActionService.start(application, true);
- }
- }
- } catch (Exception e) {
- if (ExecutionMode.REMOTE != application.getExecutionModeEnum()) {
- throw new RuntimeException("FlinkAppHttpWatcher getFromYarnRestApi
error,", e);
- }
- }
- }
- }
- }
-
private void cleanOptioning(OptionState optionState, Long key) {
if (optionState != null) {
lastOptionTime = System.currentTimeMillis();