This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 357c8c87e [Bug] fix job silent status can't convert to lost status bugs (#4300)
357c8c87e is described below

commit 357c8c87e3caee5c16aaa1c5fc67c7c05232158a
Author: Lei <[email protected]>
AuthorDate: Sun Oct 19 08:38:42 2025 +0800

    [Bug] fix job silent status can't convert to lost status bugs (#4300)
    
    * fix job silent status can't convert to lost status bugs
    
    * fix job silent status can't convert to lost status bugs
---
 .../org/apache/streampark/common/conf/K8sFlinkConfig.scala |  7 +++++++
 .../flink/kubernetes/DefaultFlinkK8sWatcher.scala          |  2 +-
 .../flink/kubernetes/FlinkK8sWatchController.scala         | 10 +++++-----
 .../apache/streampark/flink/kubernetes/TrackConfig.scala   | 14 ++++++++++----
 .../flink/kubernetes/watcher/FlinkJobStatusWatcher.scala   |  6 +-----
 5 files changed, 24 insertions(+), 15 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
index f2ae17852..79173ac21 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
@@ -28,6 +28,13 @@ object K8sFlinkConfig {
     classType = classOf[java.lang.Long],
     description = "run timeout seconds of single flink-k8s metrics tracking 
task")
 
+  @deprecated
+  val jobStatusTrackCacheTimeoutSec: InternalOption = InternalOption(
+    key = "streampark.flink-k8s.tracking.cache-timeout-sec.job-status",
+    defaultValue = 300,
+    classType = classOf[java.lang.Integer],
+    description = "status cache timeout seconds of single flink-k8s job status 
tracking task")
+
   @deprecated
   val metricTrackTaskTimeoutSec: InternalOption = InternalOption(
     key = 
"streampark.flink-k8s.tracking.polling-task-timeout-sec.cluster-metric",
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
index 45155f0a0..ae7ab5e83 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
@@ -33,7 +33,7 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig = 
FlinkTrackConfig.defaultCo
 
   // cache pool for storage tracking result
   implicit val watchController: FlinkK8sWatchController =
-    new FlinkK8sWatchController()
+    new FlinkK8sWatchController(conf.jobStatusWatcherConf)
 
   // eventBus for change event
   implicit lazy val eventBus: ChangeEventBus = {
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
index f96e9e5f6..01e67def0 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
@@ -27,7 +27,7 @@ import java.util.Objects
 import java.util.concurrent.TimeUnit
 
 /** Tracking info cache pool on flink kubernetes mode. */
-class FlinkK8sWatchController extends Logger with AutoCloseable {
+class FlinkK8sWatchController(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfig.defaultConf) extends Logger with AutoCloseable {
 
   // cache for tracking identifiers
   lazy val trackIds: TrackIdCache = TrackIdCache.build()
@@ -38,7 +38,7 @@ class FlinkK8sWatchController extends Logger with 
AutoCloseable {
   lazy val endpoints: EndpointCache = EndpointCache.build()
 
   // cache for tracking flink job status
-  lazy val jobStatuses: JobStatusCache = JobStatusCache.build()
+  lazy val jobStatuses: JobStatusCache = 
JobStatusCache.build(conf.jobStatusCacheTimeOutSec)
 
   // cache for tracking kubernetes events with Deployment kind
   lazy val k8sDeploymentEvents: K8sDeploymentEventCache =
@@ -156,10 +156,10 @@ object TrackIdCache {
   }
 }
 
-class JobStatusCache {
+class JobStatusCache(timeout: Int) {
 
   private[this] lazy val cache: Cache[CacheKey, JobStatusCV] =
-    Caffeine.newBuilder.expireAfterWrite(20, TimeUnit.SECONDS).build()
+    Caffeine.newBuilder.expireAfterWrite(timeout, TimeUnit.SECONDS).build()
 
   def putAll(kvs: Map[TrackId, JobStatusCV]): Unit =
     cache.putAll(kvs.map(t => (CacheKey(t._1.appId), t._2)))
@@ -183,7 +183,7 @@ class JobStatusCache {
 
 object JobStatusCache {
 
-  def build(): JobStatusCache = new JobStatusCache()
+  def build(timeout: Int): JobStatusCache = new JobStatusCache(timeout)
 
 }
 
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/TrackConfig.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/TrackConfig.scala
index d0f0f388c..2ca25b973 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/TrackConfig.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/TrackConfig.scala
@@ -48,11 +48,14 @@ case class MetricWatcherConfig(requestTimeoutSec: Long, 
requestIntervalSec: Long
  *   interval seconds between two single tracking task
  * @param silentStateJobKeepTrackingSec
  *   retained tracking time for SILENT state flink tasks
+ * @param jobStatusCacheTimeOutSec
+ *   job status cache expiry time in seconds for a single tracking task; must be greater than silentStateJobKeepTrackingSec
  */
 case class JobStatusWatcherConfig(
     requestTimeoutSec: Long,
     requestIntervalSec: Long,
-    silentStateJobKeepTrackingSec: Int)
+    silentStateJobKeepTrackingSec: Int,
+    jobStatusCacheTimeOutSec: Int)
 
 object FlinkTrackConfig {
   def defaultConf: FlinkTrackConfig =
@@ -66,7 +69,8 @@ object FlinkTrackConfig {
     JobStatusWatcherConfig(
       InternalConfigHolder.get(K8sFlinkConfig.jobStatusTrackTaskTimeoutSec),
       InternalConfigHolder.get(K8sFlinkConfig.jobStatueTrackTaskIntervalSec),
-      InternalConfigHolder.get(K8sFlinkConfig.silentStateJobKeepTrackingSec)),
+      InternalConfigHolder.get(K8sFlinkConfig.silentStateJobKeepTrackingSec),
+      InternalConfigHolder.get(K8sFlinkConfig.jobStatusTrackCacheTimeoutSec)),
     MetricWatcherConfig(
       InternalConfigHolder.get(K8sFlinkConfig.metricTrackTaskTimeoutSec),
       InternalConfigHolder.get(K8sFlinkConfig.metricTrackTaskIntervalSec)))
@@ -77,12 +81,14 @@ object JobStatusWatcherConfig {
   def defaultConf: JobStatusWatcherConfig = JobStatusWatcherConfig(
     requestTimeoutSec = 120,
     requestIntervalSec = 5,
-    silentStateJobKeepTrackingSec = 60)
+    silentStateJobKeepTrackingSec = 60,
+    jobStatusCacheTimeOutSec = 300)
 
   def debugConf: JobStatusWatcherConfig = JobStatusWatcherConfig(
     requestTimeoutSec = 120,
     requestIntervalSec = 2,
-    silentStateJobKeepTrackingSec = 5)
+    silentStateJobKeepTrackingSec = 5,
+    jobStatusCacheTimeOutSec = 30)
 }
 
 object MetricWatcherConfig {
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index c3cfa9cb4..3f9c575b7 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -128,11 +128,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
                 case _ =>
                   touchSessionJob(trackId) match {
                     case Some(state) =>
-                      if (FlinkJobState.isEndState(state.jobState)) {
-                        // can't find that job in the k8s cluster.
-                        watchController.unWatching(trackId)
-                      }
-                      eventBus.postSync(FlinkJobStatusChangeEvent(trackId, 
state))
+                      updateState(trackId, state)
                     case _ =>
                   }
               }

Reply via email to