This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 03c6a5e49 [Improve]When the flink task on k8s keeps restarting,
streampark won't be able to do anything about it (#1725)
03c6a5e49 is described below
commit 03c6a5e494dbef77d14fbc046aab602443b23153
Author: monster <[email protected]>
AuthorDate: Sun Oct 2 16:06:07 2022 +0800
[Improve]When the flink task on k8s keeps restarting, streampark won't be
able to do anything about it (#1725)
---
.../console/core/service/impl/ApplicationServiceImpl.java | 8 ++++++++
.../kubernetes/helper/KubernetesDeploymentHelper.scala | 13 +++++++++++++
2 files changed, 21 insertions(+)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 3395f89d8..c0a39f8ee 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -79,6 +79,7 @@ import org.apache.streampark.console.core.task.FlinkTrackingTask;
import org.apache.streampark.flink.core.conf.ParameterCli;
import org.apache.streampark.flink.kubernetes.IngressController;
import org.apache.streampark.flink.kubernetes.K8sFlinkTrackMonitor;
+import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
import org.apache.streampark.flink.kubernetes.model.TrackId;
import org.apache.streampark.flink.packer.pipeline.BuildResult;
@@ -905,6 +906,13 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
if (startFuture == null && cancelFuture == null) {
this.updateToStopped(app);
}
+ if (isKubernetesApp(app)) {
+
KubernetesDeploymentHelper.watchPodTerminatedLog(app.getK8sNamespace(),
app.getJobName());
+
KubernetesDeploymentHelper.deleteTaskDeployment(app.getK8sNamespace(),
app.getJobName());
+
KubernetesDeploymentHelper.deleteTaskConfigMap(app.getK8sNamespace(),
app.getJobName());
+ IngressController.deleteIngress(app.getK8sNamespace(),
app.getJobName());
+ }
+
}
@Override
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 3a13dc73b..375dd4d23 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -108,4 +108,17 @@ object KubernetesDeploymentHelper extends Logger {
}.getOrElse(null)
}(error => throw error)
}
+
+ def deleteTaskConfigMap(nameSpace: String, deploymentName: String): Boolean
= { // Deletes the ConfigMaps in `nameSpace` whose label "app" equals `deploymentName`; yields false if the attempt throws.
+ tryWithResource(KubernetesRetriever.newK8sClient()) { client => // a new client is created per call; presumably tryWithResource closes it afterwards -- confirm against its definition
+ Try {
+ val r = client.configMaps()
+ .inNamespace(nameSpace)
+ .withLabel("app", deploymentName) // selection is by the "app" label value, not by ConfigMap name
+ .delete
+ Boolean.unbox(r) // unboxes the value returned by delete into a Scala Boolean
+ }.getOrElse(false) // any exception raised inside Try is converted to false here; no log call is made in this block
+ }
+ }
+
}