This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch k8s_deploy_shutdown in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 7bca3dcf8b6583e3c294775e4692ef0e33c9316e Author: benjobs <[email protected]> AuthorDate: Thu Jan 11 19:37:25 2024 +0800 [Improve] k8s application|session start|stop improvement --- .../impl/KubernetesNativeApplicationClient.scala | 13 ++---- .../impl/KubernetesNativeSessionClient.scala | 38 ++++++---------- .../flink/client/impl/YarnApplicationClient.scala | 8 +++- .../flink/client/impl/YarnPerJobClient.scala | 14 +++--- .../flink/client/impl/YarnSessionClient.scala | 50 +++------------------- .../flink/client/trait/FlinkClientTrait.scala | 6 +-- .../client/trait/KubernetesNativeClientTrait.scala | 15 ++++--- .../flink/client/trait/YarnClientTrait.scala | 16 ++++--- .../helper/KubernetesDeploymentHelper.scala | 25 ++++------- 9 files changed, 64 insertions(+), 121 deletions(-) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala index 0d8f27a42..c608687f2 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala @@ -21,7 +21,6 @@ import org.apache.streampark.common.enums.ExecutionMode import org.apache.streampark.common.util.Utils import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait import org.apache.streampark.flink.client.bean._ -import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse import com.google.common.collect.Lists @@ -89,15 +88,9 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait { } } - override def doCancel( - cancelRequest: CancelRequest, - flinkConfig: Configuration): CancelResponse = { - flinkConfig.safeSet( - DeploymentOptions.TARGET, - ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName) - val resp = super.doCancel(cancelRequest, flinkConfig) - KubernetesDeploymentHelper.delete(cancelRequest.kubernetesNamespace, cancelRequest.clusterId) - resp + override def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse = { + flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName) + super.doCancel(cancelRequest, flinkConf) } override def doTriggerSavepoint( diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala index 0cffc30ba..dce9a7865 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala @@ -25,7 +25,6 @@ import org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper import org.apache.streampark.flink.core.FlinkKubernetesClient import org.apache.streampark.flink.kubernetes.KubernetesRetriever import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode -import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper import org.apache.streampark.flink.kubernetes.model.ClusterKey import io.fabric8.kubernetes.api.model.{Config => _} @@ -128,11 +127,9 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo } } - override def doCancel( - cancelRequest: CancelRequest, - flinkConfig: Configuration): CancelResponse = { - flinkConfig.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_SESSION.getName) - super.doCancel(cancelRequest, flinkConfig) + override def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse = { + flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_SESSION.getName) + super.doCancel(cancelRequest, flinkConf) } @throws[Exception] @@ -206,30 +203,19 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo |""".stripMargin) val flinkConfig = this.getFlinkK8sConfig(shutDownRequest) - val kubeClient = FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client") + val clusterDescriptor = getK8sClusterDescriptor(flinkConfig) + val client = clusterDescriptor + .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID)) + .getClusterClient try { - val kubeClientWrapper = new FlinkKubernetesClient(kubeClient) - val kubeService = kubeClientWrapper.getService(deployRequest.clusterId) - if (kubeService.isPresent) { - kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId) - } else { - val kubernetesClusterDescriptor = getK8sClusterDescriptorAndSpecification(flinkConfig) - val clusterDescriptor = kubernetesClusterDescriptor._1 - val client = clusterDescriptor.retrieve(deployRequest.clusterId).getClusterClient - if (client != null) { - client.shutDownCluster() - } - } - KubernetesDeploymentHelper.delete( - shutDownRequest.kubernetesNamespace, - shutDownRequest.clusterId) + client.shutDownCluster() ShutDownResponse() } catch { case e: Exception => logError(s"shutdown flink session fail in ${shutDownRequest.executionMode} mode") throw e } finally { - Utils.close(kubeClient) + Utils.close(client) } } @@ -251,8 +237,8 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo override def doTriggerSavepoint( request: TriggerSavepointRequest, - flinkConfig: Configuration): SavepointResponse = { - flinkConfig.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_SESSION.getName) - super.doTriggerSavepoint(request, flinkConfig) + flinkConf: Configuration): SavepointResponse = { + flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_SESSION.getName) + super.doTriggerSavepoint(request, flinkConf) } } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala index 1f4fb0ed3..8601c70a3 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala @@ -30,7 +30,7 @@ import org.apache.flink.client.program.ClusterClient import org.apache.flink.configuration._ import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils} import org.apache.flink.runtime.util.HadoopUtils -import org.apache.flink.yarn.configuration.YarnConfigOptions +import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget} import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api.records.ApplicationId @@ -135,4 +135,10 @@ object YarnApplicationClient extends YarnClientTrait { }) } + override def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse = { + flinkConf + .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName) + super.doCancel(cancelRequest, flinkConf) + } + } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala index 244737d08..6ccf7001f 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala @@ -25,7 +25,7 @@ import org.apache.streampark.flink.util.FlinkUtils import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader import org.apache.flink.client.program.{ClusterClient, PackagedProgram} import org.apache.flink.configuration.{Configuration, DeploymentOptions} -import org.apache.flink.yarn.{YarnClusterClientFactory, YarnClusterDescriptor} +import org.apache.flink.yarn.YarnClusterDescriptor import org.apache.flink.yarn.configuration.YarnDeploymentTarget import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint import org.apache.hadoop.fs.{Path => HadoopPath} @@ -120,14 +120,10 @@ object YarnPerJobClient extends YarnClientTrait { } } - override def doCancel( - cancelRequest: CancelRequest, - flinkConfig: Configuration): CancelResponse = { - val response = super.doCancel(cancelRequest, flinkConfig) - val clusterClientFactory = new YarnClusterClientFactory - val clusterDescriptor = clusterClientFactory.createClusterDescriptor(flinkConfig) - clusterDescriptor.killCluster(ApplicationId.fromString(cancelRequest.clusterId)) - response + override def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse = { + flinkConf + .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName) + super.doCancel(cancelRequest, flinkConf) } } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala index 91eb1d92d..0032e8eca 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala @@ -135,58 +135,20 @@ object YarnSessionClient extends YarnClientTrait { } } - private[this] def executeClientAction[O, R <: SavepointRequestTrait]( - request: R, - flinkConfig: Configuration, - actFunc: (JobID, ClusterClient[_]) => O): O = { - flinkConfig - .safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId) - .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName) - logInfo(s""" - |------------------------------------------------------------------ - |Effective submit configuration: $flinkConfig - |------------------------------------------------------------------ - |""".stripMargin) - - var clusterDescriptor: YarnClusterDescriptor = null - var client: ClusterClient[ApplicationId] = null - try { - val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig) - clusterDescriptor = yarnClusterDescriptor._2 - client = clusterDescriptor.retrieve(yarnClusterDescriptor._1).getClusterClient - actFunc(JobID.fromHexString(request.jobId), client) - } catch { - case e: Exception => - logError(s"${request.getClass.getSimpleName} for flink yarn session job fail") - e.printStackTrace() - throw e - } finally { - Utils.close(client, clusterDescriptor) - } - } - override def doCancel( cancelRequest: CancelRequest, flinkConfig: Configuration): CancelResponse = { - executeClientAction( - cancelRequest, - flinkConfig, - (jobID, clusterClient) => { - val actionResult = super.cancelJob(cancelRequest, jobID, clusterClient) - CancelResponse(actionResult) - }) + flinkConfig + .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName) + super.doCancel(cancelRequest, flinkConfig) } override def doTriggerSavepoint( request: TriggerSavepointRequest, flinkConfig: Configuration): SavepointResponse = { - executeClientAction( - request, - flinkConfig, - (jobID, clusterClient) => { - val actionResult = super.triggerSavepoint(request, jobID, clusterClient) - SavepointResponse(actionResult) - }) + flinkConfig + .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName) + super.doTriggerSavepoint(request, flinkConfig) } def deploy(deployRequest: DeployRequest): DeployResponse = { diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 0ef862883..5f536a3e9 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -470,10 +470,10 @@ trait FlinkClientTrait extends Logger { val savePointDir: String = tryGetSavepointPathIfNeed(cancelRequest) val clientWrapper = new FlinkClusterClient(client) + val withSavepoint = Try(cancelRequest.withSavepoint).getOrElse(false) + val withDrain = Try(cancelRequest.withDrain).getOrElse(false) - ( - Try(cancelRequest.withSavepoint).getOrElse(false), - Try(cancelRequest.withDrain).getOrElse(false)) match { + (withSavepoint, withDrain) match { case (false, false) => client.cancel(jobID).get() null diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala index 2dd756b8f..bf3b1767b 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala @@ -85,10 +85,14 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait { executeClientAction( cancelRequest, flinkConfig, - (jobId, clusterClient) => { - val actionResult = super.cancelJob(cancelRequest, jobId, clusterClient) - CancelResponse(actionResult) - }) + (jobId, client) => { + val resp = super.cancelJob(cancelRequest, jobId, client) + if (cancelRequest.executionMode == ExecutionMode.KUBERNETES_NATIVE_APPLICATION) { + client.shutDownCluster() + } + CancelResponse(resp) + } + ) } private[this] def executeClientAction[O, R <: SavepointRequestTrait]( @@ -157,8 +161,7 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait { def getK8sClusterDescriptor(flinkConfig: Configuration): KubernetesClusterDescriptor = { val clientFactory = new KubernetesClusterClientFactory() - val clusterDescriptor = clientFactory.createClusterDescriptor(flinkConfig) - clusterDescriptor + clientFactory.createClusterDescriptor(flinkConfig) } protected def flinkConfIdentifierInfo(@Nonnull conf: Configuration): String = diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala index 32991b109..445f45e7f 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala @@ -17,6 +17,7 @@ package org.apache.streampark.flink.client.`trait` +import org.apache.streampark.common.enums.ExecutionMode import org.apache.streampark.common.util.Utils import org.apache.streampark.flink.client.bean._ @@ -77,8 +78,8 @@ trait YarnClientTrait extends FlinkClientTrait { executeClientAction( request, flinkConf, - (jid, client) => { - SavepointResponse(super.triggerSavepoint(request, jid, client)) + (jobID, client) => { + SavepointResponse(super.triggerSavepoint(request, jobID, client)) }) } @@ -86,9 +87,14 @@ trait YarnClientTrait extends FlinkClientTrait { executeClientAction( cancelRequest, flinkConf, - (jid, client) => { - CancelResponse(super.cancelJob(cancelRequest, jid, client)) - }) + (jobId, client) => { + val resp = super.cancelJob(cancelRequest, jobId, client) + if (cancelRequest.executionMode == ExecutionMode.YARN_PER_JOB) { + client.shutDownCluster() + } + CancelResponse(resp) + } + ) } private lazy val deployInternalMethod: Method = { diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala index 590cd806d..e28922002 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala @@ -68,30 +68,21 @@ object KubernetesDeploymentHelper extends Logger { }.getOrElse(true) } - private[this] def deleteDeployment(nameSpace: String, deploymentName: String): Boolean = { + private[this] def deleteDeployment(nameSpace: String, deploymentName: String): Unit = { using(KubernetesRetriever.newK8sClient()) { client => - Try { - val r = client.apps.deployments - .inNamespace(nameSpace) - .withName(deploymentName) - .delete - Boolean.unbox(r) - }.getOrElse(false) + val map = client.apps.deployments.inNamespace(nameSpace) + map.withLabel("app", deploymentName).delete + map.withName(deploymentName).delete } } - private[this] def deleteConfigMap(nameSpace: String, deploymentName: String): Boolean = { + private[this] def deleteConfigMap(nameSpace: String, deploymentName: String): Unit = { using(KubernetesRetriever.newK8sClient()) { client => - Try { - val r = client - .configMaps() - .inNamespace(nameSpace) - .withLabel("app", deploymentName) - .delete - Boolean.unbox(r) - }.getOrElse(false) + val map = client.configMaps().inNamespace(nameSpace) + map.withLabel("app", deploymentName).delete + map.withName(deploymentName).delete } }
