This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch mapping
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 9114fc3bec646262d028db869c6a4c418a9c8b0d
Author: benjobs <[email protected]>
AuthorDate: Fri Jun 30 21:25:46 2023 +0800

    job state mapping bug fixed
---
 .../core/service/impl/FlinkClusterServiceImpl.java |   2 -
 .../console/core/task/FlinkRESTAPIWatcher.java     | 221 +++++++++++----------
 .../resources/mapper/core/ApplicationMapper.xml    |   3 +-
 3 files changed, 114 insertions(+), 112 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index d4570bb2a..fde4c00e6 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -32,7 +32,6 @@ import org.apache.streampark.console.core.service.CommonService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.YarnQueueService;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.DeployRequest;
 import org.apache.streampark.flink.client.bean.DeployResponse;
@@ -196,7 +195,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       flinkCluster.setClusterState(ClusterState.STARTED.getValue());
       flinkCluster.setException(null);
       updateById(flinkCluster);
-      FlinkRESTAPIWatcher.removeFlinkCluster(flinkCluster);
     } else {
       throw new ApiAlertException(
           "deploy cluster failed, unknown reason,please check you params or StreamPark error log");
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index e3551ef09..4ebbf75cd 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -196,66 +196,67 @@ public class FlinkRESTAPIWatcher {
   private void doWatch() {
     lastWatchingTime = System.currentTimeMillis();
     for (Map.Entry<Long, Application> entry : WATCHING_APPS.entrySet()) {
-      EXECUTOR.execute(
-          () -> {
-            long key = entry.getKey();
-            Application application = entry.getValue();
-            final StopFrom stopFrom =
-                STOP_FROM_MAP.getOrDefault(key, null) == null
-                    ? StopFrom.NONE
-                    : STOP_FROM_MAP.get(key);
-            final OptionState optionState = OPTIONING.get(key);
+      watch(entry.getKey(), entry.getValue());
+    }
+  }
+
+  private void watch(Long key, Application application) {
+    EXECUTOR.execute(
+        () -> {
+          final StopFrom stopFrom =
+              STOP_FROM_MAP.getOrDefault(key, null) == null
+                  ? StopFrom.NONE
+                  : STOP_FROM_MAP.get(key);
+          final OptionState optionState = OPTIONING.get(key);
+          try {
+            // query status from flink rest api
+            getFromFlinkRestApi(application, stopFrom);
+          } catch (Exception flinkException) {
+            // query status from yarn rest api
             try {
-              // query status from flink rest api
-              getFromFlinkRestApi(application, stopFrom);
-            } catch (Exception flinkException) {
-              // query status from yarn rest api
-              try {
-                getFromYarnRestApi(application, stopFrom);
-              } catch (Exception yarnException) {
+              getFromYarnRestApi(application, stopFrom);
+            } catch (Exception yarnException) {
+              /*
+               Query from flink's restAPI and yarn's restAPI both failed.
+               In this case, it is necessary to decide whether to return to the final state depending on the state being operated
+              */
+              if (optionState == null || !optionState.equals(OptionState.STARTING)) {
+                // non-mapping
+                if (application.getState() != FlinkAppState.MAPPING.getValue()) {
+                  log.error(
+                      "FlinkRESTAPIWatcher getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!");
+                  if (StopFrom.NONE.equals(stopFrom)) {
+                    savePointService.expire(application.getId());
+                    application.setState(FlinkAppState.LOST.getValue());
+                    alertService.alert(application, FlinkAppState.LOST);
+                  } else {
+                    application.setState(FlinkAppState.CANCELED.getValue());
+                  }
+                }
                 /*
-                 Query from flink's restAPI and yarn's restAPI both failed.
-                 In this case, it is necessary to decide whether to return to the final state depending on the state being operated
+                 This step means that the above two ways to get information have failed, and this step is the last step,
+                 which will directly identify the mission as cancelled or lost.
+                 Need clean savepoint.
                 */
-                if (optionState == null || !optionState.equals(OptionState.STARTING)) {
-                  // non-mapping
-                  if (application.getState() != FlinkAppState.MAPPING.getValue()) {
-                    log.error(
-                        "FlinkRESTAPIWatcher getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!");
-                    if (StopFrom.NONE.equals(stopFrom)) {
-                      savePointService.expire(application.getId());
-                      application.setState(FlinkAppState.LOST.getValue());
-                      alertService.alert(application, FlinkAppState.LOST);
-                    } else {
-                      application.setState(FlinkAppState.CANCELED.getValue());
-                    }
-                  }
-                  /*
-                   This step means that the above two ways to get information have failed, and this step is the last step,
-                   which will directly identify the mission as cancelled or lost.
-                   Need clean savepoint.
-                  */
-                  application.setEndTime(new Date());
-                  cleanSavepoint(application);
-                  cleanOptioning(optionState, key);
-                  doPersistMetrics(application, true);
-                  FlinkAppState appState = FlinkAppState.of(application.getState());
-                  if (appState.equals(FlinkAppState.FAILED)
-                      || appState.equals(FlinkAppState.LOST)) {
-                    alertService.alert(application, FlinkAppState.of(application.getState()));
-                    if (appState.equals(FlinkAppState.FAILED)) {
-                      try {
-                        applicationService.start(application, true);
-                      } catch (Exception e) {
-                        log.error(e.getMessage(), e);
-                      }
+                application.setEndTime(new Date());
+                cleanSavepoint(application);
+                cleanOptioning(optionState, key);
+                doPersistMetrics(application, true);
+                FlinkAppState appState = FlinkAppState.of(application.getState());
+                if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) {
+                  alertService.alert(application, FlinkAppState.of(application.getState()));
+                  if (appState.equals(FlinkAppState.FAILED)) {
+                    try {
+                      applicationService.start(application, true);
+                    } catch (Exception e) {
+                      log.error(e.getMessage(), e);
                     }
                   }
                 }
               }
             }
-          });
-    }
+          }
+        });
   }

   /**
@@ -265,8 +266,7 @@
    * @param stopFrom stopFrom
    */
   private void getFromFlinkRestApi(Application application, StopFrom stopFrom) throws Exception {
-    FlinkCluster flinkCluster = getFlinkCluster(application);
-    JobsOverview jobsOverview = httpJobsOverview(application, flinkCluster);
+    JobsOverview jobsOverview = httpJobsOverview(application);
     Optional<JobsOverview.Job> optional;
     ExecutionMode execMode = application.getExecutionModeEnum();
     if (ExecutionMode.YARN_APPLICATION.equals(execMode)
@@ -339,8 +339,7 @@

     // get overview info at the first start time
     if (STARTING_CACHE.getIfPresent(application.getId()) != null) {
-      FlinkCluster flinkCluster = getFlinkCluster(application);
-      Overview override = httpOverview(application, flinkCluster);
+      Overview override = httpOverview(application);
       if (override != null && override.getSlotsTotal() > 0) {
         application.setTotalTM(override.getTaskmanagers());
         application.setTotalSlot(override.getSlotsTotal());
@@ -352,8 +351,7 @@

   /** get latest checkpoint */
   private void handleCheckPoints(Application application) throws Exception {
-    FlinkCluster flinkCluster = getFlinkCluster(application);
-    CheckPoints checkPoints = httpCheckpoints(application, flinkCluster);
+    CheckPoints checkPoints = httpCheckpoints(application);
     if (checkPoints != null) {
       checkpointProcessor.process(application, checkPoints);
     }
@@ -602,13 +600,6 @@
     SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
   }

-  public static void removeFlinkCluster(FlinkCluster flinkCluster) {
-    if (FLINK_CLUSTER_MAP.containsKey(flinkCluster.getId())) {
-      log.info("remove flink cluster:{}", flinkCluster.getId());
-      FLINK_CLUSTER_MAP.remove(flinkCluster.getId());
-    }
-  }
-
   public static void unWatching(Long appId) {
     if (isKubernetesApp(appId)) {
       return;
@@ -647,26 +638,12 @@
     return FlinkK8sWatcherWrapper.isKubernetesApp(app);
   }

-  private FlinkCluster getFlinkCluster(Application application) {
-    if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())
-        || ExecutionMode.isSessionMode(application.getExecutionModeEnum())) {
-      FlinkCluster flinkCluster = FLINK_CLUSTER_MAP.get(application.getFlinkClusterId());
-      if (flinkCluster == null) {
-        flinkCluster = flinkClusterService.getById(application.getFlinkClusterId());
-        FLINK_CLUSTER_MAP.put(application.getFlinkClusterId(), flinkCluster);
-      }
-      return flinkCluster;
-    }
-    return null;
-  }
-
   private YarnAppInfo httpYarnAppInfo(Application application) throws Exception {
     String reqURL = "ws/v1/cluster/apps/".concat(application.getAppId());
     return yarnRestRequest(reqURL, YarnAppInfo.class);
   }

-  private Overview httpOverview(Application application, FlinkCluster flinkCluster)
-      throws IOException {
+  private Overview httpOverview(Application application) throws IOException {
     String appId = application.getAppId();
     if (appId != null) {
       if (application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
@@ -685,12 +662,10 @@
     return null;
   }

-  private JobsOverview httpJobsOverview(Application application, FlinkCluster flinkCluster)
-      throws Exception {
+  private JobsOverview httpJobsOverview(Application application) throws Exception {
     final String flinkUrl = "jobs/overview";
     ExecutionMode execMode = application.getExecutionModeEnum();
-    if (ExecutionMode.YARN_PER_JOB.equals(execMode)
-        || ExecutionMode.YARN_APPLICATION.equals(execMode)) {
+    if (ExecutionMode.isYarnMode(execMode)) {
       String reqURL;
       if (StringUtils.isEmpty(application.getJobManagerUrl())) {
         String format = "proxy/%s/" + flinkUrl;
@@ -700,30 +675,31 @@
         reqURL = String.format(format, application.getJobManagerUrl());
       }
       return yarnRestRequest(reqURL, JobsOverview.class);
-    } else if (ExecutionMode.REMOTE.equals(execMode)
-        || ExecutionMode.YARN_SESSION.equals(execMode)) {
-      if (application.getJobId() != null) {
-        String remoteUrl = flinkCluster.getAddress() + "/" + flinkUrl;
-        JobsOverview jobsOverview = httpRestRequest(remoteUrl, JobsOverview.class);
-        if (jobsOverview != null) {
-          List<JobsOverview.Job> jobs =
-              jobsOverview.getJobs().stream()
-                  .filter(x -> x.getId().equals(application.getJobId()))
-                  .collect(Collectors.toList());
-          jobsOverview.setJobs(jobs);
-        }
-        return jobsOverview;
-      }
+    }
+
+    if (application.getJobId() != null && ExecutionMode.isRemoteMode(execMode)) {
+      return httpRemoteCluster(
+          application.getFlinkClusterId(),
+          cluster -> {
+            String remoteUrl = cluster.getAddress() + "/" + flinkUrl;
+            JobsOverview jobsOverview = httpRestRequest(remoteUrl, JobsOverview.class);
+            if (jobsOverview != null) {
+              List<JobsOverview.Job> jobs =
+                  jobsOverview.getJobs().stream()
+                      .filter(x -> x.getId().equals(application.getJobId()))
+                      .collect(Collectors.toList());
+              jobsOverview.setJobs(jobs);
+            }
+            return jobsOverview;
+          });
     }
     return null;
   }

-  private CheckPoints httpCheckpoints(Application application, FlinkCluster flinkCluster)
-      throws IOException {
+  private CheckPoints httpCheckpoints(Application application) throws Exception {
     final String flinkUrl = "jobs/%s/checkpoints";
     ExecutionMode execMode = application.getExecutionModeEnum();
-    if (ExecutionMode.YARN_PER_JOB.equals(execMode)
-        || ExecutionMode.YARN_APPLICATION.equals(execMode)) {
+    if (ExecutionMode.isYarnMode(execMode)) {
       String reqURL;
       if (StringUtils.isEmpty(application.getJobManagerUrl())) {
         String format = "proxy/%s/" + flinkUrl;
@@ -733,13 +709,16 @@
         reqURL = String.format(format, application.getJobManagerUrl(), application.getJobId());
       }
       return yarnRestRequest(reqURL, CheckPoints.class);
-    } else if (ExecutionMode.REMOTE.equals(execMode)
-        || ExecutionMode.YARN_SESSION.equals(execMode)) {
-      if (application.getJobId() != null) {
-        String remoteUrl =
-            flinkCluster.getAddress() + "/" + String.format(flinkUrl, application.getJobId());
-        return httpRestRequest(remoteUrl, CheckPoints.class);
-      }
+    }
+
+    if (application.getJobId() != null && ExecutionMode.isRemoteMode(execMode)) {
+      return httpRemoteCluster(
+          application.getFlinkClusterId(),
+          cluster -> {
+            String remoteUrl =
+                cluster.getAddress() + "/" + String.format(flinkUrl, application.getJobId());
+            return httpRestRequest(remoteUrl, CheckPoints.class);
+          });
     }
     return null;
   }
@@ -764,4 +743,28 @@
   public boolean isWatchingApp(Long id) {
     return WATCHING_APPS.containsKey(id);
   }
+
+  private <T> T httpRemoteCluster(Long clusterId, Callback<FlinkCluster, T> function)
+      throws Exception {
+    FlinkCluster flinkCluster = getFlinkRemoteCluster(clusterId, false);
+    try {
+      return function.call(flinkCluster);
+    } catch (Exception e) {
+      flinkCluster = getFlinkRemoteCluster(clusterId, true);
+      return function.call(flinkCluster);
+    }
+  }
+
+  private FlinkCluster getFlinkRemoteCluster(Long clusterId, boolean flush) {
+    FlinkCluster flinkCluster = FLINK_CLUSTER_MAP.get(clusterId);
+    if (flinkCluster == null || flush) {
+      flinkCluster = flinkClusterService.getById(clusterId);
+      FLINK_CLUSTER_MAP.put(clusterId, flinkCluster);
+    }
+    return flinkCluster;
+  }
+
+  interface Callback<T, R> {
+    R call(T e) throws Exception;
+  }
 }
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index e4eca78d0..a6c97121c 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -275,7 +275,8 @@
                 app_id=#{application.appId},
             </if>
             end_time=null,
-            state=14
+            state=14,
+            tracking=1
         </set>
         where id=#{application.id}
     </update>
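Note for readers skimming the patch: the functional core of the FlinkRESTAPIWatcher change is the cache-refresh-and-retry access pattern behind the new httpRemoteCluster(...) helper. It calls the REST endpoint with the cached FlinkCluster first; if that call fails, it reloads the cluster from flinkClusterService, updates FLINK_CLUSTER_MAP, and retries once. This is what allows the patch to drop the explicit removeFlinkCluster(...) eviction hook from FlinkClusterServiceImpl: a stale cached address now heals itself on the next failed request. The sketch below is a minimal, self-contained illustration of that pattern only; the class, cache, and loader names in it are illustrative stand-ins, not StreamPark APIs.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class CachedLookupRetryDemo {

  // Mirrors the Callback<T, R> interface added by the commit: an action that may throw.
  interface Callback<T, R> {
    R call(T t) throws Exception;
  }

  private final Map<Long, String> cache = new ConcurrentHashMap<>();
  private final Function<Long, String> loader; // stands in for flinkClusterService.getById(...)

  CachedLookupRetryDemo(Function<Long, String> loader) {
    this.loader = loader;
  }

  // Try the cached entry first; on failure, refresh the cache and retry exactly once.
  <R> R withCluster(Long id, Callback<String, R> action) throws Exception {
    String cluster = get(id, false);
    try {
      return action.call(cluster);
    } catch (Exception e) {
      // The cached address may be stale (e.g. the cluster was redeployed): reload and retry.
      cluster = get(id, true);
      return action.call(cluster);
    }
  }

  // Return the cached entry, or (re)load it when it is missing or a flush is requested.
  private String get(Long id, boolean flush) {
    String value = cache.get(id);
    if (value == null || flush) {
      value = loader.apply(id);
      cache.put(id, value);
    }
    return value;
  }

  public static void main(String[] args) throws Exception {
    CachedLookupRetryDemo demo = new CachedLookupRetryDemo(id -> "http://cluster-" + id + ":8081");
    System.out.println(demo.withCluster(1L, address -> address + "/jobs/overview"));
  }
}

As in the commit, the retry is deliberately single-shot: if the call still fails after the refresh, the exception propagates to the caller, which in the watcher falls through to the YARN lookup and terminal-state handling in watch(...).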
