This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4d0f4b4dd Update FlinkJobStatusWatcher.scala (#1887)
4d0f4b4dd is described below

commit 4d0f4b4dd80644201126fa91147fe934ef6e4d24
Author: monster <[email protected]>
AuthorDate: Sun Oct 23 14:55:39 2022 +0800

    Update FlinkJobStatusWatcher.scala (#1887)
---
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 80 ++++++++++++----------
 1 file changed, 42 insertions(+), 38 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index f7649f0f9..50f6fd96d 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -86,48 +86,52 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
    * single flink job status tracking task
    */
   override def doWatch(): Unit = {
-    // get all legal tracking ids
-    val trackIds = 
Try(trackController.collectAllTrackIds()).filter(_.nonEmpty).getOrElse(return)
-
-    // retrieve flink job status in thread pool
-    val tracksFuture: Set[Future[Option[JobStatusCV]]] = trackIds.map { id =>
-
-      val future = Future {
-        id.executeMode match {
-          case SESSION => touchSessionJob(id)
-          case APPLICATION => touchApplicationJob(id)
+    this.synchronized{
+      logInfo("[FlinkJobStatusWatcher]: Status monitoring process begins - " + 
Thread.currentThread().getName)
+      // get all legal tracking ids
+      val trackIds = 
Try(trackController.collectAllTrackIds()).filter(_.nonEmpty).getOrElse(return)
+
+      // retrieve flink job status in thread pool
+      val tracksFuture: Set[Future[Option[JobStatusCV]]] = trackIds.map { id =>
+
+        val future = Future {
+          id.executeMode match {
+            case SESSION => touchSessionJob(id)
+            case APPLICATION => touchApplicationJob(id)
+          }
         }
-      }
 
-      future onComplete (_.getOrElse(None) match {
-        case Some(jobState) =>
-          val trackId = id.copy(jobId = jobState.jobId)
-          val latest: JobStatusCV = trackController.jobStatuses.get(trackId)
-          if (latest == null || latest.jobState != jobState.jobState || 
latest.jobId != jobState.jobId) {
-            // put job status to cache
-            trackController.jobStatuses.put(trackId, jobState)
-            // set jobId to trackIds
-            trackController.trackIds.update(trackId)
-            eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
-          }
-          if (FlinkJobState.isEndState(jobState.jobState)) {
-            // remove trackId from cache of job that needs to be untracked
-            trackController.unTracking(trackId)
-            if (trackId.executeMode == APPLICATION) {
-              trackController.endpoints.invalidate(trackId.toClusterKey)
+        future onComplete (_.getOrElse(None) match {
+          case Some(jobState) =>
+            val trackId = id.copy(jobId = jobState.jobId)
+            val latest: JobStatusCV = trackController.jobStatuses.get(trackId)
+            if (latest == null || latest.jobState != jobState.jobState || 
latest.jobId != jobState.jobId) {
+              // put job status to cache
+              trackController.jobStatuses.put(trackId, jobState)
+              // set jobId to trackIds
+              trackController.trackIds.update(trackId)
+              eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
             }
-          }
-        case _ =>
-      })
-      future
-    }
+            if (FlinkJobState.isEndState(jobState.jobState)) {
+              // remove trackId from cache of job that needs to be untracked
+              trackController.unTracking(trackId)
+              if (trackId.executeMode == APPLICATION) {
+                trackController.endpoints.invalidate(trackId.toClusterKey)
+              }
+            }
+          case _ =>
+        })
+        future
+      }
 
-    // blocking until all future are completed or timeout is reached
-    Try(Await.ready(Future.sequence(tracksFuture), conf.requestTimeoutSec 
seconds))
-      .failed.map { _ =>
-      logInfo(s"[FlinkJobStatusWatcher] tracking flink job status on 
kubernetes mode timeout," +
-        s" limitSeconds=${conf.requestTimeoutSec}," +
-        s" trackIds=${trackIds.mkString(",")}")
+      // blocking until all future are completed or timeout is reached
+      Try(Await.ready(Future.sequence(tracksFuture), conf.requestTimeoutSec 
seconds))
+        .failed.map { _ =>
+        logInfo(s"[FlinkJobStatusWatcher] tracking flink job status on 
kubernetes mode timeout," +
+          s" limitSeconds=${conf.requestTimeoutSec}," +
+          s" trackIds=${trackIds.mkString(",")}")
+      }
+      logInfo("[FlinkJobStatusWatcher]: End of status monitoring process - " + 
Thread.currentThread().getName)
     }
   }
 

Reply via email to