This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch mapping
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit afcf57f54a46b0aa247b53a474d803c2417139f5
Author: benjobs <[email protected]>
AuthorDate: Wed Jun 28 19:08:32 2023 +0800

    [Bug] remote mode remapping bug fixed
---
 .../core/service/impl/FlinkClusterServiceImpl.java |   2 -
 .../console/core/task/FlinkRESTAPIWatcher.java     | 102 +++++++++++----------
 2 files changed, 52 insertions(+), 52 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index cb7e4ce76..31d4ead6f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -32,7 +32,6 @@ import 
org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.YarnQueueService;
 import org.apache.streampark.console.core.task.FlinkClusterWatcher;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.DeployRequest;
 import org.apache.streampark.flink.client.bean.DeployResponse;
@@ -175,7 +174,6 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
       flinkCluster.setException(null);
       FlinkClusterWatcher.addFlinkCluster(flinkCluster);
       updateById(flinkCluster);
-      FlinkRESTAPIWatcher.removeFlinkCluster(flinkCluster);
     } catch (Exception e) {
       log.error(e.getMessage(), e);
       flinkCluster.setAddress(null);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index d32085b3c..1d8af701f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -265,8 +265,7 @@ public class FlinkRESTAPIWatcher {
    * @param stopFrom stopFrom
    */
   private void getFromFlinkRestApi(Application application, StopFrom stopFrom) 
throws Exception {
-    FlinkCluster flinkCluster = getFlinkCluster(application);
-    JobsOverview jobsOverview = httpJobsOverview(application, flinkCluster);
+    JobsOverview jobsOverview = httpJobsOverview(application);
     Optional<JobsOverview.Job> optional;
     ExecutionMode execMode = application.getExecutionModeEnum();
     if (ExecutionMode.YARN_APPLICATION.equals(execMode)
@@ -339,8 +338,7 @@ public class FlinkRESTAPIWatcher {
 
     // get overview info at the first start time
     if (STARTING_CACHE.getIfPresent(application.getId()) != null) {
-      FlinkCluster flinkCluster = getFlinkCluster(application);
-      Overview override = httpOverview(application, flinkCluster);
+      Overview override = httpOverview(application);
       if (override != null && override.getSlotsTotal() > 0) {
         application.setTotalTM(override.getTaskmanagers());
         application.setTotalSlot(override.getSlotsTotal());
@@ -352,8 +350,7 @@ public class FlinkRESTAPIWatcher {
 
   /** get latest checkpoint */
   private void handleCheckPoints(Application application) throws Exception {
-    FlinkCluster flinkCluster = getFlinkCluster(application);
-    CheckPoints checkPoints = httpCheckpoints(application, flinkCluster);
+    CheckPoints checkPoints = httpCheckpoints(application);
     if (checkPoints != null) {
       checkpointProcessor.process(application, checkPoints);
     }
@@ -602,13 +599,6 @@ public class FlinkRESTAPIWatcher {
     SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
   }
 
-  public static void removeFlinkCluster(FlinkCluster flinkCluster) {
-    if (FLINK_CLUSTER_MAP.containsKey(flinkCluster.getId())) {
-      log.info("remove flink cluster:{}", flinkCluster.getId());
-      FLINK_CLUSTER_MAP.remove(flinkCluster.getId());
-    }
-  }
-
   public static void unWatching(Long appId) {
     if (isKubernetesApp(appId)) {
       return;
@@ -647,17 +637,13 @@ public class FlinkRESTAPIWatcher {
     return FlinkK8sWatcherWrapper.isKubernetesApp(app);
   }
 
-  private FlinkCluster getFlinkCluster(Application application) {
-    if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())
-        || ExecutionMode.isSessionMode(application.getExecutionModeEnum())) {
-      FlinkCluster flinkCluster = 
FLINK_CLUSTER_MAP.get(application.getFlinkClusterId());
-      if (flinkCluster == null) {
-        flinkCluster = 
flinkClusterService.getById(application.getFlinkClusterId());
-        FLINK_CLUSTER_MAP.put(application.getFlinkClusterId(), flinkCluster);
-      }
-      return flinkCluster;
+  private FlinkCluster getFlinkCluster(Long clusterId, boolean flush) {
+    FlinkCluster flinkCluster = FLINK_CLUSTER_MAP.get(clusterId);
+    if (flinkCluster == null || flush) {
+      flinkCluster = flinkClusterService.getById(clusterId);
+      FLINK_CLUSTER_MAP.put(clusterId, flinkCluster);
     }
-    return null;
+    return flinkCluster;
   }
 
   private YarnAppInfo httpYarnAppInfo(Application application) throws 
Exception {
@@ -665,8 +651,7 @@ public class FlinkRESTAPIWatcher {
     return yarnRestRequest(reqURL, YarnAppInfo.class);
   }
 
-  private Overview httpOverview(Application application, FlinkCluster 
flinkCluster)
-      throws IOException {
+  private Overview httpOverview(Application application) throws IOException {
     String appId = application.getAppId();
     if (appId != null) {
       if 
(application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
@@ -685,12 +670,10 @@ public class FlinkRESTAPIWatcher {
     return null;
   }
 
-  private JobsOverview httpJobsOverview(Application application, FlinkCluster 
flinkCluster)
-      throws Exception {
+  private JobsOverview httpJobsOverview(Application application) throws 
Exception {
     final String flinkUrl = "jobs/overview";
     ExecutionMode execMode = application.getExecutionModeEnum();
-    if (ExecutionMode.YARN_PER_JOB.equals(execMode)
-        || ExecutionMode.YARN_APPLICATION.equals(execMode)) {
+    if (ExecutionMode.isYarnMode(execMode)) {
       String reqURL;
       if (StringUtils.isEmpty(application.getJobManagerUrl())) {
         String format = "proxy/%s/" + flinkUrl;
@@ -700,30 +683,31 @@ public class FlinkRESTAPIWatcher {
         reqURL = String.format(format, application.getJobManagerUrl());
       }
       return yarnRestRequest(reqURL, JobsOverview.class);
-    } else if (ExecutionMode.REMOTE.equals(execMode)
-        || ExecutionMode.YARN_SESSION.equals(execMode)) {
+    } else if (ExecutionMode.REMOTE.equals(execMode)) {
       if (application.getJobId() != null) {
-        String remoteUrl = flinkCluster.getAddress() + "/" + flinkUrl;
-        JobsOverview jobsOverview = httpRestRequest(remoteUrl, 
JobsOverview.class);
-        if (jobsOverview != null) {
-          List<JobsOverview.Job> jobs =
-              jobsOverview.getJobs().stream()
-                  .filter(x -> x.getId().equals(application.getJobId()))
-                  .collect(Collectors.toList());
-          jobsOverview.setJobs(jobs);
-        }
-        return jobsOverview;
+        return httpClusterOrElseTry(
+            application.getFlinkClusterId(),
+            cluster -> {
+              String remoteUrl = cluster.getAddress() + "/" + flinkUrl;
+              JobsOverview jobsOverview = httpRestRequest(remoteUrl, 
JobsOverview.class);
+              if (jobsOverview != null) {
+                List<JobsOverview.Job> jobs =
+                    jobsOverview.getJobs().stream()
+                        .filter(x -> x.getId().equals(application.getJobId()))
+                        .collect(Collectors.toList());
+                jobsOverview.setJobs(jobs);
+              }
+              return jobsOverview;
+            });
       }
     }
     return null;
   }
 
-  private CheckPoints httpCheckpoints(Application application, FlinkCluster 
flinkCluster)
-      throws IOException {
+  private CheckPoints httpCheckpoints(Application application) throws 
Exception {
     final String flinkUrl = "jobs/%s/checkpoints";
     ExecutionMode execMode = application.getExecutionModeEnum();
-    if (ExecutionMode.YARN_PER_JOB.equals(execMode)
-        || ExecutionMode.YARN_APPLICATION.equals(execMode)) {
+    if (ExecutionMode.isYarnMode(execMode)) {
       String reqURL;
       if (StringUtils.isEmpty(application.getJobManagerUrl())) {
         String format = "proxy/%s/" + flinkUrl;
@@ -733,12 +717,15 @@ public class FlinkRESTAPIWatcher {
         reqURL = String.format(format, application.getJobManagerUrl(), 
application.getJobId());
       }
       return yarnRestRequest(reqURL, CheckPoints.class);
-    } else if (ExecutionMode.REMOTE.equals(execMode)
-        || ExecutionMode.YARN_SESSION.equals(execMode)) {
+    } else if (ExecutionMode.REMOTE.equals(execMode)) {
       if (application.getJobId() != null) {
-        String remoteUrl =
-            flinkCluster.getAddress() + "/" + String.format(flinkUrl, 
application.getJobId());
-        return httpRestRequest(remoteUrl, CheckPoints.class);
+        return httpClusterOrElseTry(
+            application.getFlinkClusterId(),
+            cluster -> {
+              String remoteUrl =
+                  cluster.getAddress() + "/" + String.format(flinkUrl, 
application.getJobId());
+              return httpRestRequest(remoteUrl, CheckPoints.class);
+            });
       }
     }
     return null;
@@ -765,4 +752,19 @@ public class FlinkRESTAPIWatcher {
   public boolean isWatchingApp(Long id) {
     return WATCHING_APPS.containsKey(id);
   }
+
+  private <T> T httpClusterOrElseTry(Long clusterId, Callback<FlinkCluster, T> 
function)
+      throws Exception {
+    FlinkCluster flinkCluster = getFlinkCluster(clusterId, false);
+    try {
+      return function.call(flinkCluster);
+    } catch (Exception e) {
+      flinkCluster = getFlinkCluster(clusterId, true);
+      return function.call(flinkCluster);
+    }
+  }
+
+  interface Callback<T, R> {
+    R call(T e) throws Exception;
+  }
 }

Reply via email to