This is an automated email from the ASF dual-hosted git repository.

zhouli pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8d5a4de18 [Bug] remote mode job remapping status bug fixed (#2819)
8d5a4de18 is described below

commit 8d5a4de187523b8dfcbc4dcb927210c2c2936bff
Author: benjobs <[email protected]>
AuthorDate: Fri Jun 30 09:39:59 2023 +0800

    [Bug] remote mode job remapping status bug fixed (#2819)
    
    * [Bug] remote mode remapping bug fixed
    
    * minor improvement
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../apache/streampark/common/util/YarnUtils.scala  |   3 +-
 .../core/service/impl/FlinkClusterServiceImpl.java |   2 -
 .../console/core/task/FlinkRESTAPIWatcher.java     | 221 +++++++++++----------
 3 files changed, 114 insertions(+), 112 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index cdf973452..2c58dda15 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -34,6 +34,7 @@ import java.util.{HashMap => JavaHashMap, List => JavaList}
 import java.util.concurrent.TimeUnit
 
 import scala.collection.convert.ImplicitConversions._
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
 import scala.util.control.Breaks.{break, breakable}
@@ -204,7 +205,7 @@ object YarnUtils extends Logger {
 
             val address = NetUtils.getConnectAddress(inetSocketAddress)
 
-            val buffer = new StringBuilder(protocol)
+            val buffer = new mutable.StringBuilder(protocol)
             val resolved = address.getAddress
             if (resolved != null && !resolved.isAnyLocalAddress && !resolved.isLoopbackAddress) {
               buffer.append(address.getHostName)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index cb7e4ce76..31d4ead6f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -32,7 +32,6 @@ import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.YarnQueueService;
 import org.apache.streampark.console.core.task.FlinkClusterWatcher;
-import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.DeployRequest;
 import org.apache.streampark.flink.client.bean.DeployResponse;
@@ -175,7 +174,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       flinkCluster.setException(null);
       FlinkClusterWatcher.addFlinkCluster(flinkCluster);
       updateById(flinkCluster);
-      FlinkRESTAPIWatcher.removeFlinkCluster(flinkCluster);
     } catch (Exception e) {
       log.error(e.getMessage(), e);
       flinkCluster.setAddress(null);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index d32085b3c..e9b9f2224 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -196,66 +196,67 @@ public class FlinkRESTAPIWatcher {
   private void doWatch() {
     lastWatchingTime = System.currentTimeMillis();
     for (Map.Entry<Long, Application> entry : WATCHING_APPS.entrySet()) {
-      EXECUTOR.execute(
-          () -> {
-            long key = entry.getKey();
-            Application application = entry.getValue();
-            final StopFrom stopFrom =
-                STOP_FROM_MAP.getOrDefault(key, null) == null
-                    ? StopFrom.NONE
-                    : STOP_FROM_MAP.get(key);
-            final OptionState optionState = OPTIONING.get(key);
+      watch(entry.getKey(), entry.getValue());
+    }
+  }
+
+  private void watch(Long key, Application application) {
+    EXECUTOR.execute(
+        () -> {
+          final StopFrom stopFrom =
+              STOP_FROM_MAP.getOrDefault(key, null) == null
+                  ? StopFrom.NONE
+                  : STOP_FROM_MAP.get(key);
+          final OptionState optionState = OPTIONING.get(key);
+          try {
+            // query status from flink rest api
+            getFromFlinkRestApi(application, stopFrom);
+          } catch (Exception flinkException) {
+            // query status from yarn rest api
             try {
-              // query status from flink rest api
-              getFromFlinkRestApi(application, stopFrom);
-            } catch (Exception flinkException) {
-              // query status from yarn rest api
-              try {
-                getFromYarnRestApi(application, stopFrom);
-              } catch (Exception yarnException) {
+              getFromYarnRestApi(application, stopFrom);
+            } catch (Exception yarnException) {
+              /*
+               Query from flink's restAPI and yarn's restAPI both failed.
+               In this case, it is necessary to decide whether to return to the final state depending on the state being operated
+              */
+              if (optionState == null || !optionState.equals(OptionState.STARTING)) {
+                // non-mapping
+                if (application.getState() != FlinkAppState.MAPPING.getValue()) {
+                  log.error(
+                      "FlinkRESTAPIWatcher getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!");
+                  if (StopFrom.NONE.equals(stopFrom)) {
+                    savePointService.expire(application.getId());
+                    application.setState(FlinkAppState.LOST.getValue());
+                    alertService.alert(application, FlinkAppState.LOST);
+                  } else {
+                    application.setState(FlinkAppState.CANCELED.getValue());
+                  }
+                }
                 /*
-                 Query from flink's restAPI and yarn's restAPI both failed.
-                 In this case, it is necessary to decide whether to return to the final state depending on the state being operated
+                 This step means that the above two ways to get information have failed, and this step is the last step,
+                 which will directly identify the mission as cancelled or lost.
+                 Need clean savepoint.
+                */
-                if (optionState == null || !optionState.equals(OptionState.STARTING)) {
-                  // non-mapping
-                  if (application.getState() != FlinkAppState.MAPPING.getValue()) {
-                    log.error(
-                        "FlinkRESTAPIWatcher getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint expired!");
-                    if (StopFrom.NONE.equals(stopFrom)) {
-                      savePointService.expire(application.getId());
-                      application.setState(FlinkAppState.LOST.getValue());
-                      alertService.alert(application, FlinkAppState.LOST);
-                    } else {
-                      application.setState(FlinkAppState.CANCELED.getValue());
-                    }
-                  }
-                  /*
-                   This step means that the above two ways to get information have failed, and this step is the last step,
-                   which will directly identify the mission as cancelled or lost.
-                   Need clean savepoint.
-                  */
-                  application.setEndTime(new Date());
-                  cleanSavepoint(application);
-                  cleanOptioning(optionState, key);
-                  doPersistMetrics(application, true);
-                  FlinkAppState appState = FlinkAppState.of(application.getState());
-                  if (appState.equals(FlinkAppState.FAILED)
-                      || appState.equals(FlinkAppState.LOST)) {
-                    alertService.alert(application, FlinkAppState.of(application.getState()));
-                    if (appState.equals(FlinkAppState.FAILED)) {
-                      try {
-                        applicationService.start(application, true);
-                      } catch (Exception e) {
-                        log.error(e.getMessage(), e);
-                      }
+                application.setEndTime(new Date());
+                cleanSavepoint(application);
+                cleanOptioning(optionState, key);
+                doPersistMetrics(application, true);
+                FlinkAppState appState = FlinkAppState.of(application.getState());
+                if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) {
+                  alertService.alert(application, FlinkAppState.of(application.getState()));
+                  if (appState.equals(FlinkAppState.FAILED)) {
+                    try {
+                      applicationService.start(application, true);
+                    } catch (Exception e) {
+                      log.error(e.getMessage(), e);
                     }
                   }
                 }
               }
             }
-          });
-    }
+          }
+        });
   }
 
   /**
@@ -265,8 +266,7 @@ public class FlinkRESTAPIWatcher {
    * @param stopFrom stopFrom
    */
   private void getFromFlinkRestApi(Application application, StopFrom stopFrom) throws Exception {
-    FlinkCluster flinkCluster = getFlinkCluster(application);
-    JobsOverview jobsOverview = httpJobsOverview(application, flinkCluster);
+    JobsOverview jobsOverview = httpJobsOverview(application);
     Optional<JobsOverview.Job> optional;
     ExecutionMode execMode = application.getExecutionModeEnum();
     if (ExecutionMode.YARN_APPLICATION.equals(execMode)
@@ -339,8 +339,7 @@ public class FlinkRESTAPIWatcher {
 
     // get overview info at the first start time
     if (STARTING_CACHE.getIfPresent(application.getId()) != null) {
-      FlinkCluster flinkCluster = getFlinkCluster(application);
-      Overview override = httpOverview(application, flinkCluster);
+      Overview override = httpOverview(application);
       if (override != null && override.getSlotsTotal() > 0) {
         application.setTotalTM(override.getTaskmanagers());
         application.setTotalSlot(override.getSlotsTotal());
@@ -352,8 +351,7 @@ public class FlinkRESTAPIWatcher {
 
   /** get latest checkpoint */
   private void handleCheckPoints(Application application) throws Exception {
-    FlinkCluster flinkCluster = getFlinkCluster(application);
-    CheckPoints checkPoints = httpCheckpoints(application, flinkCluster);
+    CheckPoints checkPoints = httpCheckpoints(application);
     if (checkPoints != null) {
       checkpointProcessor.process(application, checkPoints);
     }
@@ -602,13 +600,6 @@ public class FlinkRESTAPIWatcher {
     SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
   }
 
-  public static void removeFlinkCluster(FlinkCluster flinkCluster) {
-    if (FLINK_CLUSTER_MAP.containsKey(flinkCluster.getId())) {
-      log.info("remove flink cluster:{}", flinkCluster.getId());
-      FLINK_CLUSTER_MAP.remove(flinkCluster.getId());
-    }
-  }
-
   public static void unWatching(Long appId) {
     if (isKubernetesApp(appId)) {
       return;
@@ -647,26 +638,12 @@ public class FlinkRESTAPIWatcher {
     return FlinkK8sWatcherWrapper.isKubernetesApp(app);
   }
 
-  private FlinkCluster getFlinkCluster(Application application) {
-    if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())
-        || ExecutionMode.isSessionMode(application.getExecutionModeEnum())) {
-      FlinkCluster flinkCluster = FLINK_CLUSTER_MAP.get(application.getFlinkClusterId());
-      if (flinkCluster == null) {
-        flinkCluster = flinkClusterService.getById(application.getFlinkClusterId());
-        FLINK_CLUSTER_MAP.put(application.getFlinkClusterId(), flinkCluster);
-      }
-      return flinkCluster;
-    }
-    return null;
-  }
-
   private YarnAppInfo httpYarnAppInfo(Application application) throws Exception {
     String reqURL = "ws/v1/cluster/apps/".concat(application.getAppId());
     return yarnRestRequest(reqURL, YarnAppInfo.class);
   }
 
-  private Overview httpOverview(Application application, FlinkCluster flinkCluster)
-      throws IOException {
+  private Overview httpOverview(Application application) throws IOException {
     String appId = application.getAppId();
     if (appId != null) {
     if (application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
@@ -685,12 +662,10 @@ public class FlinkRESTAPIWatcher {
     return null;
   }
 
-  private JobsOverview httpJobsOverview(Application application, FlinkCluster flinkCluster)
-      throws Exception {
+  private JobsOverview httpJobsOverview(Application application) throws Exception {
     final String flinkUrl = "jobs/overview";
     ExecutionMode execMode = application.getExecutionModeEnum();
-    if (ExecutionMode.YARN_PER_JOB.equals(execMode)
-        || ExecutionMode.YARN_APPLICATION.equals(execMode)) {
+    if (ExecutionMode.isYarnMode(execMode)) {
       String reqURL;
       if (StringUtils.isEmpty(application.getJobManagerUrl())) {
         String format = "proxy/%s/" + flinkUrl;
@@ -700,30 +675,31 @@ public class FlinkRESTAPIWatcher {
         reqURL = String.format(format, application.getJobManagerUrl());
       }
       return yarnRestRequest(reqURL, JobsOverview.class);
-    } else if (ExecutionMode.REMOTE.equals(execMode)
-        || ExecutionMode.YARN_SESSION.equals(execMode)) {
-      if (application.getJobId() != null) {
-        String remoteUrl = flinkCluster.getAddress() + "/" + flinkUrl;
-        JobsOverview jobsOverview = httpRestRequest(remoteUrl, JobsOverview.class);
-        if (jobsOverview != null) {
-          List<JobsOverview.Job> jobs =
-              jobsOverview.getJobs().stream()
-                  .filter(x -> x.getId().equals(application.getJobId()))
-                  .collect(Collectors.toList());
-          jobsOverview.setJobs(jobs);
-        }
-        return jobsOverview;
-      }
+    }
+
+    if (application.getJobId() != null && ExecutionMode.isRemoteMode(execMode)) {
+      return httpRemoteCluster(
+          application.getFlinkClusterId(),
+          cluster -> {
+            String remoteUrl = cluster.getAddress() + "/" + flinkUrl;
+            JobsOverview jobsOverview = httpRestRequest(remoteUrl, JobsOverview.class);
+            if (jobsOverview != null) {
+              List<JobsOverview.Job> jobs =
+                  jobsOverview.getJobs().stream()
+                      .filter(x -> x.getId().equals(application.getJobId()))
+                      .collect(Collectors.toList());
+              jobsOverview.setJobs(jobs);
+            }
+            return jobsOverview;
+          });
     }
     return null;
   }
 
-  private CheckPoints httpCheckpoints(Application application, FlinkCluster flinkCluster)
-      throws IOException {
+  private CheckPoints httpCheckpoints(Application application) throws Exception {
     final String flinkUrl = "jobs/%s/checkpoints";
     ExecutionMode execMode = application.getExecutionModeEnum();
-    if (ExecutionMode.YARN_PER_JOB.equals(execMode)
-        || ExecutionMode.YARN_APPLICATION.equals(execMode)) {
+    if (ExecutionMode.isYarnMode(execMode)) {
       String reqURL;
       if (StringUtils.isEmpty(application.getJobManagerUrl())) {
         String format = "proxy/%s/" + flinkUrl;
@@ -733,13 +709,16 @@ public class FlinkRESTAPIWatcher {
         reqURL = String.format(format, application.getJobManagerUrl(), application.getJobId());
       }
       return yarnRestRequest(reqURL, CheckPoints.class);
-    } else if (ExecutionMode.REMOTE.equals(execMode)
-        || ExecutionMode.YARN_SESSION.equals(execMode)) {
-      if (application.getJobId() != null) {
-        String remoteUrl =
-            flinkCluster.getAddress() + "/" + String.format(flinkUrl, application.getJobId());
-        return httpRestRequest(remoteUrl, CheckPoints.class);
-      }
+    }
+
+    if (application.getJobId() != null && ExecutionMode.isRemoteMode(execMode)) {
+      return httpRemoteCluster(
+          application.getFlinkClusterId(),
+          cluster -> {
+            String remoteUrl =
+                cluster.getAddress() + "/" + String.format(flinkUrl, application.getJobId());
+            return httpRestRequest(remoteUrl, CheckPoints.class);
+          });
     }
     return null;
   }
@@ -765,4 +744,28 @@ public class FlinkRESTAPIWatcher {
   public boolean isWatchingApp(Long id) {
     return WATCHING_APPS.containsKey(id);
   }
+
+  private <T> T httpRemoteCluster(Long clusterId, Callback<FlinkCluster, T> function)
+      throws Exception {
+    FlinkCluster flinkCluster = getFlinkRemoteCluster(clusterId, false);
+    try {
+      return function.call(flinkCluster);
+    } catch (Exception e) {
+      flinkCluster = getFlinkRemoteCluster(clusterId, true);
+      return function.call(flinkCluster);
+    }
+  }
+
+  private FlinkCluster getFlinkRemoteCluster(Long clusterId, boolean flush) {
+    FlinkCluster flinkCluster = FLINK_CLUSTER_MAP.get(clusterId);
+    if (flinkCluster == null || flush) {
+      flinkCluster = flinkClusterService.getById(clusterId);
+      FLINK_CLUSTER_MAP.put(clusterId, flinkCluster);
+    }
+    return flinkCluster;
+  }
+
+  interface Callback<T, R> {
+    R call(T e) throws Exception;
+  }
 }

Reply via email to