This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 89239e27b [Issue-2535] Refactor FlinkClusterService. (#2744)
89239e27b is described below
commit 89239e27b31be6c0696e931ebff956a3b05c4b25
Author: Roc Marshal <[email protected]>
AuthorDate: Sun May 14 22:41:04 2023 +0800
[Issue-2535] Refactor FlinkClusterService. (#2744)
* [Issue-2535] Refactor FlinkClusterService.
* Updated based on zhouli's review comments
---
.../streampark/common/enums/ClusterState.java | 4 +
.../console/core/entity/FlinkCluster.java | 8 +-
.../core/service/impl/FlinkClusterServiceImpl.java | 306 ++++++++++-----------
3 files changed, 157 insertions(+), 161 deletions(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index 6b0ea9c4c..9dbc3aa4b 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -49,4 +49,8 @@ public enum ClusterState implements Serializable {
public Integer getValue() {
return value;
}
+
+ public static boolean isStarted(ClusterState state) {
+ return STARTED.equals(state);
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 7d20f0976..57672218b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -145,6 +145,11 @@ public class FlinkCluster implements Serializable {
return null;
}
+ /**
+ * Verify the cluster connection whether is valid.
+ *
+ * @return <code>false</code> if the connection of the cluster is invalid,
<code>true</code> else.
+ */
public boolean verifyClusterConnection() {
if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
if (address == null) {
@@ -167,7 +172,8 @@ public class FlinkCluster implements Serializable {
//
}
return false;
- } else if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum()))
{
+ }
+ if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
try {
String restUrl = YarnUtils.getRMWebAppURL() + "/proxy/" +
this.clusterId + "/overview";
String result =
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index cfb9148ab..a1ed45c75 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -25,7 +25,6 @@ import
org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApiDetailException;
import org.apache.streampark.console.core.bean.ResponseResult;
import org.apache.streampark.console.core.entity.FlinkCluster;
-import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.mapper.FlinkClusterMapper;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.CommonService;
@@ -52,14 +51,19 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
import java.util.Collection;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@Slf4j
@@ -98,36 +102,32 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
Boolean existsByClusterName =
this.existsByClusterName(cluster.getClusterName(), cluster.getId());
if (existsByClusterName) {
- result.setMsg("clusterName is already exists,please check!");
+ result.setMsg("ClusterName is already exists, please check!");
result.setStatus(1);
return result;
}
// 2) Check target-cluster is already exists
String clusterId = cluster.getClusterId();
- if (StringUtils.isNotEmpty(clusterId)) {
- Boolean existsByClusterId = this.existsByClusterId(clusterId,
cluster.getId());
- if (existsByClusterId) {
- result.setMsg("the clusterId " + clusterId + " is already
exists,please check!");
- result.setStatus(2);
- return result;
- }
+ if (StringUtils.isNotEmpty(clusterId) && this.existsByClusterId(clusterId,
cluster.getId())) {
+ result.setMsg("The clusterId " + clusterId + " is already exists,please
check!");
+ result.setStatus(2);
+ return result;
}
// 3) Check connection
- if (ExecutionMode.REMOTE.equals(cluster.getExecutionModeEnum())) {
- if (!cluster.verifyClusterConnection()) {
- result.setMsg("the remote cluster connection failed, please check!");
- result.setStatus(3);
- return result;
- }
- } else if
(ExecutionMode.YARN_SESSION.equals(cluster.getExecutionModeEnum())
- && cluster.getClusterId() != null) {
- if (!cluster.verifyClusterConnection()) {
- result.setMsg("the flink cluster connection failed, please check!");
- result.setStatus(4);
- return result;
- }
+ if (ExecutionMode.isRemoteMode(cluster.getExecutionModeEnum())
+ && !cluster.verifyClusterConnection()) {
+ result.setMsg("The remote cluster connection failed, please check!");
+ result.setStatus(3);
+ return result;
+ }
+ if (ExecutionMode.isYarnMode(cluster.getExecutionModeEnum())
+ && cluster.getClusterId() != null
+ && !cluster.verifyClusterConnection()) {
+ result.setMsg("The flink cluster connection failed, please check!");
+ result.setStatus(4);
+ return result;
}
return result;
@@ -140,7 +140,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
ApiAlertException.throwIfFalse(
successful, String.format(ERROR_CLUSTER_QUEUE_HINT,
flinkCluster.getYarnQueue()));
flinkCluster.setCreateTime(new Date());
- if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
+ if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
flinkCluster.setClusterState(ClusterState.STARTED.getValue());
} else {
flinkCluster.setClusterState(ClusterState.CREATED.getValue());
@@ -153,54 +153,21 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
public void start(FlinkCluster cluster) {
FlinkCluster flinkCluster = getById(cluster.getId());
try {
- ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
- KubernetesDeployParam kubernetesDeployParam = null;
- switch (executionModeEnum) {
- case YARN_SESSION:
- break;
- case KUBERNETES_NATIVE_SESSION:
- kubernetesDeployParam =
- new KubernetesDeployParam(
- flinkCluster.getClusterId(),
- flinkCluster.getK8sNamespace(),
- flinkCluster.getK8sConf(),
- flinkCluster.getServiceAccount(),
- flinkCluster.getFlinkImage(),
- flinkCluster.getK8sRestExposedTypeEnum());
- break;
- default:
- throw new ApiAlertException(
- "the ExecutionModeEnum " + executionModeEnum.getName() + "can't
start!");
- }
- FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
- DeployRequest deployRequest =
- new DeployRequest(
- flinkEnv.getFlinkVersion(),
- executionModeEnum,
- flinkCluster.getProperties(),
- flinkCluster.getClusterId(),
- kubernetesDeployParam);
- log.info("deploy cluster request " + deployRequest);
- Future<DeployResponse> future =
- executorService.submit(() -> FlinkClient.deploy(deployRequest));
- DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
- if (deployResponse != null) {
- if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
- String address =
- YarnUtils.getRMWebAppURL() + "/proxy/" +
deployResponse.clusterId() + "/";
- flinkCluster.setAddress(address);
- } else {
- flinkCluster.setAddress(deployResponse.address());
- }
- flinkCluster.setClusterId(deployResponse.clusterId());
- flinkCluster.setClusterState(ClusterState.STARTED.getValue());
- flinkCluster.setException(null);
- updateById(flinkCluster);
- FlinkRESTAPIWatcher.removeFlinkCluster(flinkCluster);
+ DeployResponse deployResponse = deployInternal(flinkCluster);
+ ApiAlertException.throwIfNull(
+ deployResponse,
+ "Deploy cluster failed, unknown reason,please check you params or
StreamPark error log");
+ if
(ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())) {
+ String address = YarnUtils.getRMWebAppURL() + "/proxy/" +
deployResponse.clusterId() + "/";
+ flinkCluster.setAddress(address);
} else {
- throw new ApiAlertException(
- "deploy cluster failed, unknown reason,please check you params or
StreamPark error log");
+ flinkCluster.setAddress(deployResponse.address());
}
+ flinkCluster.setClusterId(deployResponse.clusterId());
+ flinkCluster.setClusterState(ClusterState.STARTED.getValue());
+ flinkCluster.setException(null);
+ updateById(flinkCluster);
+ FlinkRESTAPIWatcher.removeFlinkCluster(flinkCluster);
} catch (Exception e) {
log.error(e.getMessage(), e);
flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
@@ -216,30 +183,12 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
boolean success = validateQueueIfNeeded(flinkCluster, cluster);
ApiAlertException.throwIfFalse(
success, String.format(ERROR_CLUSTER_QUEUE_HINT,
cluster.getYarnQueue()));
- flinkCluster.setClusterName(cluster.getClusterName());
- flinkCluster.setDescription(cluster.getDescription());
- if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
- flinkCluster.setAddress(cluster.getAddress());
- } else {
- flinkCluster.setAddress(null);
- flinkCluster.setClusterId(cluster.getClusterId());
- flinkCluster.setVersionId(cluster.getVersionId());
- flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
- flinkCluster.setOptions(cluster.getOptions());
- flinkCluster.setResolveOrder(cluster.getResolveOrder());
- flinkCluster.setK8sHadoopIntegration(cluster.getK8sHadoopIntegration());
- flinkCluster.setK8sConf(cluster.getK8sConf());
- flinkCluster.setK8sNamespace(cluster.getK8sNamespace());
- flinkCluster.setK8sRestExposedType(cluster.getK8sRestExposedType());
- flinkCluster.setServiceAccount(cluster.getServiceAccount());
- flinkCluster.setFlinkImage(cluster.getFlinkImage());
- flinkCluster.setYarnQueue(cluster.getYarnQueue());
- }
+ updateCluster(cluster, flinkCluster);
try {
updateById(flinkCluster);
} catch (Exception e) {
throw new ApiDetailException(
- "update cluster failed, Caused By: " +
ExceptionUtils.getStackTrace(e));
+ "Update cluster failed, Caused By: " +
ExceptionUtils.getStackTrace(e));
}
}
@@ -247,78 +196,31 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
public void shutdown(FlinkCluster cluster) {
FlinkCluster flinkCluster = this.getById(cluster.getId());
// 1) check mode
- ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
String clusterId = flinkCluster.getClusterId();
- KubernetesDeployParam kubernetesDeployParam = null;
- switch (executionModeEnum) {
- case YARN_SESSION:
- break;
- case KUBERNETES_NATIVE_SESSION:
- kubernetesDeployParam =
- new KubernetesDeployParam(
- flinkCluster.getClusterId(),
- flinkCluster.getK8sNamespace(),
- flinkCluster.getK8sConf(),
- flinkCluster.getServiceAccount(),
- flinkCluster.getFlinkImage(),
- flinkCluster.getK8sRestExposedTypeEnum());
- break;
- default:
- throw new ApiAlertException(
- "the ExecutionModeEnum " + executionModeEnum.getName() + "can't
shutdown!");
- }
- if (StringUtils.isBlank(clusterId)) {
- throw new ApiAlertException("the clusterId can not be empty!");
- }
+ ApiAlertException.throwIfTrue(
+ StringUtils.isBlank(clusterId), "The clusterId can not be empty!");
// 2) check cluster is active
- if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
- if
(ClusterState.STARTED.equals(ClusterState.of(flinkCluster.getClusterState()))) {
- if (!flinkCluster.verifyClusterConnection()) {
- flinkCluster.setAddress(null);
- flinkCluster.setClusterState(ClusterState.LOST.getValue());
- updateById(flinkCluster);
- throw new ApiAlertException("current cluster is not active, please
check");
- }
- } else {
- throw new ApiAlertException("current cluster is not active, please
check");
- }
- }
+ checkActiveIfNeeded(flinkCluster);
// 3) check job if running on cluster
boolean existsRunningJob =
applicationService.existsRunningJobByClusterId(flinkCluster.getId());
- if (existsRunningJob) {
- throw new ApiAlertException(
- "some app is running on this cluster, the cluster cannot be
shutdown");
- }
-
- // 4) shutdown
- FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
- ShutDownRequest stopRequest =
- new ShutDownRequest(
- flinkEnv.getFlinkVersion(),
- executionModeEnum,
- flinkCluster.getProperties(),
- clusterId,
- kubernetesDeployParam);
+ ApiAlertException.throwIfTrue(
+ existsRunningJob, "Some app is running on this cluster, the cluster
cannot be shutdown");
try {
- Future<ShutDownResponse> future =
- executorService.submit(() -> FlinkClient.shutdown(stopRequest));
- ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS);
- if (shutDownResponse != null) {
- flinkCluster.setAddress(null);
- flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
- updateById(flinkCluster);
- } else {
- throw new ApiAlertException("get shutdown response failed");
- }
+ // 4) shutdown
+ ShutDownResponse shutDownResponse = shutdownInternal(flinkCluster,
clusterId);
+ ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response
failed");
+ flinkCluster.setAddress(null);
+ flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
+ updateById(flinkCluster);
} catch (Exception e) {
log.error(e.getMessage(), e);
flinkCluster.setException(e.toString());
updateById(flinkCluster);
throw new ApiDetailException(
- "shutdown cluster failed, Caused By: " +
ExceptionUtils.getStackTrace(e));
+ "Shutdown cluster failed, Caused By: " +
ExceptionUtils.getStackTrace(e));
}
}
@@ -355,21 +257,18 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
public void delete(FlinkCluster cluster) {
Long id = cluster.getId();
FlinkCluster flinkCluster = getById(id);
- if (flinkCluster == null) {
- throw new ApiAlertException("flink cluster not exist, please check.");
- }
+ ApiAlertException.throwIfNull(flinkCluster, "Flink cluster not exist,
please check.");
- if (ExecutionMode.YARN_SESSION.equals(flinkCluster.getExecutionModeEnum())
- ||
ExecutionMode.KUBERNETES_NATIVE_SESSION.equals(flinkCluster.getExecutionModeEnum()))
{
- if (ClusterState.STARTED.equals(flinkCluster.getClusterStateEnum())) {
- throw new ApiAlertException("flink cluster is running, cannot be
delete, please check.");
- }
+ if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())
+ ||
ExecutionMode.isKubernetesSessionMode(flinkCluster.getExecutionMode())) {
+ ApiAlertException.throwIfTrue(
+ ClusterState.isStarted(flinkCluster.getClusterStateEnum()),
+ "Flink cluster is running, cannot be delete, please check.");
}
- if (applicationService.existsJobByClusterId(id)) {
- throw new ApiAlertException(
- "some app on this cluster, the cluster cannot be delete, please
check.");
- }
+ ApiAlertException.throwIfTrue(
+ applicationService.existsJobByClusterId(id),
+ "Some app on this cluster, the cluster cannot be delete, please
check.");
removeById(id);
}
@@ -422,4 +321,91 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
return ExecutionMode.isYarnSessionMode(cluster.getExecutionModeEnum())
&& !yarnQueueService.isDefaultQueue(cluster.getYarnQueue());
}
+
+ private ShutDownResponse shutdownInternal(FlinkCluster flinkCluster, String
clusterId)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ ShutDownRequest stopRequest =
+ new ShutDownRequest(
+
flinkEnvService.getById(flinkCluster.getVersionId()).getFlinkVersion(),
+ flinkCluster.getExecutionModeEnum(),
+ flinkCluster.getProperties(),
+ clusterId,
+ getKubernetesDeployDesc(flinkCluster, "shutdown"));
+ Future<ShutDownResponse> future =
+ executorService.submit(() -> FlinkClient.shutdown(stopRequest));
+ return future.get(60, TimeUnit.SECONDS);
+ }
+
+ private DeployResponse deployInternal(FlinkCluster flinkCluster)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ DeployRequest deployRequest =
+ new DeployRequest(
+
flinkEnvService.getById(flinkCluster.getVersionId()).getFlinkVersion(),
+ flinkCluster.getExecutionModeEnum(),
+ flinkCluster.getProperties(),
+ flinkCluster.getClusterId(),
+ getKubernetesDeployDesc(flinkCluster, "start"));
+ log.info("Deploy cluster request " + deployRequest);
+ Future<DeployResponse> future = executorService.submit(() ->
FlinkClient.deploy(deployRequest));
+ return future.get(60, TimeUnit.SECONDS);
+ }
+
+ private void checkActiveIfNeeded(FlinkCluster flinkCluster) {
+ if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())) {
+ ApiAlertException.throwIfFalse(
+ ClusterState.isStarted(flinkCluster.getClusterStateEnum()),
+ "Current cluster is not active, please check!");
+ if (!flinkCluster.verifyClusterConnection()) {
+ flinkCluster.setAddress(null);
+ flinkCluster.setClusterState(ClusterState.LOST.getValue());
+ updateById(flinkCluster);
+ throw new ApiAlertException("Current cluster is not active, please
check!");
+ }
+ }
+ }
+
+ private void updateCluster(FlinkCluster cluster, FlinkCluster flinkCluster) {
+ flinkCluster.setClusterName(cluster.getClusterName());
+ flinkCluster.setDescription(cluster.getDescription());
+ if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
+ flinkCluster.setAddress(cluster.getAddress());
+ } else {
+ flinkCluster.setAddress(null);
+ flinkCluster.setClusterId(cluster.getClusterId());
+ flinkCluster.setVersionId(cluster.getVersionId());
+ flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
+ flinkCluster.setOptions(cluster.getOptions());
+ flinkCluster.setResolveOrder(cluster.getResolveOrder());
+ flinkCluster.setK8sHadoopIntegration(cluster.getK8sHadoopIntegration());
+ flinkCluster.setK8sConf(cluster.getK8sConf());
+ flinkCluster.setK8sNamespace(cluster.getK8sNamespace());
+ flinkCluster.setK8sRestExposedType(cluster.getK8sRestExposedType());
+ flinkCluster.setServiceAccount(cluster.getServiceAccount());
+ flinkCluster.setFlinkImage(cluster.getFlinkImage());
+ flinkCluster.setYarnQueue(cluster.getYarnQueue());
+ }
+ }
+
+ @Nullable
+ private KubernetesDeployParam getKubernetesDeployDesc(
+ @Nonnull FlinkCluster flinkCluster, String action) {
+ ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
+ switch (executionModeEnum) {
+ case YARN_SESSION:
+ break;
+ case KUBERNETES_NATIVE_SESSION:
+ return new KubernetesDeployParam(
+ flinkCluster.getClusterId(),
+ flinkCluster.getK8sNamespace(),
+ flinkCluster.getK8sConf(),
+ flinkCluster.getServiceAccount(),
+ flinkCluster.getFlinkImage(),
+ flinkCluster.getK8sRestExposedTypeEnum());
+ default:
+ throw new ApiAlertException(
+ String.format(
+ "The ExecutionModeEnum %s can't %s!",
executionModeEnum.getName(), action));
+ }
+ return null;
+ }
}