This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch k8s-monitor
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 9dace2ed7470451e2062f4e20b9ec8c991460029
Author: benjobs <[email protected]>
AuthorDate: Sat Jan 6 14:30:50 2024 +0800

    [Bug] k8s job state monitor bug fixed
---
 .../console/core/service/ApplicationService.java   |  2 +
 .../core/service/impl/ApplicationServiceImpl.java  | 84 ++++++++++++++--------
 .../console/core/task/FlinkK8sWatcherWrapper.java  | 19 ++---
 .../helper/KubernetesDeploymentHelper.scala        | 44 ++++++------
 .../flink/kubernetes/model/TrackId.scala           |  2 +
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 38 +++++-----
 6 files changed, 111 insertions(+), 78 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index 9193cefa3..5ffa7a02e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -86,6 +86,8 @@ public interface ApplicationService extends 
IService<Application> {
 
   boolean checkAlter(Application application);
 
+  Map<String, String> getRumtimeConfig(Long id);
+
   void updateRelease(Application application);
 
   List<Application> getByProjectId(Long id);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 49f7a32f1..e369ab3d6 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -78,6 +78,7 @@ import 
org.apache.streampark.console.core.service.SavePointService;
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.VariableService;
 import org.apache.streampark.console.core.service.YarnQueueService;
+import org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper;
 import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.CancelRequest;
@@ -136,6 +137,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.EnumSet;
@@ -404,7 +406,9 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     removeApp(application);
 
     if (isKubernetesApp(application)) {
-      k8SFlinkTrackMonitor.unWatching(toTrackId(application));
+      TrackId trackId = FlinkK8sWatcherWrapper.Bridge.toTrackId(application);
+      KubernetesDeploymentHelper.delete(trackId.namespace(), 
trackId.clusterId());
+      k8SFlinkTrackMonitor.unWatching(trackId);
     } else {
       FlinkRESTAPIWatcher.unWatching(paramApp.getId());
     }
@@ -460,6 +464,23 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     return cancelUserId != -1 && cancelUserId != appUserId;
   }
 
+  @Override
+  public Map<String, String> getRumtimeConfig(Long id) {
+    Application application = getById(id);
+    if (application != null && application.getVersionId() != null) {
+      FlinkEnv flinkEnv = 
flinkEnvService.getByIdOrDefault(application.getVersionId());
+      if (flinkEnv != null) {
+        File yaml = new 
File(flinkEnv.getFlinkHome().concat("/conf/flink-conf.yaml"));
+        Map<String, String> config = PropertiesUtils.loadFlinkConfYaml(yaml);
+        Map<String, String> dynamicConf =
+            
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties());
+        config.putAll(dynamicConf);
+        return config;
+      }
+    }
+    return Collections.emptyMap();
+  }
+
   private void removeApp(Application application) {
     Long appId = application.getId();
     removeById(appId);
@@ -695,7 +716,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         }
         // check whether clusterId, namespace, jobId on kubernetes
         else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
-            && 
k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) {
+            && k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(app))) {
           return AppExistsState.IN_KUBERNETES;
         }
       }
