This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 357c8c87e [Bug] fix job silent status can't convert to lost status
bugs (#4300)
357c8c87e is described below
commit 357c8c87e3caee5c16aaa1c5fc67c7c05232158a
Author: Lei <[email protected]>
AuthorDate: Sun Oct 19 08:38:42 2025 +0800
[Bug] fix job silent status can't convert to lost status bugs (#4300)
* fix job silent status can't convert to lost status bugs
* fix job silent status can't convert to lost status bugs
---
.../org/apache/streampark/common/conf/K8sFlinkConfig.scala | 7 +++++++
.../flink/kubernetes/DefaultFlinkK8sWatcher.scala | 2 +-
.../flink/kubernetes/FlinkK8sWatchController.scala | 10 +++++-----
.../apache/streampark/flink/kubernetes/TrackConfig.scala | 14 ++++++++++----
.../flink/kubernetes/watcher/FlinkJobStatusWatcher.scala | 6 +-----
5 files changed, 24 insertions(+), 15 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
index f2ae17852..79173ac21 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
@@ -28,6 +28,13 @@ object K8sFlinkConfig {
classType = classOf[java.lang.Long],
description = "run timeout seconds of single flink-k8s metrics tracking
task")
+ @deprecated
+ val jobStatusTrackCacheTimeoutSec: InternalOption = InternalOption(
+ key = "streampark.flink-k8s.tracking.cache-timeout-sec.job-status",
+ defaultValue = 300,
+ classType = classOf[java.lang.Integer],
+ description = "status cache timeout seconds of single flink-k8s job status
tracking task")
+
@deprecated
val metricTrackTaskTimeoutSec: InternalOption = InternalOption(
key =
"streampark.flink-k8s.tracking.polling-task-timeout-sec.cluster-metric",
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
index 45155f0a0..ae7ab5e83 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
@@ -33,7 +33,7 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig =
FlinkTrackConfig.defaultCo
// cache pool for storage tracking result
implicit val watchController: FlinkK8sWatchController =
- new FlinkK8sWatchController()
+ new FlinkK8sWatchController(conf.jobStatusWatcherConf)
// eventBus for change event
implicit lazy val eventBus: ChangeEventBus = {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
index f96e9e5f6..01e67def0 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
@@ -27,7 +27,7 @@ import java.util.Objects
import java.util.concurrent.TimeUnit
/** Tracking info cache pool on flink kubernetes mode. */
-class FlinkK8sWatchController extends Logger with AutoCloseable {
+class FlinkK8sWatchController(conf: JobStatusWatcherConfig =
JobStatusWatcherConfig.defaultConf) extends Logger with AutoCloseable {
// cache for tracking identifiers
lazy val trackIds: TrackIdCache = TrackIdCache.build()
@@ -38,7 +38,7 @@ class FlinkK8sWatchController extends Logger with
AutoCloseable {
lazy val endpoints: EndpointCache = EndpointCache.build()
// cache for tracking flink job status
- lazy val jobStatuses: JobStatusCache = JobStatusCache.build()
+ lazy val jobStatuses: JobStatusCache =
JobStatusCache.build(conf.jobStatusCacheTimeOutSec)
// cache for tracking kubernetes events with Deployment kind
lazy val k8sDeploymentEvents: K8sDeploymentEventCache =
@@ -156,10 +156,10 @@ object TrackIdCache {
}
}
-class JobStatusCache {
+class JobStatusCache(timeout: Int) {
private[this] lazy val cache: Cache[CacheKey, JobStatusCV] =
- Caffeine.newBuilder.expireAfterWrite(20, TimeUnit.SECONDS).build()
+ Caffeine.newBuilder.expireAfterWrite(timeout, TimeUnit.SECONDS).build()
def putAll(kvs: Map[TrackId, JobStatusCV]): Unit =
cache.putAll(kvs.map(t => (CacheKey(t._1.appId), t._2)))
@@ -183,7 +183,7 @@ class JobStatusCache {
object JobStatusCache {
- def build(): JobStatusCache = new JobStatusCache()
+ def build(timeout: Int): JobStatusCache = new JobStatusCache(timeout)
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/TrackConfig.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/TrackConfig.scala
index d0f0f388c..2ca25b973 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/TrackConfig.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/TrackConfig.scala
@@ -48,11 +48,14 @@ case class MetricWatcherConfig(requestTimeoutSec: Long,
requestIntervalSec: Long
* interval seconds between two single tracking task
* @param silentStateJobKeepTrackingSec
* retained tracking time for SILENT state flink tasks
+ * @param jobStatusCacheTimeOutSec
+ * job status cache timeout in seconds for a single tracking task; must be greater than
silentStateJobKeepTrackingSec
*/
case class JobStatusWatcherConfig(
requestTimeoutSec: Long,
requestIntervalSec: Long,
- silentStateJobKeepTrackingSec: Int)
+ silentStateJobKeepTrackingSec: Int,
+ jobStatusCacheTimeOutSec: Int)
object FlinkTrackConfig {
def defaultConf: FlinkTrackConfig =
@@ -66,7 +69,8 @@ object FlinkTrackConfig {
JobStatusWatcherConfig(
InternalConfigHolder.get(K8sFlinkConfig.jobStatusTrackTaskTimeoutSec),
InternalConfigHolder.get(K8sFlinkConfig.jobStatueTrackTaskIntervalSec),
- InternalConfigHolder.get(K8sFlinkConfig.silentStateJobKeepTrackingSec)),
+ InternalConfigHolder.get(K8sFlinkConfig.silentStateJobKeepTrackingSec),
+ InternalConfigHolder.get(K8sFlinkConfig.jobStatusTrackCacheTimeoutSec)),
MetricWatcherConfig(
InternalConfigHolder.get(K8sFlinkConfig.metricTrackTaskTimeoutSec),
InternalConfigHolder.get(K8sFlinkConfig.metricTrackTaskIntervalSec)))
@@ -77,12 +81,14 @@ object JobStatusWatcherConfig {
def defaultConf: JobStatusWatcherConfig = JobStatusWatcherConfig(
requestTimeoutSec = 120,
requestIntervalSec = 5,
- silentStateJobKeepTrackingSec = 60)
+ silentStateJobKeepTrackingSec = 60,
+ jobStatusCacheTimeOutSec = 300)
def debugConf: JobStatusWatcherConfig = JobStatusWatcherConfig(
requestTimeoutSec = 120,
requestIntervalSec = 2,
- silentStateJobKeepTrackingSec = 5)
+ silentStateJobKeepTrackingSec = 5,
+ jobStatusCacheTimeOutSec = 30)
}
object MetricWatcherConfig {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index c3cfa9cb4..3f9c575b7 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -128,11 +128,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
case _ =>
touchSessionJob(trackId) match {
case Some(state) =>
- if (FlinkJobState.isEndState(state.jobState)) {
- // can't find that job in the k8s cluster.
- watchController.unWatching(trackId)
- }
- eventBus.postSync(FlinkJobStatusChangeEvent(trackId,
state))
+ updateState(trackId, state)
case _ =>
}
}