This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3cb7c84ae [Feature] flink cluster operation logic changes (#3426)
3cb7c84ae is described below

commit 3cb7c84ae2907342d54f048e634a29f4c08daf27
Author: zhengke zhou <[email protected]>
AuthorDate: Wed Dec 27 18:34:20 2023 +0800

    [Feature] flink cluster operation logic changes (#3426)
    
    * [Feature] application start logic changes
    
    * [Feature] fix update method switch logic
    
    * [Feature] improve application start check conditional judgment
    
    * [Feature] add list available flink cluster router method
---
 .../core/controller/FlinkClusterController.java    |  7 +++++++
 .../console/core/service/FlinkClusterService.java  |  2 ++
 .../impl/ApplicationActionServiceImpl.java         | 24 ++++++++++++++++++++++
 .../impl/ApplicationManageServiceImpl.java         | 22 ++++++++++++++++++++
 .../core/service/impl/FlinkClusterServiceImpl.java |  7 +++++++
 5 files changed, 62 insertions(+)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index d84eb74b1..2af28b1fb 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -46,6 +46,13 @@ public class FlinkClusterController {
 
   @Autowired private FlinkClusterService flinkClusterService;
 
+  @Operation(summary = "List flink clusters that are eligible")
+  @PostMapping("availableList")
+  public RestResponse listAvailableCluster() {
+    List<FlinkCluster> flinkClusters = 
flinkClusterService.listAvailableCluster();
+    return RestResponse.success(flinkClusters);
+  }
+
   @Operation(summary = "List flink clusters")
   @PostMapping("list")
   public RestResponse list() {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index da3947c27..ff043e825 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -29,6 +29,8 @@ import java.util.List;
 
 public interface FlinkClusterService extends IService<FlinkCluster> {
 
+  List<FlinkCluster> listAvailableCluster();
+
   ResponseResult check(FlinkCluster flinkCluster);
 
   Boolean create(FlinkCluster flinkCluster);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 85188dc43..17de0e116 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -21,6 +21,7 @@ import org.apache.streampark.common.Constant;
 import org.apache.streampark.common.conf.ConfigKeys;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.ApplicationType;
+import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.FlinkDevelopmentMode;
 import org.apache.streampark.common.enums.FlinkExecutionMode;
 import org.apache.streampark.common.enums.FlinkRestoreMode;
@@ -67,6 +68,7 @@ import 
org.apache.streampark.console.core.service.application.ApplicationInfoSer
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
 import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub;
 import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
+import org.apache.streampark.console.core.watcher.FlinkClusterWatcher;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.CancelRequest;
 import org.apache.streampark.flink.client.bean.CancelResponse;
@@ -171,6 +173,8 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
 
   @Autowired private FlinkK8sDataTypeConverterStub flinkK8sDataTypeConverter;
 
+  @Autowired private FlinkClusterWatcher flinkClusterWatcher;
+
   private final Map<Long, CompletableFuture<SubmitResponse>> startFutureMap =
       new ConcurrentHashMap<>();
 
@@ -390,6 +394,11 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
     ApiAlertException.throwIfTrue(
         !application.isCanBeStart(), "[StreamPark] The application cannot be 
started repeatedly.");
 
+    if (FlinkExecutionMode.isRemoteMode(application.getFlinkExecutionMode())
+        || 
FlinkExecutionMode.isSessionMode(application.getFlinkExecutionMode())) {
+      checkBeforeStart(application);
+    }
+
     if (FlinkExecutionMode.isYarnMode(application.getFlinkExecutionMode())) {
 
       ApiAlertException.throwIfTrue(
@@ -813,4 +822,19 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
     }
     return null;
   }
+
+  /* check flink cluster before job start job */
+  private void checkBeforeStart(Application application) {
+    FlinkEnv flinkEnv = flinkEnvService.getByAppId(application.getId());
+    ApiAlertException.throwIfNull(flinkEnv, "[StreamPark] can no found flink 
version");
+
+    ApiAlertException.throwIfFalse(
+        flinkClusterService.existsByFlinkEnvId(flinkEnv.getId()),
+        "[StreamPark] The flink cluster don't exist, please check it");
+
+    FlinkCluster flinkCluster = 
flinkClusterService.getById(application.getFlinkClusterId());
+    ApiAlertException.throwIfFalse(
+        flinkClusterWatcher.getClusterState(flinkCluster) == 
ClusterState.RUNNING,
+        "[StreamPark] The flink cluster not running, please start it");
+  }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 247ff807a..2a4dd9d2e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -19,6 +19,7 @@ package 
org.apache.streampark.console.core.service.application.impl;
 
 import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.FlinkExecutionMode;
 import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.HdfsOperator;
@@ -31,6 +32,7 @@ import org.apache.streampark.console.base.util.WebUtils;
 import org.apache.streampark.console.core.bean.AppControl;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.ApplicationConfig;
+import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Resource;
 import org.apache.streampark.console.core.enums.CandidateTypeEnum;
@@ -45,6 +47,7 @@ import 
org.apache.streampark.console.core.service.ApplicationConfigService;
 import org.apache.streampark.console.core.service.ApplicationLogService;
 import org.apache.streampark.console.core.service.CommonService;
 import org.apache.streampark.console.core.service.EffectiveService;
+import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.ProjectService;
 import org.apache.streampark.console.core.service.ResourceService;
@@ -54,6 +57,7 @@ import 
org.apache.streampark.console.core.service.YarnQueueService;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
 import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub;
 import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
+import org.apache.streampark.console.core.watcher.FlinkClusterWatcher;
 import org.apache.streampark.console.core.watcher.FlinkK8sObserverStub;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
 import org.apache.streampark.flink.packer.pipeline.PipelineStatusEnum;
@@ -131,6 +135,10 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
 
   @Autowired private FlinkK8sDataTypeConverterStub flinkK8sDataTypeConverter;
 
+  @Autowired private FlinkClusterWatcher flinkClusterWatcher;
+
+  @Autowired private FlinkClusterService flinkClusterService;
+
   @PostConstruct
   public void resetOptionState() {
     this.baseMapper.resetOptionState();
@@ -463,6 +471,20 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
   public boolean update(Application appParam) {
     Application application = getById(appParam.getId());
 
+    /* If the original mode is remote, k8s-session, yarn-session, check 
cluster status */
+    FlinkExecutionMode flinkExecutionMode = 
application.getFlinkExecutionMode();
+    switch (flinkExecutionMode) {
+      case REMOTE:
+      case YARN_SESSION:
+      case KUBERNETES_NATIVE_SESSION:
+        FlinkCluster flinkCluster = 
flinkClusterService.getById(application.getFlinkClusterId());
+        ApiAlertException.throwIfFalse(
+            flinkClusterWatcher.getClusterState(flinkCluster) == 
ClusterState.RUNNING,
+            "[StreamPark] update failed, because bind flink cluster not 
running");
+        break;
+      default:
+    }
+
     boolean success = validateQueueIfNeeded(application, appParam);
     ApiAlertException.throwIfFalse(
         success,
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 149dde4b5..b85313126 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -100,6 +100,13 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
 
   @Autowired private FlinkK8sObserverStub flinkK8sObserver;
 
+  @Override
+  public List<FlinkCluster> listAvailableCluster() {
+    LambdaQueryWrapper<FlinkCluster> lambdaQueryWrapper = new 
LambdaQueryWrapper<>();
+    lambdaQueryWrapper.eq(FlinkCluster::getClusterState, ClusterState.RUNNING);
+    return this.list(lambdaQueryWrapper);
+  }
+
   @Override
   public ResponseResult check(FlinkCluster cluster) {
     ResponseResult result = new ResponseResult();

Reply via email to