This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch mapping in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit afcf57f54a46b0aa247b53a474d803c2417139f5 Author: benjobs <[email protected]> AuthorDate: Wed Jun 28 19:08:32 2023 +0800 [Bug] remote mode remapping bug fixed --- .../core/service/impl/FlinkClusterServiceImpl.java | 2 - .../console/core/task/FlinkRESTAPIWatcher.java | 102 +++++++++++---------- 2 files changed, 52 insertions(+), 52 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java index cb7e4ce76..31d4ead6f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java @@ -32,7 +32,6 @@ import org.apache.streampark.console.core.service.FlinkClusterService; import org.apache.streampark.console.core.service.FlinkEnvService; import org.apache.streampark.console.core.service.YarnQueueService; import org.apache.streampark.console.core.task.FlinkClusterWatcher; -import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher; import org.apache.streampark.flink.client.FlinkClient; import org.apache.streampark.flink.client.bean.DeployRequest; import org.apache.streampark.flink.client.bean.DeployResponse; @@ -175,7 +174,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli flinkCluster.setException(null); FlinkClusterWatcher.addFlinkCluster(flinkCluster); updateById(flinkCluster); - FlinkRESTAPIWatcher.removeFlinkCluster(flinkCluster); } catch (Exception e) { log.error(e.getMessage(), e); flinkCluster.setAddress(null); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java index d32085b3c..1d8af701f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java @@ -265,8 +265,7 @@ public class FlinkRESTAPIWatcher { * @param stopFrom stopFrom */ private void getFromFlinkRestApi(Application application, StopFrom stopFrom) throws Exception { - FlinkCluster flinkCluster = getFlinkCluster(application); - JobsOverview jobsOverview = httpJobsOverview(application, flinkCluster); + JobsOverview jobsOverview = httpJobsOverview(application); Optional<JobsOverview.Job> optional; ExecutionMode execMode = application.getExecutionModeEnum(); if (ExecutionMode.YARN_APPLICATION.equals(execMode) @@ -339,8 +338,7 @@ public class FlinkRESTAPIWatcher { // get overview info at the first start time if (STARTING_CACHE.getIfPresent(application.getId()) != null) { - FlinkCluster flinkCluster = getFlinkCluster(application); - Overview override = httpOverview(application, flinkCluster); + Overview override = httpOverview(application); if (override != null && override.getSlotsTotal() > 0) { application.setTotalTM(override.getTaskmanagers()); application.setTotalSlot(override.getSlotsTotal()); @@ -352,8 +350,7 @@ public class FlinkRESTAPIWatcher { /** get latest checkpoint */ private void handleCheckPoints(Application application) throws Exception { - FlinkCluster flinkCluster = getFlinkCluster(application); - CheckPoints checkPoints = httpCheckpoints(application, flinkCluster); + CheckPoints checkPoints = httpCheckpoints(application); if (checkPoints != null) { checkpointProcessor.process(application, checkPoints); } @@ -602,13 +599,6 @@ public class FlinkRESTAPIWatcher { SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE); } - public static void removeFlinkCluster(FlinkCluster flinkCluster) { - if (FLINK_CLUSTER_MAP.containsKey(flinkCluster.getId())) { - log.info("remove flink cluster:{}", flinkCluster.getId()); - FLINK_CLUSTER_MAP.remove(flinkCluster.getId()); - } - } - public static void unWatching(Long appId) { if (isKubernetesApp(appId)) { return; @@ -647,17 +637,13 @@ public class FlinkRESTAPIWatcher { return FlinkK8sWatcherWrapper.isKubernetesApp(app); } - private FlinkCluster getFlinkCluster(Application application) { - if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum()) - || ExecutionMode.isSessionMode(application.getExecutionModeEnum())) { - FlinkCluster flinkCluster = FLINK_CLUSTER_MAP.get(application.getFlinkClusterId()); - if (flinkCluster == null) { - flinkCluster = flinkClusterService.getById(application.getFlinkClusterId()); - FLINK_CLUSTER_MAP.put(application.getFlinkClusterId(), flinkCluster); - } - return flinkCluster; + private FlinkCluster getFlinkCluster(Long clusterId, boolean flush) { + FlinkCluster flinkCluster = FLINK_CLUSTER_MAP.get(clusterId); + if (flinkCluster == null || flush) { + flinkCluster = flinkClusterService.getById(clusterId); + FLINK_CLUSTER_MAP.put(clusterId, flinkCluster); } - return null; + return flinkCluster; } private YarnAppInfo httpYarnAppInfo(Application application) throws Exception { @@ -665,8 +651,7 @@ public class FlinkRESTAPIWatcher { return yarnRestRequest(reqURL, YarnAppInfo.class); } - private Overview httpOverview(Application application, FlinkCluster flinkCluster) - throws IOException { + private Overview httpOverview(Application application) throws IOException { String appId = application.getAppId(); if (appId != null) { if (application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION) @@ -685,12 +670,10 @@ public class FlinkRESTAPIWatcher { return null; } - private JobsOverview httpJobsOverview(Application application, FlinkCluster flinkCluster) - throws Exception { + private JobsOverview httpJobsOverview(Application application) throws Exception { final String flinkUrl = "jobs/overview"; ExecutionMode execMode = application.getExecutionModeEnum(); - if (ExecutionMode.YARN_PER_JOB.equals(execMode) - || ExecutionMode.YARN_APPLICATION.equals(execMode)) { + if (ExecutionMode.isYarnMode(execMode)) { String reqURL; if (StringUtils.isEmpty(application.getJobManagerUrl())) { String format = "proxy/%s/" + flinkUrl; @@ -700,30 +683,31 @@ public class FlinkRESTAPIWatcher { reqURL = String.format(format, application.getJobManagerUrl()); } return yarnRestRequest(reqURL, JobsOverview.class); - } else if (ExecutionMode.REMOTE.equals(execMode) - || ExecutionMode.YARN_SESSION.equals(execMode)) { + } else if (ExecutionMode.REMOTE.equals(execMode)) { if (application.getJobId() != null) { - String remoteUrl = flinkCluster.getAddress() + "/" + flinkUrl; - JobsOverview jobsOverview = httpRestRequest(remoteUrl, JobsOverview.class); - if (jobsOverview != null) { - List<JobsOverview.Job> jobs = - jobsOverview.getJobs().stream() - .filter(x -> x.getId().equals(application.getJobId())) - .collect(Collectors.toList()); - jobsOverview.setJobs(jobs); - } - return jobsOverview; + return httpClusterOrElseTry( + application.getFlinkClusterId(), + cluster -> { + String remoteUrl = cluster.getAddress() + "/" + flinkUrl; + JobsOverview jobsOverview = httpRestRequest(remoteUrl, JobsOverview.class); + if (jobsOverview != null) { + List<JobsOverview.Job> jobs = + jobsOverview.getJobs().stream() + .filter(x -> x.getId().equals(application.getJobId())) + .collect(Collectors.toList()); + jobsOverview.setJobs(jobs); + } + return jobsOverview; + }); } } return null; } - private CheckPoints httpCheckpoints(Application application, FlinkCluster flinkCluster) - throws IOException { + private CheckPoints httpCheckpoints(Application application) throws Exception { final String flinkUrl = "jobs/%s/checkpoints"; ExecutionMode execMode = application.getExecutionModeEnum(); - if (ExecutionMode.YARN_PER_JOB.equals(execMode) - || ExecutionMode.YARN_APPLICATION.equals(execMode)) { + if (ExecutionMode.isYarnMode(execMode)) { String reqURL; if (StringUtils.isEmpty(application.getJobManagerUrl())) { String format = "proxy/%s/" + flinkUrl; @@ -733,12 +717,15 @@ public class FlinkRESTAPIWatcher { reqURL = String.format(format, application.getJobManagerUrl(), application.getJobId()); } return yarnRestRequest(reqURL, CheckPoints.class); - } else if (ExecutionMode.REMOTE.equals(execMode) - || ExecutionMode.YARN_SESSION.equals(execMode)) { + } else if (ExecutionMode.REMOTE.equals(execMode)) { if (application.getJobId() != null) { - String remoteUrl = - flinkCluster.getAddress() + "/" + String.format(flinkUrl, application.getJobId()); - return httpRestRequest(remoteUrl, CheckPoints.class); + return httpClusterOrElseTry( + application.getFlinkClusterId(), + cluster -> { + String remoteUrl = + cluster.getAddress() + "/" + String.format(flinkUrl, application.getJobId()); + return httpRestRequest(remoteUrl, CheckPoints.class); + }); } } return null; @@ -765,4 +752,19 @@ public class FlinkRESTAPIWatcher { public boolean isWatchingApp(Long id) { return WATCHING_APPS.containsKey(id); } + + private <T> T httpClusterOrElseTry(Long clusterId, Callback<FlinkCluster, T> function) + throws Exception { + FlinkCluster flinkCluster = getFlinkCluster(clusterId, false); + try { + return function.call(flinkCluster); + } catch (Exception e) { + flinkCluster = getFlinkCluster(clusterId, true); + return function.call(flinkCluster); + } + } + + interface Callback<T, R> { + R call(T e) throws Exception; + } }
