This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new ca6973ce2 fix: k8s mode flink cluster can't be stopped (#3965)
ca6973ce2 is described below

commit ca6973ce2e0870b633d734123727916bf640884c
Author: Darcy <[email protected]>
AuthorDate: Fri Aug 16 23:38:09 2024 +0800

    fix: k8s mode flink cluster can't be stopped (#3965)
---
 .../streampark/console/core/enums/FlinkAppState.java      |  6 +-----
 .../console/core/service/impl/ApplicationServiceImpl.java | 11 ++++++-----
 .../streampark/flink/kubernetes/enums/FlinkJobState.scala |  2 --
 .../flink/kubernetes/watcher/FlinkJobStatusWatcher.scala  | 15 ++++-----------
 4 files changed, 11 insertions(+), 23 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
index 6e558a65a..da69fe14d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
@@ -141,11 +141,7 @@ public enum FlinkAppState implements Serializable {
   public static class Bridge {
     /** covert from org.apache.streampark.flink.k8s.enums.FlinkJobState */
     public static FlinkAppState fromK8sFlinkJobState(Enumeration.Value 
flinkJobState) {
-      if (FlinkJobState.K8S_INITIALIZING().equals(flinkJobState)) {
-        return INITIALIZING;
-      } else {
-        return of(flinkJobState.toString());
-      }
+      return of(flinkJobState.toString());
     }
 
     /** covert to org.apache.streampark.flink.k8s.enums.FlinkJobState */
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 45dd84156..971c6d8ae 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1844,12 +1844,13 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     application.setOptionTime(new Date());
     updateById(application);
     savepointService.expire(application.getId());
-    // re-tracking flink job on kubernetes and logging exception
+    // delete deployment
     if (application.isKubernetesModeJob()) {
-      TrackId id = k8sWatcherWrapper.toTrackId(application);
-      flinkK8sWatcher.doWatching(id);
-    } else {
-      FlinkAppHttpWatcher.unWatching(application.getId());
+      try {
+        KubernetesDeploymentHelper.delete(application.getK8sNamespace(), 
application.getJobName());
+      } catch (Exception e) {
+        log.error("job abort failed!", e);
+      }
     }
     // kill application
     if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/enums/FlinkJobState.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/enums/FlinkJobState.scala
index fcd4e88e0..93c817d90 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/enums/FlinkJobState.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/enums/FlinkJobState.scala
@@ -22,8 +22,6 @@ object FlinkJobState extends Enumeration {
 
   // flink job has been submit by the streampark.
   val STARTING,
-  // flink k8s resources are being initialized.
-  K8S_INITIALIZING,
   // lost track of flink job temporarily.
   SILENT,
   // flink job has terminated positively (maybe FINISHED or CANCELED)
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 59f2e37a6..ec0aa6a82 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -323,20 +323,13 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
         val isConnection = KubernetesDeploymentHelper.checkConnection()
 
         if (deployExists) {
-          val deployError = KubernetesDeploymentHelper.isDeploymentError(
-            trackId.namespace,
-            trackId.clusterId
-          )
-          if (!deployError) {
-            logger.info("Task Enter the initialization process.")
-            FlinkJobState.K8S_INITIALIZING
-          } else if (isConnection) {
-            logger.info("Enter the task failure deletion process.")
-            KubernetesDeploymentHelper.watchPodTerminatedLog(
+          if (isConnection) {
+            logger.info("Enter the task starting process.")
+            KubernetesDeploymentHelper.watchDeploymentLog(
               trackId.namespace,
               trackId.clusterId,
               trackId.jobId)
-            FlinkJobState.FAILED
+            FlinkJobState.STARTING
           } else {
             inferFromPreCache(latest)
           }

Reply via email to