@@ -1135,10 +1156,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     if (isKubernetesApp(application)) {
       KubernetesDeploymentHelper.watchPodTerminatedLog(
           application.getK8sNamespace(), application.getJobName(), 
application.getJobId());
-      KubernetesDeploymentHelper.deleteTaskDeployment(
-          application.getK8sNamespace(), application.getJobName());
-      KubernetesDeploymentHelper.deleteTaskConfigMap(
-          application.getK8sNamespace(), application.getJobName());
+      KubernetesDeploymentHelper.delete(application.getK8sNamespace(), 
application.getJobName());
     }
     if (startFuture != null) {
       startFuture.cancel(true);
@@ -1322,6 +1340,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
     cancelFutureMap.put(application.getId(), cancelFuture);
 
+    TrackId trackId = isKubernetesApp(application) ? toTrackId(application) : 
null;
+
     cancelFuture.whenComplete(
         (cancelResponse, throwable) -> {
           cancelFutureMap.remove(application.getId());
@@ -1345,9 +1365,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
               }
               // re-tracking flink job on kubernetes and logging exception
               if (isKubernetesApp(application)) {
-                TrackId id = toTrackId(application);
-                k8SFlinkTrackMonitor.unWatching(id);
-                k8SFlinkTrackMonitor.doWatching(id);
+                KubernetesDeploymentHelper.delete(trackId.namespace(), 
trackId.clusterId());
+                k8SFlinkTrackMonitor.unWatching(trackId);
               } else {
                 FlinkRESTAPIWatcher.unWatching(application.getId());
               }
@@ -1373,7 +1392,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
           }
 
           if (isKubernetesApp(application)) {
-            k8SFlinkTrackMonitor.unWatching(toTrackId(application));
+            KubernetesDeploymentHelper.delete(trackId.namespace(), 
trackId.clusterId());
+            k8SFlinkTrackMonitor.unWatching(trackId);
           }
         });
   }
@@ -1389,24 +1409,22 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       final URI uri = URI.create(savepointPath);
       final String scheme = uri.getScheme();
       final String pathPart = uri.getPath();
-      String error = null;
       if (scheme == null) {
-        error =
-            "This state.savepoints.dir value "
-                + savepointPath
-                + " scheme (hdfs://, file://, etc) of  is null. Please specify 
the file system scheme explicitly in the URI.";
-      } else if (pathPart == null) {
-        error =
-            "This state.savepoints.dir value "
-                + savepointPath
-                + " path part to store the checkpoint data in is null. Please 
specify a directory path for the checkpoint data.";
-      } else if (pathPart.isEmpty() || "/".equals(pathPart)) {
-        error =
-            "This state.savepoints.dir value "
-                + savepointPath
-                + " Cannot use the root directory for checkpoints.";
+        return "This state.savepoints.dir value "
+            + savepointPath
+            + " scheme (hdfs://, file://, etc) of  is null. Please specify the 
file system scheme explicitly in the URI.";
       }
-      return error;
+      if (pathPart == null) {
+        return "This state.savepoints.dir value "
+            + savepointPath
+            + " path part to store the checkpoint data in is null. Please 
specify a directory path for the checkpoint data.";
+      }
+      if (pathPart.isEmpty() || "/".equals(pathPart)) {
+        return "This state.savepoints.dir value "
+            + savepointPath
+            + " Cannot use the root directory for checkpoints.";
+      }
+      return null;
     } else {
       return "When custom savepoint is not set, state.savepoints.dir needs to 
be set in properties or flink-conf.yaml of application";
     }
@@ -1552,6 +1570,14 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
     }
 
