This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 03c6a5e49 [Improve]When the flink task on k8s keeps restarting, 
streampark won't be able to do anything about it (#1725)
03c6a5e49 is described below

commit 03c6a5e494dbef77d14fbc046aab602443b23153
Author: monster <[email protected]>
AuthorDate: Sun Oct 2 16:06:07 2022 +0800

    [Improve]When the flink task on k8s keeps restarting, streampark won't be 
able to do anything about it (#1725)
---
 .../console/core/service/impl/ApplicationServiceImpl.java   |  8 ++++++++
 .../kubernetes/helper/KubernetesDeploymentHelper.scala      | 13 +++++++++++++
 2 files changed, 21 insertions(+)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 3395f89d8..c0a39f8ee 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -79,6 +79,7 @@ import 
org.apache.streampark.console.core.task.FlinkTrackingTask;
 import org.apache.streampark.flink.core.conf.ParameterCli;
 import org.apache.streampark.flink.kubernetes.IngressController;
 import org.apache.streampark.flink.kubernetes.K8sFlinkTrackMonitor;
+import 
org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
 import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
 import org.apache.streampark.flink.kubernetes.model.TrackId;
 import org.apache.streampark.flink.packer.pipeline.BuildResult;
@@ -905,6 +906,13 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         if (startFuture == null && cancelFuture == null) {
             this.updateToStopped(app);
         }
+        if (isKubernetesApp(app)) {
+            
KubernetesDeploymentHelper.watchPodTerminatedLog(app.getK8sNamespace(), 
app.getJobName());
+            
KubernetesDeploymentHelper.deleteTaskDeployment(app.getK8sNamespace(), 
app.getJobName());
+            
KubernetesDeploymentHelper.deleteTaskConfigMap(app.getK8sNamespace(), 
app.getJobName());
+            IngressController.deleteIngress(app.getK8sNamespace(), 
app.getJobName());
+        }
+
     }
 
     @Override
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 3a13dc73b..375dd4d23 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -108,4 +108,17 @@ object KubernetesDeploymentHelper extends Logger {
       }.getOrElse(null)
     }(error => throw error)
   }
+
+  def deleteTaskConfigMap(nameSpace: String, deploymentName: String): Boolean 
= {
+    tryWithResource(KubernetesRetriever.newK8sClient()) { client =>
+      Try {
+        val r = client.configMaps()
+          .inNamespace(nameSpace)
+          .withLabel("app", deploymentName)
+          .delete
+        Boolean.unbox(r)
+      }.getOrElse(false)
+    }
+  }
+
 }

Reply via email to