This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch k8s_deploy_shutdown
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 4d969ac02d348590b6b495b8dc51c31ae585df1b
Author: benjobs <[email protected]>
AuthorDate: Thu Jan 11 21:18:47 2024 +0800

    [Improve] flink job on k8s-application mode cancel improvement
---
 .../flink/client/impl/KubernetesNativeApplicationClient.scala  | 10 +++++++++-
 .../apache/streampark/flink/client/impl/YarnPerJobClient.scala |  8 ++++++--
 .../flink/client/trait/KubernetesNativeClientTrait.scala       |  2 +-
 .../apache/streampark/flink/client/trait/YarnClientTrait.scala |  3 ---
 4 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
index c608687f2..f5623113e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
@@ -90,7 +90,15 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
 
   override def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse = {
     flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
-    super.doCancel(cancelRequest, flinkConf)
+    executeClientAction(
+      cancelRequest,
+      flinkConf,
+      (jobId, client) => {
+        val resp = super.cancelJob(cancelRequest, jobId, client)
+        client.shutDownCluster()
+        CancelResponse(resp)
+      }
+    )
   }
 
   override def doTriggerSavepoint(
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 6ccf7001f..71499e290 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -25,7 +25,7 @@ import org.apache.streampark.flink.util.FlinkUtils
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
 import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
 import org.apache.flink.configuration.{Configuration, DeploymentOptions}
-import org.apache.flink.yarn.YarnClusterDescriptor
+import org.apache.flink.yarn.{YarnClusterClientFactory, YarnClusterDescriptor}
 import org.apache.flink.yarn.configuration.YarnDeploymentTarget
 import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
 import org.apache.hadoop.fs.{Path => HadoopPath}
@@ -123,7 +123,11 @@ object YarnPerJobClient extends YarnClientTrait {
   override def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse = {
     flinkConf
       .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName)
-    super.doCancel(cancelRequest, flinkConf)
+    val response = super.doCancel(cancelRequest, flinkConf)
+    val clusterClientFactory = new YarnClusterClientFactory
+    val clusterDescriptor = clusterClientFactory.createClusterDescriptor(flinkConf)
+    clusterDescriptor.killCluster(ApplicationId.fromString(cancelRequest.clusterId))
+    response
   }
 
 }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index bf3b1767b..ff65a998e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -95,7 +95,7 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
     )
   }
 
-  private[this] def executeClientAction[O, R <: SavepointRequestTrait](
+  private[client] def executeClientAction[O, R <: SavepointRequestTrait](
       request: R,
       flinkConfig: Configuration,
       actFunc: (JobID, ClusterClient[_]) => O): O = {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 445f45e7f..5777fc82d 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -89,9 +89,6 @@ trait YarnClientTrait extends FlinkClientTrait {
       flinkConf,
       (jobId, client) => {
         val resp = super.cancelJob(cancelRequest, jobId, client)
-        if (cancelRequest.executionMode == ExecutionMode.YARN_PER_JOB) {
-          client.shutDownCluster()
-        }
         CancelResponse(resp)
       }
     )

Reply via email to