+    TrackId trackId;
+    if (isKubernetesApp(application)) {
+      trackId = toTrackId(application);
+      KubernetesDeploymentHelper.delete(trackId.namespace(), 
trackId.clusterId());
+    } else {
+      trackId = null;
+    }
+
     KubernetesSubmitParam kubernetesSubmitParam =
         new KubernetesSubmitParam(
             application.getClusterId(),
@@ -1613,7 +1639,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
               app.setOptionState(OptionState.NONE.getValue());
               updateById(app);
               if (isKubernetesApp(app)) {
-                k8SFlinkTrackMonitor.unWatching(toTrackId(app));
+                k8SFlinkTrackMonitor.unWatching(trackId);
               } else {
                 FlinkRESTAPIWatcher.unWatching(appParam.getId());
               }
@@ -1649,7 +1675,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
           // if the start completed, the task will be added to the tracking queue
           if (isKubernetesApp(application)) {
             application.setRelease(ReleaseState.DONE.get());
-            k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+            k8SFlinkTrackMonitor.doWatching(trackId);
             if 
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
               String domainName = settingService.getIngressModeDefault();
               if (StringUtils.isNotBlank(domainName)) {
@@ -1748,7 +1774,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     // re-tracking flink job on kubernetes and logging exception
     if (isKubernetesApp(application)) {
       TrackId id = toTrackId(application);
-      k8SFlinkTrackMonitor.unWatching(id);
+      KubernetesDeploymentHelper.delete(id.namespace(), id.clusterId());
       k8SFlinkTrackMonitor.doWatching(id);
     } else {
       FlinkRESTAPIWatcher.unWatching(application.getId());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index af3b816d5..fb8cafe73 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -23,6 +23,7 @@ import 
org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcherFactory;
 import org.apache.streampark.flink.kubernetes.FlinkTrackConfig;
+import org.apache.streampark.flink.kubernetes.KubernetesRetriever;
 import org.apache.streampark.flink.kubernetes.enums.FlinkJobState;
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode;
 import org.apache.streampark.flink.kubernetes.model.TrackId;
@@ -101,17 +102,16 @@ public class FlinkK8sWatcherWrapper {
     if (CollectionUtils.isEmpty(k8sApplication)) {
       return Lists.newArrayList();
     }
-    // correct corrupted data
-    List<Application> correctApps =
-        k8sApplication.stream()
-            .filter(app -> !Bridge.toTrackId(app).isLegal())
-            .collect(Collectors.toList());
-    if (CollectionUtils.isNotEmpty(correctApps)) {
-      applicationService.saveOrUpdateBatch(correctApps);
-    }
     // filter out the application that should be tracking
     return k8sApplication.stream()
-        .filter(app -> 
!FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum())))
+        .filter(
+            app -> {
+              boolean isEndState =
+                  
FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum()));
+              boolean deploymentExists =
+                  KubernetesRetriever.isDeploymentExists(app.getClusterId(), 
app.getK8sNamespace());
+              return !isEndState && deploymentExists;
+            })
         .map(Bridge::toTrackId)
         .collect(Collectors.toList());
   }
