This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 4d0f4b4dd Update FlinkJobStatusWatcher.scala (#1887)
4d0f4b4dd is described below
commit 4d0f4b4dd80644201126fa91147fe934ef6e4d24
Author: monster <[email protected]>
AuthorDate: Sun Oct 23 14:55:39 2022 +0800
Update FlinkJobStatusWatcher.scala (#1887)
---
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 80 ++++++++++++----------
1 file changed, 42 insertions(+), 38 deletions(-)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index f7649f0f9..50f6fd96d 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -86,48 +86,52 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
* single flink job status tracking task
*/
override def doWatch(): Unit = {
- // get all legal tracking ids
- val trackIds =
Try(trackController.collectAllTrackIds()).filter(_.nonEmpty).getOrElse(return)
-
- // retrieve flink job status in thread pool
- val tracksFuture: Set[Future[Option[JobStatusCV]]] = trackIds.map { id =>
-
- val future = Future {
- id.executeMode match {
- case SESSION => touchSessionJob(id)
- case APPLICATION => touchApplicationJob(id)
+ this.synchronized{
+ logInfo("[FlinkJobStatusWatcher]: Status monitoring process begins - " +
Thread.currentThread().getName)
+ // get all legal tracking ids
+ val trackIds =
Try(trackController.collectAllTrackIds()).filter(_.nonEmpty).getOrElse(return)
+
+ // retrieve flink job status in thread pool
+ val tracksFuture: Set[Future[Option[JobStatusCV]]] = trackIds.map { id =>
+
+ val future = Future {
+ id.executeMode match {
+ case SESSION => touchSessionJob(id)
+ case APPLICATION => touchApplicationJob(id)
+ }
}
- }
- future onComplete (_.getOrElse(None) match {
- case Some(jobState) =>
- val trackId = id.copy(jobId = jobState.jobId)
- val latest: JobStatusCV = trackController.jobStatuses.get(trackId)
- if (latest == null || latest.jobState != jobState.jobState ||
latest.jobId != jobState.jobId) {
- // put job status to cache
- trackController.jobStatuses.put(trackId, jobState)
- // set jobId to trackIds
- trackController.trackIds.update(trackId)
- eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
- }
- if (FlinkJobState.isEndState(jobState.jobState)) {
- // remove trackId from cache of job that needs to be untracked
- trackController.unTracking(trackId)
- if (trackId.executeMode == APPLICATION) {
- trackController.endpoints.invalidate(trackId.toClusterKey)
+ future onComplete (_.getOrElse(None) match {
+ case Some(jobState) =>
+ val trackId = id.copy(jobId = jobState.jobId)
+ val latest: JobStatusCV = trackController.jobStatuses.get(trackId)
+ if (latest == null || latest.jobState != jobState.jobState ||
latest.jobId != jobState.jobId) {
+ // put job status to cache
+ trackController.jobStatuses.put(trackId, jobState)
+ // set jobId to trackIds
+ trackController.trackIds.update(trackId)
+ eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
}
- }
- case _ =>
- })
- future
- }
+ if (FlinkJobState.isEndState(jobState.jobState)) {
+ // remove trackId from cache of job that needs to be untracked
+ trackController.unTracking(trackId)
+ if (trackId.executeMode == APPLICATION) {
+ trackController.endpoints.invalidate(trackId.toClusterKey)
+ }
+ }
+ case _ =>
+ })
+ future
+ }
- // blocking until all future are completed or timeout is reached
- Try(Await.ready(Future.sequence(tracksFuture), conf.requestTimeoutSec
seconds))
- .failed.map { _ =>
- logInfo(s"[FlinkJobStatusWatcher] tracking flink job status on
kubernetes mode timeout," +
- s" limitSeconds=${conf.requestTimeoutSec}," +
- s" trackIds=${trackIds.mkString(",")}")
+ // blocking until all future are completed or timeout is reached
+ Try(Await.ready(Future.sequence(tracksFuture), conf.requestTimeoutSec
seconds))
+ .failed.map { _ =>
+ logInfo(s"[FlinkJobStatusWatcher] tracking flink job status on
kubernetes mode timeout," +
+ s" limitSeconds=${conf.requestTimeoutSec}," +
+ s" trackIds=${trackIds.mkString(",")}")
+ }
+ logInfo("[FlinkJobStatusWatcher]: End of status monitoring process - " +
Thread.currentThread().getName)
}
}