This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch k8s-shutdown in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 9a5b50833a69322373baa7b5b1ab278519aea756 Author: benjobs <[email protected]> AuthorDate: Mon Jan 8 23:05:54 2024 +0800 [Improve] k8s cluster shutdown bug fixed. --- .../core/service/impl/FlinkClusterServiceImpl.java | 89 ++++++++-------------- .../streampark/flink/client/FlinkClient.scala | 4 +- .../flink/client/bean/DeployRequest.scala | 47 +++++++++--- .../flink/client/bean/ShutDownRequest.scala | 32 -------- .../flink/client/FlinkClientHandler.scala | 2 +- .../impl/KubernetesNativeSessionClient.scala | 59 ++++++-------- .../flink/client/impl/YarnSessionClient.scala | 2 +- 7 files changed, 97 insertions(+), 138 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java index fde4c00e6..f73a58290 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java @@ -35,8 +35,7 @@ import org.apache.streampark.console.core.service.YarnQueueService; import org.apache.streampark.flink.client.FlinkClient; import org.apache.streampark.flink.client.bean.DeployRequest; import org.apache.streampark.flink.client.bean.DeployResponse; -import org.apache.streampark.flink.client.bean.KubernetesDeployParam; -import org.apache.streampark.flink.client.bean.ShutDownRequest; +import org.apache.streampark.flink.client.bean.KubernetesDeployRequest; import org.apache.streampark.flink.client.bean.ShutDownResponse; import org.apache.commons.lang3.StringUtils; @@ -153,33 +152,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli FlinkCluster flinkCluster = getById(cluster.getId()); try { 
ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum(); - KubernetesDeployParam kubernetesDeployParam = null; - switch (executionModeEnum) { - case YARN_SESSION: - break; - case KUBERNETES_NATIVE_SESSION: - kubernetesDeployParam = - new KubernetesDeployParam( - flinkCluster.getClusterId(), - flinkCluster.getK8sNamespace(), - flinkCluster.getK8sConf(), - flinkCluster.getServiceAccount(), - flinkCluster.getFlinkImage(), - flinkCluster.getK8sRestExposedTypeEnum()); - break; - default: - throw new ApiAlertException( - "the ExecutionModeEnum " + executionModeEnum.getName() + "can't start!"); - } - FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId()); - DeployRequest deployRequest = - new DeployRequest( - flinkEnv.getFlinkVersion(), - executionModeEnum, - flinkCluster.getProperties(), - flinkCluster.getClusterId(), - kubernetesDeployParam); - log.info("deploy cluster request " + deployRequest); + DeployRequest deployRequest = getDeployRequest(flinkCluster); + log.info("deploy cluster request: " + deployRequest); Future<DeployResponse> future = executorService.submit(() -> FlinkClient.deploy(deployRequest)); DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS); @@ -208,6 +182,33 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli } } + private DeployRequest getDeployRequest(FlinkCluster flinkCluster) { + ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum(); + FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId()); + switch (executionModeEnum) { + case YARN_SESSION: + return DeployRequest.apply( + flinkEnv.getFlinkVersion(), + executionModeEnum, + flinkCluster.getProperties(), + flinkCluster.getClusterId()); + case KUBERNETES_NATIVE_SESSION: + return KubernetesDeployRequest.apply( + flinkEnv.getFlinkVersion(), + executionModeEnum, + flinkCluster.getProperties(), + flinkCluster.getClusterId(), + flinkCluster.getK8sNamespace(), + flinkCluster.getK8sConf(), + 
flinkCluster.getServiceAccount(), + flinkCluster.getFlinkImage(), + flinkCluster.getK8sRestExposedTypeEnum()); + default: + throw new ApiAlertException( + "the ExecutionModeEnum " + executionModeEnum.getName() + "can't start!"); + } + } + @Override public void update(FlinkCluster cluster) { FlinkCluster flinkCluster = getById(cluster.getId()); @@ -247,24 +248,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli // 1) check mode ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum(); String clusterId = flinkCluster.getClusterId(); - KubernetesDeployParam kubernetesDeployParam = null; - switch (executionModeEnum) { - case YARN_SESSION: - break; - case KUBERNETES_NATIVE_SESSION: - kubernetesDeployParam = - new KubernetesDeployParam( - flinkCluster.getClusterId(), - flinkCluster.getK8sNamespace(), - flinkCluster.getK8sConf(), - flinkCluster.getServiceAccount(), - flinkCluster.getFlinkImage(), - flinkCluster.getK8sRestExposedTypeEnum()); - break; - default: - throw new ApiAlertException( - "the ExecutionModeEnum " + executionModeEnum.getName() + "can't shutdown!"); - } if (StringUtils.isBlank(clusterId)) { throw new ApiAlertException("the clusterId can not be empty!"); } @@ -291,18 +274,10 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli } // 4) shutdown - FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId()); - ShutDownRequest stopRequest = - new ShutDownRequest( - flinkEnv.getFlinkVersion(), - executionModeEnum, - flinkCluster.getProperties(), - clusterId, - kubernetesDeployParam); - + DeployRequest deployRequest = getDeployRequest(cluster); try { Future<ShutDownResponse> future = - executorService.submit(() -> FlinkClient.shutdown(stopRequest)); + executorService.submit(() -> FlinkClient.shutdown(deployRequest)); ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS); if (shutDownResponse != null) { flinkCluster.setAddress(null); diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala index c2ea9ba48..19ee9e744 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala @@ -57,8 +57,8 @@ object FlinkClient extends Logger { proxy[DeployResponse](deployRequest, deployRequest.flinkVersion, DEPLOY_REQUEST) } - def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = { - proxy[ShutDownResponse](shutDownRequest, shutDownRequest.flinkVersion, SHUTDOWN_REQUEST) + def shutdown(deployRequest: DeployRequest): ShutDownResponse = { + proxy[ShutDownResponse](deployRequest, deployRequest.flinkVersion, SHUTDOWN_REQUEST) } def triggerSavepoint(savepointRequest: TriggerSavepointRequest): SavepointResponse = { diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala index db7bbcd3d..81b36a17b 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala @@ -24,8 +24,6 @@ import org.apache.streampark.flink.util.FlinkUtils import org.apache.commons.io.FileUtils import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions -import javax.annotation.Nullable - 
import java.io.File import java.util.{Map => JavaMap} @@ -33,8 +31,7 @@ case class DeployRequest( flinkVersion: FlinkVersion, executionMode: ExecutionMode, properties: JavaMap[String, Any], - clusterId: String, - @Nullable k8sDeployParam: KubernetesDeployParam) { + clusterId: String) { private[client] lazy val hdfsWorkspace = { @@ -63,10 +60,38 @@ case class DeployRequest( } } -case class KubernetesDeployParam( - clusterId: String, - kubernetesNamespace: String = KubernetesConfigOptions.NAMESPACE.defaultValue(), - kubeConf: String = "~/.kube/config", - serviceAccount: String = KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT.defaultValue(), - flinkImage: String = KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(), - @Nullable flinkRestExposedType: FlinkK8sRestExposedType = FlinkK8sRestExposedType.CLUSTER_IP) +class KubernetesDeployRequest( + override val flinkVersion: FlinkVersion, + override val executionMode: ExecutionMode, + override val properties: JavaMap[String, Any], + override val clusterId: String, + val kubernetesNamespace: String = KubernetesConfigOptions.NAMESPACE.defaultValue(), + val kubeConf: String = "~/.kube/config", + val serviceAccount: String = KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT.defaultValue(), + val flinkImage: String = KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(), + val flinkRestExposedType: FlinkK8sRestExposedType = FlinkK8sRestExposedType.CLUSTER_IP) + extends DeployRequest(flinkVersion, executionMode, properties, clusterId) + +object KubernetesDeployRequest { + def apply( + flinkVersion: FlinkVersion, + executionMode: ExecutionMode, + properties: JavaMap[String, Any], + clusterId: String, + kubernetesNamespace: String, + kubeConf: String, + serviceAccount: String, + flinkImage: String, + flinkRestExposedType: FlinkK8sRestExposedType): KubernetesDeployRequest = { + new KubernetesDeployRequest( + flinkVersion, + executionMode, + properties, + clusterId, + kubernetesNamespace, + kubeConf, + serviceAccount, + 
flinkImage, + flinkRestExposedType) + } +} diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownRequest.scala deleted file mode 100644 index a6b75bbdf..000000000 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownRequest.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.streampark.flink.client.bean - -import org.apache.streampark.common.conf.FlinkVersion -import org.apache.streampark.common.enums.ExecutionMode - -import javax.annotation.Nullable - -import java.util.{Map => JavaMap} - -case class ShutDownRequest( - flinkVersion: FlinkVersion, - executionMode: ExecutionMode, - @Nullable properties: JavaMap[String, Any], - clusterId: String, - @Nullable kubernetesDeployParam: KubernetesDeployParam) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala index e85ceb6ee..c6fa402e9 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala @@ -70,7 +70,7 @@ object FlinkClientHandler { } } - def shutdown(request: ShutDownRequest): ShutDownResponse = { + def shutdown(request: DeployRequest): ShutDownResponse = { request.executionMode match { case YARN_SESSION => YarnSessionClient.shutdown(request) case KUBERNETES_NATIVE_SESSION => KubernetesNativeSessionClient.shutdown(request) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala index 621b7ac8a..3f2e99bde 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala +++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala @@ -25,6 +25,7 @@ import org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper import org.apache.streampark.flink.core.FlinkKubernetesClient import org.apache.streampark.flink.kubernetes.KubernetesRetriever import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode +import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper import org.apache.streampark.flink.kubernetes.model.ClusterKey import io.fabric8.kubernetes.api.model.{Config => _} @@ -134,7 +135,8 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo super.doCancel(cancelRequest, flinkConfig) } - def deploy(deployRequest: DeployRequest): DeployResponse = { + def deploy(deployReq: DeployRequest): DeployResponse = { + val deployRequest = deployReq.asInstanceOf[KubernetesDeployRequest] logInfo( s""" |--------------------------------------- kubernetes session start --------------------------------------- @@ -142,10 +144,10 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo | flinkVersion : ${deployRequest.flinkVersion.version} | execMode : ${deployRequest.executionMode.name()} | clusterId : ${deployRequest.clusterId} - | namespace : ${deployRequest.k8sDeployParam.kubernetesNamespace} - | exposedType : ${deployRequest.k8sDeployParam.flinkRestExposedType} - | serviceAccount : ${deployRequest.k8sDeployParam.serviceAccount} - | flinkImage : ${deployRequest.k8sDeployParam.flinkImage} + | namespace : ${deployRequest.kubernetesNamespace} + | exposedType : ${deployRequest.flinkRestExposedType} + | serviceAccount : ${deployRequest.serviceAccount} + | flinkImage : ${deployRequest.flinkImage} | properties : ${deployRequest.properties.mkString(" ")} |------------------------------------------------------------------------------------------- |""".stripMargin) 
@@ -157,20 +159,16 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo extractConfiguration(deployRequest.flinkVersion.flinkHome, deployRequest.properties) flinkConfig .safeSet(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName) - .safeSet( - KubernetesConfigOptions.NAMESPACE, - deployRequest.k8sDeployParam.kubernetesNamespace) - .safeSet( - KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, - deployRequest.k8sDeployParam.serviceAccount) + .safeSet(KubernetesConfigOptions.NAMESPACE, deployRequest.kubernetesNamespace) + .safeSet(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, deployRequest.serviceAccount) .safeSet( KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, - ServiceExposedType.valueOf(deployRequest.k8sDeployParam.flinkRestExposedType.getName)) + ServiceExposedType.valueOf(deployRequest.flinkRestExposedType.getName)) .safeSet(KubernetesConfigOptions.CLUSTER_ID, deployRequest.clusterId) - .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, deployRequest.k8sDeployParam.flinkImage) + .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, deployRequest.flinkImage) .safeSet( KubernetesConfigOptions.KUBE_CONFIG_FILE, - getDefaultKubernetesConf(deployRequest.k8sDeployParam.kubeConf)) + getDefaultKubernetesConf(deployRequest.kubeConf)) .safeSet( DeploymentOptionsInternal.CONF_DIR, s"${deployRequest.flinkVersion.flinkHome}/conf") @@ -205,7 +203,8 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo } } - def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = { + def shutdown(deployRequest: DeployRequest): ShutDownResponse = { + val shutDownRequest = deployRequest.asInstanceOf[KubernetesDeployRequest] var kubeClient: FlinkKubeClient = null try { val flinkConfig = getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome) @@ -217,32 +216,24 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo }) flinkConfig 
.safeSet(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName) - .safeSet( - KubernetesConfigOptions.NAMESPACE, - shutDownRequest.kubernetesDeployParam.kubernetesNamespace) - .safeSet( - KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, - shutDownRequest.kubernetesDeployParam.serviceAccount) + .safeSet(KubernetesConfigOptions.NAMESPACE, shutDownRequest.kubernetesNamespace) + .safeSet(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, shutDownRequest.serviceAccount) .safeSet( KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, - ServiceExposedType.valueOf( - shutDownRequest.kubernetesDeployParam.flinkRestExposedType.getName)) + ServiceExposedType.valueOf(shutDownRequest.flinkRestExposedType.getName)) .safeSet(KubernetesConfigOptions.CLUSTER_ID, shutDownRequest.clusterId) - .safeSet( - KubernetesConfigOptions.CONTAINER_IMAGE, - shutDownRequest.kubernetesDeployParam.flinkImage) + .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, shutDownRequest.flinkImage) .safeSet( KubernetesConfigOptions.KUBE_CONFIG_FILE, - getDefaultKubernetesConf(shutDownRequest.kubernetesDeployParam.kubeConf)) + getDefaultKubernetesConf(shutDownRequest.kubeConf)) kubeClient = FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client") - val kubeClientWrapper = new FlinkKubernetesClient(kubeClient) - - if ( - shutDownRequest.clusterId != null && kubeClientWrapper - .getService(shutDownRequest.clusterId) - .isPresent - ) { + val flinkKubeClient = new FlinkKubernetesClient(kubeClient) + val k8sService = flinkKubeClient.getService(shutDownRequest.clusterId) + if (k8sService.isPresent) { kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId) + KubernetesDeploymentHelper.delete( + shutDownRequest.kubernetesNamespace, + shutDownRequest.clusterId) ShutDownResponse() } else { null diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala index d2a98b740..91eb1d92d 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala @@ -245,7 +245,7 @@ object YarnSessionClient extends YarnClientTrait { } } - def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = { + def shutdown(shutDownRequest: DeployRequest): ShutDownResponse = { var clusterDescriptor: YarnClusterDescriptor = null var client: ClusterClient[ApplicationId] = null try {