@@ -121,6 +121,7 @@ public class FlinkK8sWatcherWrapper {
 
     // convert Application to TrackId
     public static TrackId toTrackId(@Nonnull Application app) {
+
       Enumeration.Value mode = 
FlinkK8sExecuteMode.of(app.getExecutionModeEnum());
       if (FlinkK8sExecuteMode.APPLICATION().equals(mode)) {
         return TrackId.onApplication(
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 5495814f2..590cd806d 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -55,7 +55,7 @@ object KubernetesDeploymentHelper extends Logger {
     }
   }
 
-  def getDeploymentStatusChanges(nameSpace: String, deploymentName: String): 
Boolean = {
+  def isDeploymentError(nameSpace: String, deploymentName: String): Boolean = {
     Try {
       val pods = getPods(nameSpace, deploymentName)
       val podStatus = pods.head.getStatus
@@ -68,12 +68,7 @@ object KubernetesDeploymentHelper extends Logger {
     }.getOrElse(true)
   }
 
-  def getTheNumberOfTaskDeploymentRetries(nameSpace: String, deploymentName: 
String): Integer = {
-    val pods = getPods(nameSpace, deploymentName)
-    pods.head.getStatus.getContainerStatuses.head.getRestartCount
-  }
-
-  def deleteTaskDeployment(nameSpace: String, deploymentName: String): Boolean 
= {
+  private[this] def deleteDeployment(nameSpace: String, deploymentName: 
String): Boolean = {
     using(KubernetesRetriever.newK8sClient()) {
       client =>
         Try {
@@ -86,7 +81,26 @@ object KubernetesDeploymentHelper extends Logger {
     }
   }
 
-  def isTheK8sConnectionNormal(): Boolean = {
+  private[this] def deleteConfigMap(nameSpace: String, deploymentName: 
String): Boolean = {
+    using(KubernetesRetriever.newK8sClient()) {
+      client =>
+        Try {
+          val r = client
+            .configMaps()
+            .inNamespace(nameSpace)
+            .withLabel("app", deploymentName)
+            .delete
+          Boolean.unbox(r)
+        }.getOrElse(false)
+    }
+  }
+
+  def delete(nameSpace: String, deploymentName: String): Unit = {
+    deleteDeployment(nameSpace, deploymentName)
+    deleteConfigMap(nameSpace, deploymentName)
+  }
+
+  def checkConnection(): Boolean = {
     Try(new DefaultKubernetesClient) match {
       case Success(client) =>
         client.close()
@@ -125,20 +139,6 @@ object KubernetesDeploymentHelper extends Logger {
     }(error => throw error)
   }
 
-  def deleteTaskConfigMap(nameSpace: String, deploymentName: String): Boolean 
= {
-    using(KubernetesRetriever.newK8sClient()) {
-      client =>
-        Try {
-          val r = client
-            .configMaps()
-            .inNamespace(nameSpace)
-            .withLabel("app", deploymentName)
-            .delete
-          Boolean.unbox(r)
-        }.getOrElse(false)
-    }
-  }
-
   private[kubernetes] def getJobLog(jobId: String): String = {
     val tmpPath = SystemPropertyUtils.getTmpdir()
     s"$tmpPath/$jobId.log"
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
index fba47eee6..e280122c8 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
@@ -20,6 +20,8 @@ package org.apache.streampark.flink.kubernetes.model
 import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
 
+import java.lang.{Boolean => JavaBool}
+
 import scala.util.Try
 
 /** tracking identifier for flink on kubernetes */
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 55f62f07f..94cb47e78 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -213,9 +213,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
     } else {
       jobDetails.map {
         d =>
-          TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) -> 
d.toJobStatusCV(
-            pollEmitTime,
-            System.currentTimeMillis)
+          TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) -> d
+            .toJobStatusCV(pollEmitTime, System.currentTimeMillis)
       }
     }
   }
@@ -284,21 +283,27 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
 
     // infer from k8s deployment and event
     val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
-    logger.info(
-      s"Query the local cache 
result:${watchController.canceling.has(trackId).toString},trackId 
${trackId.toString}.")
-    val jobState = {
-      if (watchController.canceling.has(trackId)) FlinkJobState.CANCELED
-      else {
+
+    val jobState = trackId match {
+      case id if watchController.canceling.has(id) =>
+        logger.info(s"trackId ${trackId.toString} is canceling")
+        FlinkJobState.CANCELED
+      case _ =>
         // whether deployment exists on kubernetes cluster
-        val isDeployExists =
-          KubernetesRetriever.isDeploymentExists(trackId.clusterId, 
trackId.namespace)
-        val deployStateOfTheError = 
KubernetesDeploymentHelper.getDeploymentStatusChanges(
+        val deployExists = KubernetesRetriever.isDeploymentExists(
+          trackId.clusterId,
+          trackId.namespace
+        )
+
+        val deployError = KubernetesDeploymentHelper.isDeploymentError(
           trackId.namespace,
-          trackId.clusterId)
-        val isConnection = 
KubernetesDeploymentHelper.isTheK8sConnectionNormal()
+          trackId.clusterId
+        )
 
-        if (isDeployExists) {
-          if (!deployStateOfTheError) {
+        val isConnection = KubernetesDeploymentHelper.checkConnection()
+
+        if (deployExists) {
+          if (!deployError) {
             logger.info("Task Enter the initialization process.")
             FlinkJobState.K8S_INITIALIZING
           } else if (isConnection) {
@@ -307,7 +312,6 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
               trackId.namespace,
               trackId.clusterId,
               trackId.jobId)
-            KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace, 
trackId.clusterId)
             FlinkJobState.FAILED
           } else {
             inferSilentOrLostFromPreCache(latest)
@@ -318,8 +322,6 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
         } else {
           inferSilentOrLostFromPreCache(latest)
         }
-
-      }
     }
 
     val jobStatusCV = JobStatusCV(

Reply via email to