This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch k8s_deploy_shutdown
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 4d969ac02d348590b6b495b8dc51c31ae585df1b Author: benjobs <[email protected]> AuthorDate: Thu Jan 11 21:18:47 2024 +0800 [Improve] flink job on k8s-application mode cancel improvement --- .../flink/client/impl/KubernetesNativeApplicationClient.scala | 10 +++++++++- .../apache/streampark/flink/client/impl/YarnPerJobClient.scala | 8 ++++++-- .../flink/client/trait/KubernetesNativeClientTrait.scala | 2 +- .../apache/streampark/flink/client/trait/YarnClientTrait.scala | 3 --- 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala index c608687f2..f5623113e 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala @@ -90,7 +90,15 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait { override def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse = { flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName) - super.doCancel(cancelRequest, flinkConf) + executeClientAction( + cancelRequest, + flinkConf, + (jobId, client) => { + val resp = super.cancelJob(cancelRequest, jobId, client) + client.shutDownCluster() + CancelResponse(resp) + } + ) } override def doTriggerSavepoint( diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala index 6ccf7001f..71499e290 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala @@ -25,7 +25,7 @@ import org.apache.streampark.flink.util.FlinkUtils import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader import org.apache.flink.client.program.{ClusterClient, PackagedProgram} import org.apache.flink.configuration.{Configuration, DeploymentOptions} -import org.apache.flink.yarn.YarnClusterDescriptor +import org.apache.flink.yarn.{YarnClusterClientFactory, YarnClusterDescriptor} import org.apache.flink.yarn.configuration.YarnDeploymentTarget import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint import org.apache.hadoop.fs.{Path => HadoopPath} @@ -123,7 +123,11 @@ object YarnPerJobClient extends YarnClientTrait { override def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse = { flinkConf .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName) - super.doCancel(cancelRequest, flinkConf) + val response = super.doCancel(cancelRequest, flinkConf) + val clusterClientFactory = new YarnClusterClientFactory + val clusterDescriptor = clusterClientFactory.createClusterDescriptor(flinkConf) + clusterDescriptor.killCluster(ApplicationId.fromString(cancelRequest.clusterId)) + response } } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala index bf3b1767b..ff65a998e 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala @@ -95,7 +95,7 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait { ) } - private[this] def executeClientAction[O, R <: SavepointRequestTrait]( + private[client] def executeClientAction[O, R <: SavepointRequestTrait]( request: R, flinkConfig: Configuration, actFunc: (JobID, ClusterClient[_]) => O): O = { diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala index 445f45e7f..5777fc82d 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala @@ -89,9 +89,6 @@ trait YarnClientTrait extends FlinkClientTrait { flinkConf, (jobId, client) => { val resp = super.cancelJob(cancelRequest, jobId, client) - if (cancelRequest.executionMode == ExecutionMode.YARN_PER_JOB) { - client.shutDownCluster() - } CancelResponse(resp) } )
