This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new 47cc9fe43 [Bug] k8s job state monitor bug fixed (#3458)
47cc9fe43 is described below

commit 47cc9fe43a2d5e462382faf3358bdf88957efe57
Author: benjobs <[email protected]>
AuthorDate: Sat Jan 6 18:27:29 2024 +0800

    [Bug] k8s job state monitor bug fixed (#3458)
    
    * [Bug] k8s job state monitor bug fixed
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../org/apache/streampark/common/util/Utils.scala  |  7 ++
 .../console/core/service/ApplicationService.java   |  2 +
 .../core/service/impl/ApplicationServiceImpl.java  | 88 +++++++++++++---------
 .../console/core/task/FlinkK8sWatcherWrapper.java  | 19 ++---
 .../flink/kubernetes/KubernetesRetriever.scala     |  6 +-
 .../helper/KubernetesDeploymentHelper.scala        | 44 +++++------
 .../flink/kubernetes/model/TrackId.scala           |  2 +
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 45 ++++++-----
 8 files changed, 124 insertions(+), 89 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index d2047ced5..3e35c388b 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -97,6 +97,13 @@ object Utils extends Logger {
     new JarInputStream(new BufferedInputStream(new FileInputStream(jarFile))).getManifest
   }
 
+  def getJarManClass(jarFile: File): String = {
+    val manifest = getJarManifest(jarFile)
+    val mainAttr = manifest.getMainAttributes
+    Option(mainAttr.getValue("Main-Class"))
+      .getOrElse(Option(mainAttr.getValue("program-class")).orNull)
+  }
+
   def copyProperties(original: Properties, target: Properties): Unit =
     original.foreach(x => target.put(x._1, x._2))
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index 9193cefa3..5ffa7a02e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -86,6 +86,8 @@ public interface ApplicationService extends IService<Application> {
 
   boolean checkAlter(Application application);
 
+  Map<String, String> getRumtimeConfig(Long id);
+
   void updateRelease(Application application);
 
   List<Application> getByProjectId(Long id);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 49f7a32f1..c58fcd5f2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -136,6 +136,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.EnumSet;
@@ -152,7 +153,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.jar.Manifest;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -403,11 +403,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     // 8) remove app
     removeApp(application);
 
-    if (isKubernetesApp(application)) {
-      k8SFlinkTrackMonitor.unWatching(toTrackId(application));
-    } else {
-      FlinkRESTAPIWatcher.unWatching(paramApp.getId());
-    }
     return true;
   }
 
@@ -460,6 +455,23 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     return cancelUserId != -1 && cancelUserId != appUserId;
   }
 
+  @Override
+  public Map<String, String> getRumtimeConfig(Long id) {
+    Application application = getById(id);
+    if (application != null && application.getVersionId() != null) {
+      FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
+      if (flinkEnv != null) {
+        File yaml = new File(flinkEnv.getFlinkHome().concat("/conf/flink-conf.yaml"));
+        Map<String, String> config = PropertiesUtils.loadFlinkConfYaml(yaml);
+        Map<String, String> dynamicConf =
+            PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties());
+        config.putAll(dynamicConf);
+        return config;
+      }
+    }
+    return Collections.emptyMap();
+  }
+
   private void removeApp(Application application) {
     Long appId = application.getId();
     removeById(appId);
@@ -695,7 +707,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
         }
         // check whether clusterId, namespace, jobId on kubernetes
         else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
-            && k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) {
+            && k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(app))) {
           return AppExistsState.IN_KUBERNETES;
         }
       }
@@ -1135,10 +1147,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     if (isKubernetesApp(application)) {
       KubernetesDeploymentHelper.watchPodTerminatedLog(
           application.getK8sNamespace(), application.getJobName(), application.getJobId());
-      KubernetesDeploymentHelper.deleteTaskDeployment(
-          application.getK8sNamespace(), application.getJobName());
-      KubernetesDeploymentHelper.deleteTaskConfigMap(
-          application.getK8sNamespace(), application.getJobName());
+      KubernetesDeploymentHelper.delete(application.getK8sNamespace(), application.getJobName());
     }
     if (startFuture != null) {
       startFuture.cancel(true);
@@ -1217,8 +1226,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
           project.getDistHome().getAbsolutePath().concat("/").concat(application.getModule());
       jarFile = new File(modulePath, application.getJar());
     }
-    Manifest manifest = Utils.getJarManifest(jarFile);
-    return manifest.getMainAttributes().getValue("Main-Class");
+    return Utils.getJarManClass(jarFile);
   }
 
   @Override
