This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch release-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/release-2.1.5 by this push:
     new 1634ee5f1 [Improve] flink job-status  bug fixed.
1634ee5f1 is described below

commit 1634ee5f16ecfa959d32d576b6cb3b5c63dbefe4
Author: benjobs <[email protected]>
AuthorDate: Sat Dec 7 20:01:33 2024 +0800

    [Improve] flink job-status  bug fixed.
---
 .../helper/KubernetesDeploymentHelper.scala        |  8 ++--
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 49 ++++++++++++----------
 2 files changed, 33 insertions(+), 24 deletions(-)

diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index e28922002..c359ac9a0 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -29,7 +29,7 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient
 import java.io.File
 
 import scala.collection.JavaConversions._
-import scala.util.{Success, Try}
+import scala.util.{Failure, Success, Try}
 
 object KubernetesDeploymentHelper extends Logger {
 
@@ -91,12 +91,14 @@ object KubernetesDeploymentHelper extends Logger {
     deleteConfigMap(nameSpace, deploymentName)
   }
 
-  def checkConnection(): Boolean = {
+  def checkConnection(e: Throwable => Unit): Boolean = {
     Try(new DefaultKubernetesClient) match {
       case Success(client) =>
         client.close()
         true
-      case _ => false
+      case Failure(exception) =>
+        e(exception)
+        false
     }
   }
 
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index aa82962f3..2c1b8cf50 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -17,7 +17,6 @@
 
 package org.apache.streampark.flink.kubernetes.watcher
 
-import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever}
 import org.apache.streampark.flink.kubernetes.enums.{FlinkJobState, FlinkK8sExecuteMode}
@@ -168,7 +167,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
    */
   def touchSessionJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = {
     touchSessionAllJob(trackId)
-      .find(id => id._1.jobId == trackId.jobId && id._2.jobState != FlinkJobState.SILENT)
+      .find(id => id._1.jobId == trackId.jobId && id._2.jobState != FlinkJobState.LOST)
       .map(_._2)
       .orElse(inferState(trackId))
   }
@@ -214,12 +213,11 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
 
   private[this] def updateState(trackId: TrackId, jobState: JobStatusCV): Unit = {
     val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
+    // put job status to cache
+    watchController.jobStatuses.put(trackId, jobState)
     if (jobState.diff(latest)) {
-      // put job status to cache
-      watchController.jobStatuses.put(trackId, jobState)
       // set jobId to trackIds
       watchController.trackIds.update(trackId)
-
       eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
     }
 
@@ -244,21 +242,23 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
     lazy val pollEmitTime = System.currentTimeMillis
     val preCache = watchController.jobStatuses.get(id)
     val state = inferFromPreCache(preCache)
-    val nonFirstSilent = state == FlinkJobState.SILENT &&
+
+    val firstSilent = state == FlinkJobState.SILENT &&
       preCache != null &&
-      preCache.jobState == FlinkJobState.SILENT
-    val jobState = if (nonFirstSilent) {
+      preCache.jobState != FlinkJobState.SILENT
+
+    val jobState = if (firstSilent) {
       JobStatusCV(
-        jobState = state,
+        jobState = FlinkJobState.LOST,
         jobId = id.jobId,
-        pollEmitTime = preCache.pollEmitTime,
-        pollAckTime = preCache.pollAckTime)
+        pollEmitTime = pollEmitTime,
+        pollAckTime = System.currentTimeMillis)
     } else {
       JobStatusCV(
-        jobState = state,
+        jobState = FlinkJobState.LOST,
         jobId = id.jobId,
-        pollEmitTime = pollEmitTime,
-        pollAckTime = System.currentTimeMillis)
+        pollEmitTime = preCache.pollEmitTime,
+        pollAckTime = preCache.pollAckTime)
     }
     Option(jobState)
   }
@@ -315,10 +315,16 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
     val jobState = trackId match {
       case id if watchController.canceling.has(id) =>
         logger.info(s"trackId ${trackId.toString} is canceling")
-        if (deployExists) FlinkJobState.CANCELLING else FlinkJobState.CANCELED
+        if (deployExists) {
+          FlinkJobState.CANCELLING
+        } else {
+          FlinkJobState.CANCELED
+        }
       case _ =>
-        val isConnection = KubernetesDeploymentHelper.checkConnection()
-
+        val isConnection = KubernetesDeploymentHelper.checkConnection(
+          e => {
+            logger.warn("connect Kubernetes client failed. ", e)
+          })
         if (deployExists) {
           if (isConnection) {
             logger.info("Enter the task starting process.")
@@ -339,14 +345,15 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
     }
 
     val jobStatusCV = JobStatusCV(
-      jobState = jobState,
+      jobState = if (jobState == FlinkJobState.SILENT) FlinkJobState.LOST else jobState,
       jobId = trackId.jobId,
       pollEmitTime = pollEmitTime,
-      pollAckTime = System.currentTimeMillis)
+      pollAckTime = System.currentTimeMillis
+    )
 
-    if (
+    val flag =
       jobState == FlinkJobState.SILENT && latest != null && latest.jobState == FlinkJobState.SILENT
-    ) {
+    if (flag) {
       Some(jobStatusCV.copy(pollEmitTime = latest.pollEmitTime, pollAckTime = latest.pollAckTime))
     } else {
       Some(jobStatusCV)

Reply via email to