This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 89239e27b [Issue-2535] Refactor FlinkClusterService. (#2744)
89239e27b is described below

commit 89239e27b31be6c0696e931ebff956a3b05c4b25
Author: Roc Marshal <[email protected]>
AuthorDate: Sun May 14 22:41:04 2023 +0800

    [Issue-2535] Refactor FlinkClusterService. (#2744)
    
    * [Issue-2535] Refactor FlinkClusterService.
    
    * Updated based on zhouli's review comments
---
 .../streampark/common/enums/ClusterState.java      |   4 +
 .../console/core/entity/FlinkCluster.java          |   8 +-
 .../core/service/impl/FlinkClusterServiceImpl.java | 306 ++++++++++-----------
 3 files changed, 157 insertions(+), 161 deletions(-)

diff --git 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index 6b0ea9c4c..9dbc3aa4b 100644
--- 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++ 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -49,4 +49,8 @@ public enum ClusterState implements Serializable {
   public Integer getValue() {
     return value;
   }
+
+  public static boolean isStarted(ClusterState state) {
+    return STARTED.equals(state);
+  }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 7d20f0976..57672218b 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -145,6 +145,11 @@ public class FlinkCluster implements Serializable {
     return null;
   }
 
+  /**
+   * Verify the cluster connection whether is valid.
+   *
+   * @return <code>false</code> if the connection of the cluster is invalid, 
<code>true</code> else.
+   */
   public boolean verifyClusterConnection() {
     if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
       if (address == null) {
@@ -167,7 +172,8 @@ public class FlinkCluster implements Serializable {
         //
       }
       return false;
-    } else if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) 
{
+    }
+    if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
       try {
         String restUrl = YarnUtils.getRMWebAppURL() + "/proxy/" + 
this.clusterId + "/overview";
         String result =
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index cfb9148ab..a1ed45c75 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -25,7 +25,6 @@ import 
org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;
 import org.apache.streampark.console.core.bean.ResponseResult;
 import org.apache.streampark.console.core.entity.FlinkCluster;
-import org.apache.streampark.console.core.entity.FlinkEnv;
 import org.apache.streampark.console.core.mapper.FlinkClusterMapper;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
@@ -52,14 +51,19 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -98,36 +102,32 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     Boolean existsByClusterName =
         this.existsByClusterName(cluster.getClusterName(), cluster.getId());
     if (existsByClusterName) {
-      result.setMsg("clusterName is already exists,please check!");
+      result.setMsg("ClusterName is already exists, please check!");
       result.setStatus(1);
       return result;
     }
 
     // 2) Check target-cluster is already exists
     String clusterId = cluster.getClusterId();
-    if (StringUtils.isNotEmpty(clusterId)) {
-      Boolean existsByClusterId = this.existsByClusterId(clusterId, 
cluster.getId());
-      if (existsByClusterId) {
-        result.setMsg("the clusterId " + clusterId + " is already 
exists,please check!");
-        result.setStatus(2);
-        return result;
-      }
+    if (StringUtils.isNotEmpty(clusterId) && this.existsByClusterId(clusterId, 
cluster.getId())) {
+      result.setMsg("The clusterId " + clusterId + " is already exists,please 
check!");
+      result.setStatus(2);
+      return result;
     }
 
     // 3) Check connection
-    if (ExecutionMode.REMOTE.equals(cluster.getExecutionModeEnum())) {
-      if (!cluster.verifyClusterConnection()) {
-        result.setMsg("the remote cluster connection failed, please check!");
-        result.setStatus(3);
-        return result;
-      }
-    } else if 
(ExecutionMode.YARN_SESSION.equals(cluster.getExecutionModeEnum())
-        && cluster.getClusterId() != null) {
-      if (!cluster.verifyClusterConnection()) {
-        result.setMsg("the flink cluster connection failed, please check!");
-        result.setStatus(4);
-        return result;
-      }
+    if (ExecutionMode.isRemoteMode(cluster.getExecutionModeEnum())
+        && !cluster.verifyClusterConnection()) {
+      result.setMsg("The remote cluster connection failed, please check!");
+      result.setStatus(3);
+      return result;
+    }
+    if (ExecutionMode.isYarnMode(cluster.getExecutionModeEnum())
+        && cluster.getClusterId() != null
+        && !cluster.verifyClusterConnection()) {
+      result.setMsg("The flink cluster connection failed, please check!");
+      result.setStatus(4);
+      return result;
     }
 
     return result;
@@ -140,7 +140,7 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     ApiAlertException.throwIfFalse(
         successful, String.format(ERROR_CLUSTER_QUEUE_HINT, 
flinkCluster.getYarnQueue()));
     flinkCluster.setCreateTime(new Date());
-    if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
+    if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
       flinkCluster.setClusterState(ClusterState.STARTED.getValue());
     } else {
       flinkCluster.setClusterState(ClusterState.CREATED.getValue());
@@ -153,54 +153,21 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
   public void start(FlinkCluster cluster) {
     FlinkCluster flinkCluster = getById(cluster.getId());
     try {
-      ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
-      KubernetesDeployParam kubernetesDeployParam = null;
-      switch (executionModeEnum) {
-        case YARN_SESSION:
-          break;
-        case KUBERNETES_NATIVE_SESSION:
-          kubernetesDeployParam =
-              new KubernetesDeployParam(
-                  flinkCluster.getClusterId(),
-                  flinkCluster.getK8sNamespace(),
-                  flinkCluster.getK8sConf(),
-                  flinkCluster.getServiceAccount(),
-                  flinkCluster.getFlinkImage(),
-                  flinkCluster.getK8sRestExposedTypeEnum());
-          break;
-        default:
-          throw new ApiAlertException(
-              "the ExecutionModeEnum " + executionModeEnum.getName() + "can't 
start!");
-      }
-      FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
-      DeployRequest deployRequest =
-          new DeployRequest(
-              flinkEnv.getFlinkVersion(),
-              executionModeEnum,
-              flinkCluster.getProperties(),
-              flinkCluster.getClusterId(),
-              kubernetesDeployParam);
-      log.info("deploy cluster request " + deployRequest);
-      Future<DeployResponse> future =
-          executorService.submit(() -> FlinkClient.deploy(deployRequest));
-      DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
-      if (deployResponse != null) {
-        if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
-          String address =
-              YarnUtils.getRMWebAppURL() + "/proxy/" + 
deployResponse.clusterId() + "/";
-          flinkCluster.setAddress(address);
-        } else {
-          flinkCluster.setAddress(deployResponse.address());
-        }
-        flinkCluster.setClusterId(deployResponse.clusterId());
-        flinkCluster.setClusterState(ClusterState.STARTED.getValue());
-        flinkCluster.setException(null);
-        updateById(flinkCluster);
-        FlinkRESTAPIWatcher.removeFlinkCluster(flinkCluster);
+      DeployResponse deployResponse = deployInternal(flinkCluster);
+      ApiAlertException.throwIfNull(
+          deployResponse,
+          "Deploy cluster failed, unknown reason,please check you params or 
StreamPark error log");
+      if 
(ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())) {
+        String address = YarnUtils.getRMWebAppURL() + "/proxy/" + 
deployResponse.clusterId() + "/";
+        flinkCluster.setAddress(address);
       } else {
-        throw new ApiAlertException(
-            "deploy cluster failed, unknown reason,please check you params or 
StreamPark error log");
+        flinkCluster.setAddress(deployResponse.address());
       }
+      flinkCluster.setClusterId(deployResponse.clusterId());
+      flinkCluster.setClusterState(ClusterState.STARTED.getValue());
+      flinkCluster.setException(null);
+      updateById(flinkCluster);
+      FlinkRESTAPIWatcher.removeFlinkCluster(flinkCluster);
     } catch (Exception e) {
       log.error(e.getMessage(), e);
       flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
@@ -216,30 +183,12 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     boolean success = validateQueueIfNeeded(flinkCluster, cluster);
     ApiAlertException.throwIfFalse(
         success, String.format(ERROR_CLUSTER_QUEUE_HINT, 
cluster.getYarnQueue()));
-    flinkCluster.setClusterName(cluster.getClusterName());
-    flinkCluster.setDescription(cluster.getDescription());
-    if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
-      flinkCluster.setAddress(cluster.getAddress());
-    } else {
-      flinkCluster.setAddress(null);
-      flinkCluster.setClusterId(cluster.getClusterId());
-      flinkCluster.setVersionId(cluster.getVersionId());
-      flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
-      flinkCluster.setOptions(cluster.getOptions());
-      flinkCluster.setResolveOrder(cluster.getResolveOrder());
-      flinkCluster.setK8sHadoopIntegration(cluster.getK8sHadoopIntegration());
-      flinkCluster.setK8sConf(cluster.getK8sConf());
-      flinkCluster.setK8sNamespace(cluster.getK8sNamespace());
-      flinkCluster.setK8sRestExposedType(cluster.getK8sRestExposedType());
-      flinkCluster.setServiceAccount(cluster.getServiceAccount());
-      flinkCluster.setFlinkImage(cluster.getFlinkImage());
-      flinkCluster.setYarnQueue(cluster.getYarnQueue());
-    }
+    updateCluster(cluster, flinkCluster);
     try {
       updateById(flinkCluster);
     } catch (Exception e) {
       throw new ApiDetailException(
-          "update cluster failed, Caused By: " + 
ExceptionUtils.getStackTrace(e));
+          "Update cluster failed, Caused By: " + 
ExceptionUtils.getStackTrace(e));
     }
   }
 
@@ -247,78 +196,31 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
   public void shutdown(FlinkCluster cluster) {
     FlinkCluster flinkCluster = this.getById(cluster.getId());
     // 1) check mode
-    ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
     String clusterId = flinkCluster.getClusterId();
-    KubernetesDeployParam kubernetesDeployParam = null;
-    switch (executionModeEnum) {
-      case YARN_SESSION:
-        break;
-      case KUBERNETES_NATIVE_SESSION:
-        kubernetesDeployParam =
-            new KubernetesDeployParam(
-                flinkCluster.getClusterId(),
-                flinkCluster.getK8sNamespace(),
-                flinkCluster.getK8sConf(),
-                flinkCluster.getServiceAccount(),
-                flinkCluster.getFlinkImage(),
-                flinkCluster.getK8sRestExposedTypeEnum());
-        break;
-      default:
-        throw new ApiAlertException(
-            "the ExecutionModeEnum " + executionModeEnum.getName() + "can't 
shutdown!");
-    }
-    if (StringUtils.isBlank(clusterId)) {
-      throw new ApiAlertException("the clusterId can not be empty!");
-    }
+    ApiAlertException.throwIfTrue(
+        StringUtils.isBlank(clusterId), "The clusterId can not be empty!");
 
     // 2) check cluster is active
-    if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
-      if 
(ClusterState.STARTED.equals(ClusterState.of(flinkCluster.getClusterState()))) {
-        if (!flinkCluster.verifyClusterConnection()) {
-          flinkCluster.setAddress(null);
-          flinkCluster.setClusterState(ClusterState.LOST.getValue());
-          updateById(flinkCluster);
-          throw new ApiAlertException("current cluster is not active, please 
check");
-        }
-      } else {
-        throw new ApiAlertException("current cluster is not active, please 
check");
-      }
-    }
+    checkActiveIfNeeded(flinkCluster);
 
     // 3) check job if running on cluster
     boolean existsRunningJob = 
applicationService.existsRunningJobByClusterId(flinkCluster.getId());
-    if (existsRunningJob) {
-      throw new ApiAlertException(
-          "some app is running on this cluster, the cluster cannot be 
shutdown");
-    }
-
-    // 4) shutdown
-    FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
-    ShutDownRequest stopRequest =
-        new ShutDownRequest(
-            flinkEnv.getFlinkVersion(),
-            executionModeEnum,
-            flinkCluster.getProperties(),
-            clusterId,
-            kubernetesDeployParam);
+    ApiAlertException.throwIfTrue(
+        existsRunningJob, "Some app is running on this cluster, the cluster 
cannot be shutdown");
 
     try {
-      Future<ShutDownResponse> future =
-          executorService.submit(() -> FlinkClient.shutdown(stopRequest));
-      ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS);
-      if (shutDownResponse != null) {
-        flinkCluster.setAddress(null);
-        flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
-        updateById(flinkCluster);
-      } else {
-        throw new ApiAlertException("get shutdown response failed");
-      }
+      // 4) shutdown
+      ShutDownResponse shutDownResponse = shutdownInternal(flinkCluster, 
clusterId);
+      ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response 
failed");
+      flinkCluster.setAddress(null);
+      flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
+      updateById(flinkCluster);
     } catch (Exception e) {
       log.error(e.getMessage(), e);
       flinkCluster.setException(e.toString());
       updateById(flinkCluster);
       throw new ApiDetailException(
-          "shutdown cluster failed, Caused By: " + 
ExceptionUtils.getStackTrace(e));
+          "Shutdown cluster failed, Caused By: " + 
ExceptionUtils.getStackTrace(e));
     }
   }
 
@@ -355,21 +257,18 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
   public void delete(FlinkCluster cluster) {
     Long id = cluster.getId();
     FlinkCluster flinkCluster = getById(id);
-    if (flinkCluster == null) {
-      throw new ApiAlertException("flink cluster not exist, please check.");
-    }
+    ApiAlertException.throwIfNull(flinkCluster, "Flink cluster not exist, 
please check.");
 
-    if (ExecutionMode.YARN_SESSION.equals(flinkCluster.getExecutionModeEnum())
-        || 
ExecutionMode.KUBERNETES_NATIVE_SESSION.equals(flinkCluster.getExecutionModeEnum()))
 {
-      if (ClusterState.STARTED.equals(flinkCluster.getClusterStateEnum())) {
-        throw new ApiAlertException("flink cluster is running, cannot be 
delete, please check.");
-      }
+    if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())
+        || 
ExecutionMode.isKubernetesSessionMode(flinkCluster.getExecutionMode())) {
+      ApiAlertException.throwIfTrue(
+          ClusterState.isStarted(flinkCluster.getClusterStateEnum()),
+          "Flink cluster is running, cannot be delete, please check.");
     }
 
-    if (applicationService.existsJobByClusterId(id)) {
-      throw new ApiAlertException(
-          "some app on this cluster, the cluster cannot be delete, please 
check.");
-    }
+    ApiAlertException.throwIfTrue(
+        applicationService.existsJobByClusterId(id),
+        "Some app on this cluster, the cluster cannot be delete, please 
check.");
     removeById(id);
   }
 
@@ -422,4 +321,91 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     return ExecutionMode.isYarnSessionMode(cluster.getExecutionModeEnum())
         && !yarnQueueService.isDefaultQueue(cluster.getYarnQueue());
   }
+
+  private ShutDownResponse shutdownInternal(FlinkCluster flinkCluster, String 
clusterId)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    ShutDownRequest stopRequest =
+        new ShutDownRequest(
+            
flinkEnvService.getById(flinkCluster.getVersionId()).getFlinkVersion(),
+            flinkCluster.getExecutionModeEnum(),
+            flinkCluster.getProperties(),
+            clusterId,
+            getKubernetesDeployDesc(flinkCluster, "shutdown"));
+    Future<ShutDownResponse> future =
+        executorService.submit(() -> FlinkClient.shutdown(stopRequest));
+    return future.get(60, TimeUnit.SECONDS);
+  }
+
+  private DeployResponse deployInternal(FlinkCluster flinkCluster)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    DeployRequest deployRequest =
+        new DeployRequest(
+            
flinkEnvService.getById(flinkCluster.getVersionId()).getFlinkVersion(),
+            flinkCluster.getExecutionModeEnum(),
+            flinkCluster.getProperties(),
+            flinkCluster.getClusterId(),
+            getKubernetesDeployDesc(flinkCluster, "start"));
+    log.info("Deploy cluster request " + deployRequest);
+    Future<DeployResponse> future = executorService.submit(() -> 
FlinkClient.deploy(deployRequest));
+    return future.get(60, TimeUnit.SECONDS);
+  }
+
+  private void checkActiveIfNeeded(FlinkCluster flinkCluster) {
+    if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())) {
+      ApiAlertException.throwIfFalse(
+          ClusterState.isStarted(flinkCluster.getClusterStateEnum()),
+          "Current cluster is not active, please check!");
+      if (!flinkCluster.verifyClusterConnection()) {
+        flinkCluster.setAddress(null);
+        flinkCluster.setClusterState(ClusterState.LOST.getValue());
+        updateById(flinkCluster);
+        throw new ApiAlertException("Current cluster is not active, please 
check!");
+      }
+    }
+  }
+
+  private void updateCluster(FlinkCluster cluster, FlinkCluster flinkCluster) {
+    flinkCluster.setClusterName(cluster.getClusterName());
+    flinkCluster.setDescription(cluster.getDescription());
+    if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
+      flinkCluster.setAddress(cluster.getAddress());
+    } else {
+      flinkCluster.setAddress(null);
+      flinkCluster.setClusterId(cluster.getClusterId());
+      flinkCluster.setVersionId(cluster.getVersionId());
+      flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
+      flinkCluster.setOptions(cluster.getOptions());
+      flinkCluster.setResolveOrder(cluster.getResolveOrder());
+      flinkCluster.setK8sHadoopIntegration(cluster.getK8sHadoopIntegration());
+      flinkCluster.setK8sConf(cluster.getK8sConf());
+      flinkCluster.setK8sNamespace(cluster.getK8sNamespace());
+      flinkCluster.setK8sRestExposedType(cluster.getK8sRestExposedType());
+      flinkCluster.setServiceAccount(cluster.getServiceAccount());
+      flinkCluster.setFlinkImage(cluster.getFlinkImage());
+      flinkCluster.setYarnQueue(cluster.getYarnQueue());
+    }
+  }
+
+  @Nullable
+  private KubernetesDeployParam getKubernetesDeployDesc(
+      @Nonnull FlinkCluster flinkCluster, String action) {
+    ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
+    switch (executionModeEnum) {
+      case YARN_SESSION:
+        break;
+      case KUBERNETES_NATIVE_SESSION:
+        return new KubernetesDeployParam(
+            flinkCluster.getClusterId(),
+            flinkCluster.getK8sNamespace(),
+            flinkCluster.getK8sConf(),
+            flinkCluster.getServiceAccount(),
+            flinkCluster.getFlinkImage(),
+            flinkCluster.getK8sRestExposedTypeEnum());
+      default:
+        throw new ApiAlertException(
+            String.format(
+                "The ExecutionModeEnum %s can't %s!", 
executionModeEnum.getName(), action));
+    }
+    return null;
+  }
 }

Reply via email to