@@ -1322,6 +1330,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
     cancelFutureMap.put(application.getId(), cancelFuture);
 
+    TrackId trackId = isKubernetesApp(application) ? toTrackId(application) : null;
+
     cancelFuture.whenComplete(
         (cancelResponse, throwable) -> {
           cancelFutureMap.remove(application.getId());
@@ -1345,9 +1355,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
               }
               // re-tracking flink job on kubernetes and logging exception
               if (isKubernetesApp(application)) {
-                TrackId id = toTrackId(application);
-                k8SFlinkTrackMonitor.unWatching(id);
-                k8SFlinkTrackMonitor.doWatching(id);
+                KubernetesDeploymentHelper.delete(trackId.namespace(), trackId.clusterId());
+                k8SFlinkTrackMonitor.unWatching(trackId);
               } else {
                 FlinkRESTAPIWatcher.unWatching(application.getId());
               }
@@ -1373,7 +1382,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
           }
 
           if (isKubernetesApp(application)) {
-            k8SFlinkTrackMonitor.unWatching(toTrackId(application));
+            KubernetesDeploymentHelper.delete(trackId.namespace(), trackId.clusterId());
+            k8SFlinkTrackMonitor.unWatching(trackId);
           }
         });
   }
@@ -1389,24 +1399,22 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
       final URI uri = URI.create(savepointPath);
       final String scheme = uri.getScheme();
       final String pathPart = uri.getPath();
-      String error = null;
       if (scheme == null) {
-        error =
-            "This state.savepoints.dir value "
-                + savepointPath
-                + " scheme (hdfs://, file://, etc) of  is null. Please specify the file system scheme explicitly in the URI.";
-      } else if (pathPart == null) {
-        error =
-            "This state.savepoints.dir value "
-                + savepointPath
-                + " path part to store the checkpoint data in is null. Please specify a directory path for the checkpoint data.";
-      } else if (pathPart.isEmpty() || "/".equals(pathPart)) {
-        error =
-            "This state.savepoints.dir value "
-                + savepointPath
-                + " Cannot use the root directory for checkpoints.";
+        return "This state.savepoints.dir value "
+            + savepointPath
+            + " scheme (hdfs://, file://, etc) of  is null. Please specify the file system scheme explicitly in the URI.";
+      }
+      if (pathPart == null) {
+        return "This state.savepoints.dir value "
+            + savepointPath
+            + " path part to store the checkpoint data in is null. Please specify a directory path for the checkpoint data.";
       }
-      return error;
+      if (pathPart.isEmpty() || "/".equals(pathPart)) {
+        return "This state.savepoints.dir value "
+            + savepointPath
+            + " Cannot use the root directory for checkpoints.";
+      }
+      return null;
     } else {
       return "When custom savepoint is not set, state.savepoints.dir needs to be set in properties or flink-conf.yaml of application";
     }
@@ -1552,6 +1560,14 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
       extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
     }
 
+    TrackId trackId;
+    if (isKubernetesApp(application)) {
+      trackId = toTrackId(application);
+      KubernetesDeploymentHelper.delete(trackId.namespace(), trackId.clusterId());
+    } else {
+      trackId = null;
+    }
+
     KubernetesSubmitParam kubernetesSubmitParam =
         new KubernetesSubmitParam(
             application.getClusterId(),
@@ -1613,7 +1629,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
               app.setOptionState(OptionState.NONE.getValue());
               updateById(app);
               if (isKubernetesApp(app)) {
-                k8SFlinkTrackMonitor.unWatching(toTrackId(app));
+                k8SFlinkTrackMonitor.unWatching(trackId);
               } else {
                 FlinkRESTAPIWatcher.unWatching(appParam.getId());
               }
@@ -1649,7 +1665,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
           // if start completed, will be added task to tracking queue
           if (isKubernetesApp(application)) {
             application.setRelease(ReleaseState.DONE.get());
-            k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+            k8SFlinkTrackMonitor.doWatching(trackId);
             if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
               String domainName = settingService.getIngressModeDefault();
               if (StringUtils.isNotBlank(domainName)) {
@@ -1748,7 +1764,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     // re-tracking flink job on kubernetes and logging exception
     if (isKubernetesApp(application)) {
       TrackId id = toTrackId(application);
-      k8SFlinkTrackMonitor.unWatching(id);
+      KubernetesDeploymentHelper.delete(id.namespace(), id.clusterId());
       k8SFlinkTrackMonitor.doWatching(id);
     } else {
       FlinkRESTAPIWatcher.unWatching(application.getId());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index af3b816d5..d54580032 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -23,6 +23,7 @@ import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcherFactory;
 import org.apache.streampark.flink.kubernetes.FlinkTrackConfig;
+import org.apache.streampark.flink.kubernetes.KubernetesRetriever;
 import org.apache.streampark.flink.kubernetes.enums.FlinkJobState;
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode;
 import org.apache.streampark.flink.kubernetes.model.TrackId;
@@ -101,17 +102,16 @@ public class FlinkK8sWatcherWrapper {
     if (CollectionUtils.isEmpty(k8sApplication)) {
       return Lists.newArrayList();
     }
-    // correct corrupted data
-    List<Application> correctApps =
-        k8sApplication.stream()
-            .filter(app -> !Bridge.toTrackId(app).isLegal())
-            .collect(Collectors.toList());
-    if (CollectionUtils.isNotEmpty(correctApps)) {
-      applicationService.saveOrUpdateBatch(correctApps);
-    }
     // filter out the application that should be tracking
     return k8sApplication.stream()
-        .filter(app -> !FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum())))
+        .filter(
+            app -> {
+              boolean isEndState =
+                  FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum()));
+              boolean deploymentExists =
+                  KubernetesRetriever.isDeploymentExists(app.getK8sNamespace(), app.getClusterId());
+              return !isEndState || deploymentExists;
+            })
         .map(Bridge::toTrackId)
         .collect(Collectors.toList());
   }
