This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new ac1bc0bae [improvement] improvement flink cluster verifyClusterConnection (#2903)
ac1bc0bae is described below

commit ac1bc0bae26bf4ab302b1a71848494e9e5b70687
Author: xujiangfeng001 <[email protected]>
AuthorDate: Sat Jul 29 15:08:50 2023 +0800

    [improvement] improvement flink cluster verifyClusterConnection (#2903)
---
 .../console/core/entity/FlinkCluster.java          | 48 ----------------------
 .../core/service/impl/ApplicationServiceImpl.java  |  5 ++-
 .../core/service/impl/FlinkClusterServiceImpl.java |  8 ++--
 .../console/core/task/FlinkClusterWatcher.java     | 46 ++++++++++++++-------
 .../console/core/task/FlinkHttpWatcher.java        |  4 +-
 5 files changed, 41 insertions(+), 70 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 32feec747..9684c8406 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -24,10 +24,7 @@ import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
 import org.apache.streampark.common.enums.ResolveOrder;
 import org.apache.streampark.common.util.HttpClientUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
-import org.apache.streampark.common.util.YarnUtils;
-import org.apache.streampark.console.base.util.CommonUtils;
 import org.apache.streampark.console.base.util.JacksonUtils;
-import org.apache.streampark.console.core.metrics.flink.Overview;
 import org.apache.streampark.console.core.utils.YarnQueueLabelExpression;
 
 import org.apache.commons.lang3.StringUtils;
@@ -156,51 +153,6 @@ public class FlinkCluster implements Serializable {
     return null;
   }
 
-  /**
-   * Verify the cluster connection whether is valid.
-   *
-   * @return <code>false</code> if the connection of the cluster is invalid, <code>true</code> else.
-   */
-  public boolean verifyClusterConnection() {
-    if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
-      if (address == null) {
-        return false;
-      }
-      // 1) check url is Legal
-      if (!CommonUtils.isLegalUrl(address)) {
-        return false;
-      }
-      // 2) check connection
-      try {
-        String restUrl = address + "/overview";
-        String result =
-            HttpClientUtils.httpGetRequest(
-                restUrl,
-                RequestConfig.custom().setConnectTimeout(2000, TimeUnit.MILLISECONDS).build());
-        JacksonUtils.read(result, Overview.class);
-        return true;
-      } catch (Exception ignored) {
-        //
-      }
-      return false;
-    }
-    if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
-      try {
-        String restUrl = YarnUtils.getRMWebAppURL(true) + "/proxy/" + this.clusterId + "/overview";
-        String result =
-            HttpClientUtils.httpGetRequest(
-                restUrl,
-                RequestConfig.custom().setConnectTimeout(2000, TimeUnit.MILLISECONDS).build());
-        JacksonUtils.read(result, Overview.class);
-        return true;
-      } catch (Exception ignored) {
-        //
-      }
-      return false;
-    }
-    return false;
-  }
-
   @JsonIgnore
   public Map<String, String> getFlinkConfig() throws JsonProcessingException {
     String restUrl = this.address + "/jobmanager/config";
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 886756770..7c9140dc1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -78,6 +78,7 @@ import org.apache.streampark.console.core.service.SavePointService;
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.VariableService;
 import org.apache.streampark.console.core.service.YarnQueueService;
+import org.apache.streampark.console.core.task.FlinkClusterWatcher;
 import org.apache.streampark.console.core.task.FlinkHttpWatcher;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.CancelRequest;
@@ -216,6 +217,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
   @Autowired private YarnQueueService yarnQueueService;
 
+  @Autowired private FlinkClusterWatcher flinkClusterWatcher;
+
   @PostConstruct
   public void resetOptionState() {
     this.baseMapper.resetOptionState();
@@ -430,7 +433,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
       if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())
           || ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
         FlinkCluster flinkCluster = flinkClusterService.getById(application.getFlinkClusterId());
-        boolean conned = flinkCluster.verifyClusterConnection();
+        boolean conned = flinkClusterWatcher.verifyClusterConnection(flinkCluster);
         if (!conned) {
           throw new ApiAlertException("the target cluster is unavailable, please check!");
         }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index f3dda6a56..adb74ce41 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -94,6 +94,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
 
   @Autowired private YarnQueueService yarnQueueService;
 
+  @Autowired private FlinkClusterWatcher flinkClusterWatcher;
+
   @Override
   public ResponseResult check(FlinkCluster cluster) {
     ResponseResult result = new ResponseResult();
@@ -118,14 +120,14 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
 
     // 3) Check connection
     if (ExecutionMode.isRemoteMode(cluster.getExecutionModeEnum())
-        && !cluster.verifyClusterConnection()) {
+        && !flinkClusterWatcher.verifyClusterConnection(cluster)) {
       result.setMsg("The remote cluster connection failed, please check!");
       result.setStatus(3);
       return result;
     }
     if (ExecutionMode.isYarnMode(cluster.getExecutionModeEnum())
         && cluster.getClusterId() != null
-        && !cluster.verifyClusterConnection()) {
+        && !flinkClusterWatcher.verifyClusterConnection(cluster)) {
       result.setMsg("The flink cluster connection failed, please check!");
       result.setStatus(4);
       return result;
@@ -411,7 +413,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       ApiAlertException.throwIfFalse(
           ClusterState.isRunning(flinkCluster.getClusterStateEnum()),
           "Current cluster is not active, please check!");
-      if (!flinkCluster.verifyClusterConnection()) {
+      if (!flinkClusterWatcher.verifyClusterConnection(flinkCluster)) {
         flinkCluster.setClusterState(ClusterState.LOST.getValue());
         updateById(flinkCluster);
         throw new ApiAlertException("Current cluster is not active, please check!");
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 9b256742c..3762acd37 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -148,17 +148,7 @@ public class FlinkClusterWatcher {
     if (state != null) {
       return state;
     }
-    switch (flinkCluster.getExecutionModeEnum()) {
-      case REMOTE:
-        state = httpRemoteClusterState(flinkCluster);
-        break;
-      case YARN_SESSION:
-        state = httpYarnSessionClusterState(flinkCluster);
-        break;
-      default:
-        state = ClusterState.UNKNOWN;
-        break;
-    }
+    state = httpClusterState(flinkCluster);
     if (ClusterState.isRunning(state)) {
       FAILED_STATES.invalidate(flinkCluster.getId());
     } else {
@@ -192,6 +182,23 @@ public class FlinkClusterWatcher {
     return state;
   }
 
+  /**
+   * get flink cluster state
+   *
+   * @param flinkCluster
+   * @return
+   */
+  private ClusterState httpClusterState(FlinkCluster flinkCluster) {
+    switch (flinkCluster.getExecutionModeEnum()) {
+      case REMOTE:
+        return httpRemoteClusterState(flinkCluster);
+      case YARN_SESSION:
+        return httpYarnSessionClusterState(flinkCluster);
+      default:
+        return ClusterState.UNKNOWN;
+    }
+  }
+
   /**
    * cluster get state from flink rest api
    *
@@ -272,9 +279,18 @@ public class FlinkClusterWatcher {
    * @return
    */
   private ClusterState yarnStateConvertClusterState(YarnApplicationState state) {
-    if (state == YarnApplicationState.FINISHED) {
-      return ClusterState.CANCELED;
-    }
-    return ClusterState.of(state.toString());
+    return state == YarnApplicationState.FINISHED
+        ? ClusterState.CANCELED
+        : ClusterState.of(state.toString());
+  }
+
+  /**
+   * Verify the cluster connection whether is valid.
+   *
+   * @return <code>false</code> if the connection of the cluster is invalid, <code>true</code> else.
+   */
+  public Boolean verifyClusterConnection(FlinkCluster flinkCluster) {
+    ClusterState clusterState = httpClusterState(flinkCluster);
+    return ClusterState.isRunning(clusterState) ? true : false;
   }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
index f6ed5c96c..26302387a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
@@ -17,7 +17,6 @@
 
 package org.apache.streampark.console.core.task;
 
-import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.util.HttpClientUtils;
 import org.apache.streampark.common.util.ThreadUtils;
@@ -784,8 +783,7 @@ public class FlinkHttpWatcher {
       case YARN_SESSION:
       case REMOTE:
         FlinkCluster flinkCluster = flinkClusterService.getById(app.getFlinkClusterId());
-        ClusterState clusterState = flinkClusterWatcher.getClusterState(flinkCluster);
-        if (ClusterState.isRunning(clusterState)) {
+        if (flinkClusterWatcher.verifyClusterConnection(flinkCluster)) {
           log.info(
               "application with id {} is yarn session or remote and flink cluster with id {} is alive, application send alert",
               app.getId(),

Reply via email to