This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
     new 9609b33c9 [Improve] k8s app mode tracking job state bug fixed.
9609b33c9 is described below

commit 9609b33c92ea1e8564507969eaec6b68894f5fd8
Author: benjobs <[email protected]>
AuthorDate: Fri Apr 5 14:21:33 2024 +0800

    [Improve] k8s app mode tracking job state bug fixed.
---
 .../watcher/FlinkCheckpointWatcher.scala           | 22 +++++---
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 18 +++---
 .../kubernetes/watcher/FlinkK8sEventWatcher.scala  |  2 +-
 .../kubernetes/watcher/FlinkMetricsWatcher.scala   | 65 +++++++++++-----------
 .../flink/kubernetes/watcher/FlinkWatcher.scala    |  6 ++
 5 files changed, 61 insertions(+), 52 deletions(-)

diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
index 6335379e2..e3ca21f77 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
@@ -30,7 +30,7 @@ import org.json4s.jackson.JsonMethods.parse
 import javax.annotation.concurrent.ThreadSafe
 
 import java.nio.charset.StandardCharsets
-import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
 
 import scala.concurrent.{Await, ExecutionContext, 
ExecutionContextExecutorService, Future}
 import scala.concurrent.duration.DurationLong
@@ -44,30 +44,34 @@ class FlinkCheckpointWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.def
   extends Logger
   with FlinkWatcher {
 
-  private val trackTaskExecPool = Executors.newWorkStealingPool()
   implicit private val trackTaskExecutor: ExecutionContextExecutorService =
-    ExecutionContext.fromExecutorService(trackTaskExecPool)
+    ExecutionContext.fromExecutorService(watchExecutor)
 
-  private val timerExec = Executors.newSingleThreadScheduledExecutor()
   private var timerSchedule: ScheduledFuture[_] = _
 
   /** start watcher process */
   override def doStart(): Unit = {
-    timerSchedule =
-      timerExec.scheduleAtFixedRate(() => doWatch(), 0, 
conf.requestIntervalSec, TimeUnit.SECONDS)
+    timerSchedule = watchExecutor.scheduleAtFixedRate(
+      () => doWatch(),
+      0,
+      conf.requestIntervalSec,
+      TimeUnit.SECONDS)
     logInfo("[flink-k8s] FlinkCheckpointWatcher started.")
   }
 
   /** stop watcher process */
   override def doStop(): Unit = {
-    timerSchedule.cancel(true)
+    if (!timerSchedule.isCancelled) {
+      timerSchedule.cancel(true)
+    }
     logInfo("[flink-k8s] FlinkCheckpointWatcher stopped.")
   }
 
   /** closes resource, relinquishing any underlying resources. */
   override def doClose(): Unit = {
-    timerExec.shutdownNow()
-    trackTaskExecutor.shutdownNow()
+    if (!timerSchedule.isCancelled) {
+      timerSchedule.cancel(true)
+    }
     logInfo("[flink-k8s] FlinkCheckpointWatcher closed.")
   }
 
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 72bb3c4be..632dc8b58 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -40,7 +40,7 @@ import javax.annotation.concurrent.ThreadSafe
 
 import java.io.File
 import java.nio.charset.StandardCharsets
-import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
 
 import scala.collection.JavaConversions._
 import scala.concurrent.{Await, ExecutionContext, 
ExecutionContextExecutorService, Future}
@@ -60,30 +60,32 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
   extends Logger
   with FlinkWatcher {
 
-  private val trackTaskExecPool = Executors.newWorkStealingPool()
   implicit private val trackTaskExecutor: ExecutionContextExecutorService =
-    ExecutionContext.fromExecutorService(trackTaskExecPool)
+    ExecutionContext.fromExecutorService(watchExecutor)
 
-  private val timerExec = Executors.newSingleThreadScheduledExecutor()
   private var timerSchedule: ScheduledFuture[_] = _
 
   /** stop watcher process */
   override def doStart(): Unit = {
-    timerSchedule =
-      timerExec.scheduleAtFixedRate(() => doWatch(), 0, 
conf.requestIntervalSec, TimeUnit.SECONDS)
+    timerSchedule = watchExecutor.scheduleAtFixedRate(
+      () => doWatch(),
+      0,
+      conf.requestIntervalSec,
+      TimeUnit.SECONDS)
     logInfo("[flink-k8s] FlinkJobStatusWatcher started.")
   }
 
   /** stop watcher process */
   override def doStop(): Unit = {
     // interrupt all running threads
-    timerSchedule.cancel(true)
+    if (!timerSchedule.isCancelled) {
+      timerSchedule.cancel(true)
+    }
     logInfo("[flink-k8s] FlinkJobStatusWatcher stopped.")
   }
 
   /** closes resource, relinquishing any underlying resources. */
   override def doClose(): Unit = {
-    timerExec.shutdownNow()
     trackTaskExecutor.shutdownNow()
     logInfo("[flink-k8s] FlinkJobStatusWatcher closed.")
   }
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
index 7151f671c..080bb813b 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
@@ -27,7 +27,7 @@ import org.apache.flink.kubernetes.kubeclient.resources.{CompatibleKubernetesWat
 
 import javax.annotation.concurrent.ThreadSafe
 
-import scala.util.{Failure, Success, Try}
+import scala.util.{Failure, Try}
 
 /**
  * K8s Event Watcher for Flink Native-K8s Mode. Currently only 
flink-native-application mode events
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index c8431a7dd..af42e1dd3 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -19,22 +19,20 @@ package org.apache.streampark.flink.kubernetes.watcher
 
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.kubernetes.{ChangeEventBus, 
FlinkK8sWatchController, KubernetesRetriever, MetricWatcherConfig}
-import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
 import 
org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent
 import org.apache.streampark.flink.kubernetes.model.{ClusterKey, 
FlinkMetricCV, TrackId}
 
 import org.apache.flink.configuration.{JobManagerOptions, MemorySize, 
TaskManagerOptions}
 import org.apache.hc.client5.http.fluent.Request
-import org.apache.hc.core5.util.Timeout
 import org.json4s.{DefaultFormats, JArray}
 import org.json4s.jackson.JsonMethods.parse
 
 import javax.annotation.concurrent.ThreadSafe
 
 import java.nio.charset.StandardCharsets
-import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
 
-import scala.concurrent.{Await, ExecutionContext, 
ExecutionContextExecutorService, Future}
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration.DurationLong
 import scala.language.postfixOps
 import scala.util.{Failure, Success, Try}
@@ -46,30 +44,31 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
   extends Logger
   with FlinkWatcher {
 
-  private val trackTaskExecPool = Executors.newWorkStealingPool()
-  implicit private val trackTaskExecutor: ExecutionContextExecutorService =
-    ExecutionContext.fromExecutorService(trackTaskExecPool)
-
-  private val timerExec = Executors.newSingleThreadScheduledExecutor()
   private var timerSchedule: ScheduledFuture[_] = _
 
   /** start watcher process */
   override def doStart(): Unit = {
-    timerSchedule =
-      timerExec.scheduleAtFixedRate(() => doWatch(), 0, 
conf.requestIntervalSec, TimeUnit.SECONDS)
+    timerSchedule = watchExecutor.scheduleAtFixedRate(
+      () => doWatch(),
+      0,
+      conf.requestIntervalSec,
+      TimeUnit.SECONDS)
     logInfo("[flink-k8s] FlinkMetricWatcher started.")
   }
 
   /** stop watcher process */
   override def doStop(): Unit = {
-    timerSchedule.cancel(true)
+    if (!timerSchedule.isCancelled) {
+      timerSchedule.cancel(true)
+    }
     logInfo("[flink-k8s] FlinkMetricWatcher stopped.")
   }
 
   /** closes resource, relinquishing any underlying resources. */
   override def doClose(): Unit = {
-    timerExec.shutdownNow()
-    trackTaskExecutor.shutdownNow()
+    if (!timerSchedule.isCancelled) {
+      timerSchedule.cancel(true)
+    }
     logInfo("[flink-k8s] FlinkMetricWatcher closed.")
   }
 
@@ -82,27 +81,25 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
       )
     // retrieve flink metrics in thread pool
     val futures: Set[Future[Option[FlinkMetricCV]]] =
-      trackIds
-        .filter(_.executeMode == FlinkK8sExecuteMode.SESSION)
-        .map(
-          id => {
-            val future = Future(collectMetrics(id))
-            future.onComplete(_.getOrElse(None) match {
-              case Some(metric) =>
-                val clusterKey = id.toClusterKey
-                // update current flink cluster metrics on cache
-                watchController.flinkMetrics.put(clusterKey, metric)
-                val isMetricChanged = {
-                  val preMetric = watchController.flinkMetrics.get(clusterKey)
-                  preMetric == null || !preMetric.equalsPayload(metric)
-                }
-                if (isMetricChanged) {
-                  eventBus.postAsync(FlinkClusterMetricChangeEvent(id, metric))
-                }
-              case _ =>
-            })
-            future
+      trackIds.map(
+        id => {
+          val future = Future(collectMetrics(id))
+          future.onComplete(_.getOrElse(None) match {
+            case Some(metric) =>
+              val clusterKey = id.toClusterKey
+              // update current flink cluster metrics on cache
+              watchController.flinkMetrics.put(clusterKey, metric)
+              val isMetricChanged = {
+                val preMetric = watchController.flinkMetrics.get(clusterKey)
+                preMetric == null || !preMetric.equalsPayload(metric)
+              }
+              if (isMetricChanged) {
+                eventBus.postAsync(FlinkClusterMetricChangeEvent(id, metric))
+              }
+            case _ =>
           })
+          future
+        })
     // blocking until all future are completed or timeout is reached
     Try(Await.ready(Future.sequence(futures), conf.requestTimeoutSec 
seconds)).failed.map {
       _ =>
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
index 272227dd7..fbedc5287 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.kubernetes.watcher
 
+import java.util.concurrent.ScheduledThreadPoolExecutor
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.language.implicitConversions
@@ -25,6 +26,10 @@ trait FlinkWatcher extends AutoCloseable {
 
   private[this] val started: AtomicBoolean = new AtomicBoolean(false)
 
+  private val CPU_NUM = Math.max(4, Runtime.getRuntime.availableProcessors * 2)
+
+  val watchExecutor = new ScheduledThreadPoolExecutor(CPU_NUM)
+
   /**
    * Start watcher process. This method should be a thread-safe implementation 
of light locking and
    * can be called idempotent.
@@ -50,6 +55,7 @@ trait FlinkWatcher extends AutoCloseable {
       this.doStop()
     }
     doClose()
+    watchExecutor.shutdownNow()
   }
 
   /**

Reply via email to