@@ -121,6 +121,7 @@ public class FlinkK8sWatcherWrapper {
 
     // covert Application to TrackId
     public static TrackId toTrackId(@Nonnull Application app) {
+
       Enumeration.Value mode = FlinkK8sExecuteMode.of(app.getExecutionModeEnum());
       if (FlinkK8sExecuteMode.APPLICATION().equals(mode)) {
         return TrackId.onApplication(
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index 71ee9d6de..79ba51812 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -63,7 +63,7 @@ object KubernetesRetriever extends Logger {
   private val clusterClientServiceLoader = new DefaultClusterClientServiceLoader()
 
   /** get new flink cluster client of kubernetes mode */
-  def newFinkClusterClient(
+  private def newFinkClusterClient(
       clusterId: String,
       @Nullable namespace: String,
       executeMode: FlinkK8sExecuteMode.Value): Option[ClusterClient[String]] = {
@@ -110,7 +110,7 @@ object KubernetesRetriever extends Logger {
    * @param namespace
    *   deployment namespace
    */
-  def isDeploymentExists(name: String, namespace: String): Boolean = {
+  def isDeploymentExists(namespace: String, deploymentName: String): Boolean = {
     using(KubernetesRetriever.newK8sClient()) {
       client =>
         client
@@ -121,7 +121,7 @@ object KubernetesRetriever extends Logger {
           .list()
           .getItems
           .asScala
-          .exists(e => e.getMetadata.getName == name)
+          .exists(_.getMetadata.getName == deploymentName)
     }(_ => false)
   }
 
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 5495814f2..590cd806d 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -55,7 +55,7 @@ object KubernetesDeploymentHelper extends Logger {
     }
   }
 
-  def getDeploymentStatusChanges(nameSpace: String, deploymentName: String): Boolean = {
+  def isDeploymentError(nameSpace: String, deploymentName: String): Boolean = {
     Try {
       val pods = getPods(nameSpace, deploymentName)
       val podStatus = pods.head.getStatus
@@ -68,12 +68,7 @@ object KubernetesDeploymentHelper extends Logger {
     }.getOrElse(true)
   }
 
-  def getTheNumberOfTaskDeploymentRetries(nameSpace: String, deploymentName: String): Integer = {
-    val pods = getPods(nameSpace, deploymentName)
-    pods.head.getStatus.getContainerStatuses.head.getRestartCount
-  }
-
-  def deleteTaskDeployment(nameSpace: String, deploymentName: String): Boolean = {
+  private[this] def deleteDeployment(nameSpace: String, deploymentName: String): Boolean = {
     using(KubernetesRetriever.newK8sClient()) {
       client =>
         Try {
@@ -86,7 +81,26 @@ object KubernetesDeploymentHelper extends Logger {
     }
   }
 
-  def isTheK8sConnectionNormal(): Boolean = {
+  private[this] def deleteConfigMap(nameSpace: String, deploymentName: String): Boolean = {
+    using(KubernetesRetriever.newK8sClient()) {
+      client =>
+        Try {
+          val r = client
+            .configMaps()
+            .inNamespace(nameSpace)
+            .withLabel("app", deploymentName)
+            .delete
+          Boolean.unbox(r)
+        }.getOrElse(false)
+    }
+  }
+
+  def delete(nameSpace: String, deploymentName: String): Unit = {
+    deleteDeployment(nameSpace, deploymentName)
+    deleteConfigMap(nameSpace, deploymentName)
+  }
+
+  def checkConnection(): Boolean = {
     Try(new DefaultKubernetesClient) match {
       case Success(client) =>
         client.close()
@@ -125,20 +139,6 @@ object KubernetesDeploymentHelper extends Logger {
     }(error => throw error)
   }
 
-  def deleteTaskConfigMap(nameSpace: String, deploymentName: String): Boolean = {
-    using(KubernetesRetriever.newK8sClient()) {
-      client =>
-        Try {
-          val r = client
-            .configMaps()
-            .inNamespace(nameSpace)
-            .withLabel("app", deploymentName)
-            .delete
-          Boolean.unbox(r)
-        }.getOrElse(false)
-    }
-  }
-
   private[kubernetes] def getJobLog(jobId: String): String = {
     val tmpPath = SystemPropertyUtils.getTmpdir()
     s"$tmpPath/$jobId.log"
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
index fba47eee6..e280122c8 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
@@ -20,6 +20,8 @@ package org.apache.streampark.flink.kubernetes.model
 import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
 
+import java.lang.{Boolean => JavaBool}
+
 import scala.util.Try
 
 /** tracking identifier for flink on kubernetes */
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 55f62f07f..2879f17bb 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -125,7 +125,12 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
                 watchController.trackIds.update(trackId)
                 eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
               }
-              if (FlinkJobState.isEndState(jobState.jobState)) {
+
+              val deployExists = KubernetesRetriever.isDeploymentExists(
+                trackId.namespace,
+                trackId.clusterId
+              )
+              if (FlinkJobState.isEndState(jobState.jobState) && !deployExists) {
                 // remove trackId from cache of job that needs to be untracked
                 watchController.unWatching(trackId)
                 if (trackId.executeMode == APPLICATION) {
@@ -213,9 +218,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
     } else {
       jobDetails.map {
         d =>
-          TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) -> d.toJobStatusCV(
-            pollEmitTime,
-            System.currentTimeMillis)
+          TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) -> d
+            .toJobStatusCV(pollEmitTime, System.currentTimeMillis)
       }
     }
   }
@@ -284,21 +288,27 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
 
     // infer from k8s deployment and event
     val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
-    logger.info(
-      s"Query the local cache result:${watchController.canceling.has(trackId).toString},trackId ${trackId.toString}.")
-    val jobState = {
-      if (watchController.canceling.has(trackId)) FlinkJobState.CANCELED
-      else {
+
+    val jobState = trackId match {
+      case id if watchController.canceling.has(id) =>
+        logger.info(s"trackId ${trackId.toString} is canceling")
+        FlinkJobState.CANCELED
+      case _ =>
         // whether deployment exists on kubernetes cluster
-        val isDeployExists =
-          KubernetesRetriever.isDeploymentExists(trackId.clusterId, trackId.namespace)
-        val deployStateOfTheError = KubernetesDeploymentHelper.getDeploymentStatusChanges(
+        val deployExists = KubernetesRetriever.isDeploymentExists(
           trackId.namespace,
-          trackId.clusterId)
-        val isConnection = KubernetesDeploymentHelper.isTheK8sConnectionNormal()
+          trackId.clusterId
+        )
 
-        if (isDeployExists) {
-          if (!deployStateOfTheError) {
+        val deployError = KubernetesDeploymentHelper.isDeploymentError(
+          trackId.namespace,
+          trackId.clusterId
+        )
+
+        val isConnection = KubernetesDeploymentHelper.checkConnection()
+
+        if (deployExists) {
+          if (!deployError) {
             logger.info("Task Enter the initialization process.")
             FlinkJobState.K8S_INITIALIZING
           } else if (isConnection) {
@@ -307,7 +317,6 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
               trackId.namespace,
               trackId.clusterId,
               trackId.jobId)
-            KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace, trackId.clusterId)
             FlinkJobState.FAILED
           } else {
             inferSilentOrLostFromPreCache(latest)
@@ -318,8 +327,6 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
         } else {
           inferSilentOrLostFromPreCache(latest)
         }
-
-      }
     }
 
     val jobStatusCV = JobStatusCV(

Reply via email to