This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 3cb7c84ae [Feature] flink cluster operation logic changes (#3426)
3cb7c84ae is described below
commit 3cb7c84ae2907342d54f048e634a29f4c08daf27
Author: zhengke zhou <[email protected]>
AuthorDate: Wed Dec 27 18:34:20 2023 +0800
[Feature] flink cluster operation logic changes (#3426)
* [Feature] application start logic changes
* [Feature] fix update method switch logic
* [Feature] improve application start check conditional judgment
* [Feature] add list available flink cluster router method
---
.../core/controller/FlinkClusterController.java | 7 +++++++
.../console/core/service/FlinkClusterService.java | 2 ++
.../impl/ApplicationActionServiceImpl.java | 24 ++++++++++++++++++++++
.../impl/ApplicationManageServiceImpl.java | 22 ++++++++++++++++++++
.../core/service/impl/FlinkClusterServiceImpl.java | 7 +++++++
5 files changed, 62 insertions(+)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index d84eb74b1..2af28b1fb 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -46,6 +46,13 @@ public class FlinkClusterController {
@Autowired private FlinkClusterService flinkClusterService;
+ @Operation(summary = "List flink clusters that are eligible")
+ @PostMapping("availableList")
+ public RestResponse listAvailableCluster() {
+ List<FlinkCluster> flinkClusters = flinkClusterService.listAvailableCluster();
+ return RestResponse.success(flinkClusters);
+ }
+
@Operation(summary = "List flink clusters")
@PostMapping("list")
public RestResponse list() {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index da3947c27..ff043e825 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -29,6 +29,8 @@ import java.util.List;
public interface FlinkClusterService extends IService<FlinkCluster> {
+ List<FlinkCluster> listAvailableCluster();
+
ResponseResult check(FlinkCluster flinkCluster);
Boolean create(FlinkCluster flinkCluster);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 85188dc43..17de0e116 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -21,6 +21,7 @@ import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ApplicationType;
+import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkDevelopmentMode;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.enums.FlinkRestoreMode;
@@ -67,6 +68,7 @@ import org.apache.streampark.console.core.service.application.ApplicationInfoSer
import org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub;
import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
+import org.apache.streampark.console.core.watcher.FlinkClusterWatcher;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.CancelRequest;
import org.apache.streampark.flink.client.bean.CancelResponse;
@@ -171,6 +173,8 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
@Autowired private FlinkK8sDataTypeConverterStub flinkK8sDataTypeConverter;
+ @Autowired private FlinkClusterWatcher flinkClusterWatcher;
+
private final Map<Long, CompletableFuture<SubmitResponse>> startFutureMap =
new ConcurrentHashMap<>();
@@ -390,6 +394,11 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
ApiAlertException.throwIfTrue(
!application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly.");
+ if (FlinkExecutionMode.isRemoteMode(application.getFlinkExecutionMode())
+ || FlinkExecutionMode.isSessionMode(application.getFlinkExecutionMode())) {
+ checkBeforeStart(application);
+ }
+
if (FlinkExecutionMode.isYarnMode(application.getFlinkExecutionMode())) {
ApiAlertException.throwIfTrue(
@@ -813,4 +822,19 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
}
return null;
}
+
+ /* check flink cluster before job start job */
+ private void checkBeforeStart(Application application) {
+ FlinkEnv flinkEnv = flinkEnvService.getByAppId(application.getId());
+ ApiAlertException.throwIfNull(flinkEnv, "[StreamPark] can no found flink version");
+
+ ApiAlertException.throwIfFalse(
+ flinkClusterService.existsByFlinkEnvId(flinkEnv.getId()),
+ "[StreamPark] The flink cluster don't exist, please check it");
+
+ FlinkCluster flinkCluster = flinkClusterService.getById(application.getFlinkClusterId());
+ ApiAlertException.throwIfFalse(
+ flinkClusterWatcher.getClusterState(flinkCluster) == ClusterState.RUNNING,
+ "[StreamPark] The flink cluster not running, please start it");
+ }
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 247ff807a..2a4dd9d2e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service.application.impl;
import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.HdfsOperator;
@@ -31,6 +32,7 @@ import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.bean.AppControl;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationConfig;
+import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkSql;
import org.apache.streampark.console.core.entity.Resource;
import org.apache.streampark.console.core.enums.CandidateTypeEnum;
@@ -45,6 +47,7 @@ import org.apache.streampark.console.core.service.ApplicationConfigService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.CommonService;
import org.apache.streampark.console.core.service.EffectiveService;
+import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.ProjectService;
import org.apache.streampark.console.core.service.ResourceService;
@@ -54,6 +57,7 @@ import org.apache.streampark.console.core.service.YarnQueueService;
import org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub;
import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
+import org.apache.streampark.console.core.watcher.FlinkClusterWatcher;
import org.apache.streampark.console.core.watcher.FlinkK8sObserverStub;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
import org.apache.streampark.flink.packer.pipeline.PipelineStatusEnum;
@@ -131,6 +135,10 @@ public class ApplicationManageServiceImpl extends ServiceImpl<ApplicationMapper,
@Autowired private FlinkK8sDataTypeConverterStub flinkK8sDataTypeConverter;
+ @Autowired private FlinkClusterWatcher flinkClusterWatcher;
+
+ @Autowired private FlinkClusterService flinkClusterService;
+
@PostConstruct
public void resetOptionState() {
this.baseMapper.resetOptionState();
@@ -463,6 +471,20 @@ public class ApplicationManageServiceImpl extends ServiceImpl<ApplicationMapper,
public boolean update(Application appParam) {
Application application = getById(appParam.getId());
+ /* If the original mode is remote, k8s-session, yarn-session, check cluster status */
+ FlinkExecutionMode flinkExecutionMode = application.getFlinkExecutionMode();
+ switch (flinkExecutionMode) {
+ case REMOTE:
+ case YARN_SESSION:
+ case KUBERNETES_NATIVE_SESSION:
+ FlinkCluster flinkCluster = flinkClusterService.getById(application.getFlinkClusterId());
+ ApiAlertException.throwIfFalse(
+ flinkClusterWatcher.getClusterState(flinkCluster) == ClusterState.RUNNING,
+ "[StreamPark] update failed, because bind flink cluster not running");
+ break;
+ default:
+ }
+
boolean success = validateQueueIfNeeded(application, appParam);
ApiAlertException.throwIfFalse(
success,
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 149dde4b5..b85313126 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -100,6 +100,13 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
@Autowired private FlinkK8sObserverStub flinkK8sObserver;
+ @Override
+ public List<FlinkCluster> listAvailableCluster() {
+ LambdaQueryWrapper<FlinkCluster> lambdaQueryWrapper = new LambdaQueryWrapper<>();
+ lambdaQueryWrapper.eq(FlinkCluster::getClusterState, ClusterState.RUNNING);
+ return this.list(lambdaQueryWrapper);
+ }
+
@Override
public ResponseResult check(FlinkCluster cluster) {
ResponseResult result = new ResponseResult();