This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch k8s-deploy
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/k8s-deploy by this push:
     new 2c68e376a [Improve] k8s mode job state bug fixed.
2c68e376a is described below

commit 2c68e376a12e5705364041717dcd87bce4e4f3cf
Author: benjobs <[email protected]>
AuthorDate: Sun Jan 14 23:14:55 2024 +0800

    [Improve] k8s mode job state bug fixed.
---
 .../console/core/entity/Application.java           |   4 +
 .../core/service/impl/ApplicationServiceImpl.java  |  26 ++---
 .../core/service/impl/FlinkClusterServiceImpl.java |  15 +--
 .../flink/kubernetes/model/JobStatusCV.scala       |  10 +-
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 107 +++++++++------------
 5 files changed, 80 insertions(+), 82 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 4b03ee57b..f75bc8dbd 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -483,6 +483,10 @@ public class Application implements Serializable {
     return this.getAppType() == ApplicationType.STREAMPARK_FLINK.getType();
   }
 
+  public boolean isKubernetesModeJob() {
+    return ExecutionMode.isKubernetesMode(this.getExecutionModeEnum());
+  }
+
   @JsonIgnore
   @SneakyThrows
   public MavenDependency getMavenDependency() {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 61bd08a90..b1c3ef872 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -157,7 +157,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.streampark.common.enums.StorageType.LFS;
 import static org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.Bridge.toTrackId;
-import static org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.isKubernetesApp;
 
 @Slf4j
 @Service
@@ -515,7 +514,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
                 record -> {
                   // status of flink job on kubernetes mode had been 
automatically persisted to db
                   // in time.
-                  if (isKubernetesApp(record)) {
+                  if (record.isKubernetesModeJob()) {
                     // set duration
                     String restUrl = 
flinkK8sWatcher.getRemoteRestUrl(toTrackId(record));
                     record.setFlinkRestUrl(restUrl);
@@ -1143,7 +1142,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     CompletableFuture<SubmitResponse> startFuture = 
startFutureMap.remove(app.getId());
     CompletableFuture<CancelResponse> cancelFuture = 
cancelFutureMap.remove(app.getId());
     Application application = this.baseMapper.getApp(app);
-    if (isKubernetesApp(application)) {
+    if (application.isKubernetesModeJob()) {
       KubernetesDeploymentHelper.watchPodTerminatedLog(
           application.getK8sNamespace(), application.getJobName(), 
application.getJobId());
     }
@@ -1194,7 +1193,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       }
     }
     // add flink web url info for k8s-mode
-    if (isKubernetesApp(application)) {
+    if (application.isKubernetesModeJob()) {
       String restUrl = 
flinkK8sWatcher.getRemoteRestUrl(toTrackId(application));
       application.setFlinkRestUrl(restUrl);
 
@@ -1231,7 +1230,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
   public boolean mapping(Application appParam) {
     boolean mapping = this.baseMapper.mapping(appParam);
     Application application = getById(appParam.getId());
-    if (isKubernetesApp(application)) {
+    if (application.isKubernetesModeJob()) {
       flinkK8sWatcher.doWatching(toTrackId(application));
     } else {
       FlinkRESTAPIWatcher.doWatching(application);
@@ -1329,7 +1328,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
     cancelFutureMap.put(application.getId(), cancelFuture);
 
-    TrackId trackId = isKubernetesApp(application) ? toTrackId(application) : 
null;
+    TrackId trackId = application.isKubernetesModeJob() ? 
toTrackId(application) : null;
 
     cancelFuture.whenComplete(
         (cancelResponse, throwable) -> {
@@ -1353,7 +1352,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
                 savePointService.expire(application.getId());
               }
               // re-tracking flink job on kubernetes and logging exception
-              if (isKubernetesApp(application)) {
+              if (application.isKubernetesModeJob()) {
                 flinkK8sWatcher.unWatching(trackId);
               } else {
                 FlinkRESTAPIWatcher.unWatching(application.getId());
@@ -1379,7 +1378,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             savePointService.save(savePoint);
           }
 
-          if (isKubernetesApp(application)) {
+          if (application.isKubernetesModeJob()) {
             flinkK8sWatcher.unWatching(trackId);
           }
         });
@@ -1590,7 +1589,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             application.getK8sNamespace(),
             application.getK8sRestExposedTypeEnum());
 
-    TrackId trackId = isKubernetesApp(application) ? toTrackId(application) : 
null;
     CompletableFuture<SubmitResponse> future =
         CompletableFuture.supplyAsync(
             () -> FlinkClientHandler.submit(submitRequest), executorService);
@@ -1616,7 +1614,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
               app.setState(FlinkAppState.FAILED.getValue());
               app.setOptionState(OptionState.NONE.getValue());
               updateById(app);
-              if (isKubernetesApp(app)) {
+              if (app.isKubernetesModeJob()) {
+                TrackId trackId = toTrackId(application);
                 flinkK8sWatcher.unWatching(trackId);
               } else {
                 FlinkRESTAPIWatcher.unWatching(appParam.getId());
@@ -1651,13 +1650,16 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
           application.setEndTime(null);
 
           // if start completed, will be added task to tracking queue
-          if (isKubernetesApp(application)) {
+          if (application.isKubernetesModeJob()) {
             log.info(
                 "start job {} on {} success, doWatching...",
                 application.getJobName(),
                 application.getExecutionModeEnum().getName());
             application.setRelease(ReleaseState.DONE.get());
+
+            TrackId trackId = toTrackId(application);
             flinkK8sWatcher.doWatching(trackId);
+
             if 
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
               String domainName = settingService.getIngressModeDefault();
               if (StringUtils.isNotBlank(domainName)) {
@@ -1754,7 +1756,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     updateById(application);
     savePointService.expire(application.getId());
     // re-tracking flink job on kubernetes and logging exception
-    if (isKubernetesApp(application)) {
+    if (application.isKubernetesModeJob()) {
       TrackId id = toTrackId(application);
       flinkK8sWatcher.doWatching(id);
     } else {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index a207d28ff..d309eb5d3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -151,14 +151,17 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
   @Transactional(rollbackFor = {Exception.class})
   public void start(Long id) {
     FlinkCluster flinkCluster = getById(id);
-    ApiAlertException.throwIfTrue(
-        
!applicationService.getYARNApplication(flinkCluster.getClusterName()).isEmpty(),
-        "The same job name: "
-            + flinkCluster.getClusterName()
-            + " is already running in the yarn queue");
+    ApiAlertException.throwIfTrue(flinkCluster == null, "Invalid id, no 
related cluster found.");
+    ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
+    if (executionModeEnum == ExecutionMode.YARN_SESSION) {
+      ApiAlertException.throwIfTrue(
+          
!applicationService.getYARNApplication(flinkCluster.getClusterName()).isEmpty(),
+          "The application name: "
+              + flinkCluster.getClusterName()
+              + " is already running in the yarn queue, please check!");
+    }
 
     try {
-      ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
       DeployRequest deployRequest = getDeployRequest(flinkCluster);
       log.info("deploy cluster request: " + deployRequest);
 
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/JobStatusCV.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/JobStatusCV.scala
index 7bd5bd52a..7b0e599fe 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/JobStatusCV.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/JobStatusCV.scala
@@ -42,4 +42,12 @@ case class JobStatusCV(
     duration: Long = 0,
     taskTotal: Int = 0,
     pollEmitTime: Long,
-    pollAckTime: Long)
+    pollAckTime: Long) {
+
+  def eq(that: JobStatusCV): Boolean = {
+    that == null ||
+    that.jobState != this.jobState ||
+    that.jobId != this.jobId
+  }
+
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 36bac3d68..aaaaf9bea 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.kubernetes.watcher
 import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.kubernetes.{ChangeEventBus, 
FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever}
-import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
+import org.apache.streampark.flink.kubernetes.enums.{FlinkJobState, 
FlinkK8sExecuteMode}
 import 
org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, 
SESSION}
 import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
 import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
@@ -115,13 +115,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
             case Some(jobState) =>
               val trackId = id.copy(jobId = jobState.jobId)
               val latest: JobStatusCV = 
watchController.jobStatuses.get(trackId)
-
-              val eventChanged = latest == null ||
-                latest.jobState != jobState.jobState ||
-                latest.jobId != jobState.jobId
-
-              if (eventChanged) {
-                logInfo(s"eventChanged.....$trackId")
+              if (jobState.eq(latest)) {
                 // put job status to cache
                 watchController.jobStatuses.put(trackId, jobState)
                 // set jobId to trackIds
@@ -168,44 +162,36 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
    * This method can be called directly from outside, without affecting the 
current cachePool
    * result.
    */
-  def touchSessionJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = {
+  def touchSessionJob(@Nonnull idArg: TrackId): Option[JobStatusCV] = {
     val pollEmitTime = System.currentTimeMillis
 
-    val id = TrackId.onSession(
-      trackId.namespace,
-      trackId.clusterId,
-      trackId.appId,
-      trackId.jobId,
-      trackId.groupId
-    )
+    val id = idArg.copy(executeMode = FlinkK8sExecuteMode.SESSION)
 
-    val rsMap = touchSessionAllJob(
-      trackId.namespace,
-      trackId.clusterId,
-      trackId.appId,
-      trackId.groupId
-    ).toMap
-
-    val jobState = rsMap.get(id).filter(_.jobState != 
FlinkJobState.SILENT).getOrElse {
-      val preCache = watchController.jobStatuses.get(id)
-      val state = inferSilentOrLostFromPreCache(preCache)
-      val nonFirstSilent =
-        state == FlinkJobState.SILENT && preCache != null && preCache.jobState 
== FlinkJobState.SILENT
-      if (nonFirstSilent) {
-        JobStatusCV(
-          jobState = state,
-          jobId = id.jobId,
-          pollEmitTime = preCache.pollEmitTime,
-          pollAckTime = preCache.pollAckTime)
-      } else {
-        JobStatusCV(
-          jobState = state,
-          jobId = id.jobId,
-          pollEmitTime = pollEmitTime,
-          pollAckTime = System.currentTimeMillis)
-      }
+    touchSessionAllJob(id).get(id).filter(_.jobState != FlinkJobState.SILENT) 
match {
+      case Some(state) => Option(state)
+      case _ =>
+        val preCache = watchController.jobStatuses.get(id)
+        val state = inferSilentOrLostFromPreCache(preCache)
+
+        val nonFirstSilent = state == FlinkJobState.SILENT &&
+          preCache != null &&
+          preCache.jobState == FlinkJobState.SILENT
+
+        val jobState = if (nonFirstSilent) {
+          JobStatusCV(
+            jobState = state,
+            jobId = id.jobId,
+            pollEmitTime = preCache.pollEmitTime,
+            pollAckTime = preCache.pollAckTime)
+        } else {
+          JobStatusCV(
+            jobState = state,
+            jobId = id.jobId,
+            pollEmitTime = pollEmitTime,
+            pollAckTime = System.currentTimeMillis)
+        }
+        Option(jobState)
     }
-    Some(jobState)
   }
 
   /**
@@ -215,28 +201,23 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
    * This method can be called directly from outside, without affecting the 
current cachePool
    * result.
    */
-  private def touchSessionAllJob(
-      @Nonnull namespace: String,
-      @Nonnull clusterId: String,
-      @Nonnull appId: Long,
-      @Nonnull groupId: String): Array[(TrackId, JobStatusCV)] = {
-
-    lazy val defaultResult = Array.empty[(TrackId, JobStatusCV)]
+  private def touchSessionAllJob(trackId: TrackId): Map[TrackId, JobStatusCV] 
= {
     val pollEmitTime = System.currentTimeMillis
-
-    val jobDetails = listJobsDetails(ClusterKey(SESSION, namespace, clusterId))
-      .getOrElse(return defaultResult)
-      .jobs
-
-    if (jobDetails.isEmpty) {
-      defaultResult
-    } else {
-      jobDetails.map {
-        d =>
-          val trackId = TrackId.onSession(namespace, clusterId, appId, d.jid, 
groupId)
-          val jobStatus = d.toJobStatusCV(pollEmitTime, 
System.currentTimeMillis)
-          trackId -> jobStatus
-      }
+    val jobDetails = listJobsDetails(ClusterKey(SESSION, trackId.namespace, 
trackId.clusterId))
+    jobDetails match {
+      case Some(details) if details.jobs.nonEmpty =>
+        details.jobs.map {
+          d =>
+            val id = TrackId.onSession(
+              trackId.namespace,
+              trackId.clusterId,
+              trackId.appId,
+              d.jid,
+              trackId.groupId)
+            val jobStatus = d.toJobStatusCV(pollEmitTime, 
System.currentTimeMillis)
+            id -> jobStatus
+        }.toMap
+      case None => Map.empty
     }
   }
 

Reply via email to