This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
new 9609b33c9 [Improve] k8s app mode tracking job state bug fixed.
9609b33c9 is described below
commit 9609b33c92ea1e8564507969eaec6b68894f5fd8
Author: benjobs <[email protected]>
AuthorDate: Fri Apr 5 14:21:33 2024 +0800
[Improve] k8s app mode tracking job state bug fixed.
---
.../watcher/FlinkCheckpointWatcher.scala | 22 +++++---
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 18 +++---
.../kubernetes/watcher/FlinkK8sEventWatcher.scala | 2 +-
.../kubernetes/watcher/FlinkMetricsWatcher.scala | 65 +++++++++++-----------
.../flink/kubernetes/watcher/FlinkWatcher.scala | 6 ++
5 files changed, 61 insertions(+), 52 deletions(-)
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
index 6335379e2..e3ca21f77 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
@@ -30,7 +30,7 @@ import org.json4s.jackson.JsonMethods.parse
import javax.annotation.concurrent.ThreadSafe
import java.nio.charset.StandardCharsets
-import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
import scala.concurrent.{Await, ExecutionContext,
ExecutionContextExecutorService, Future}
import scala.concurrent.duration.DurationLong
@@ -44,30 +44,34 @@ class FlinkCheckpointWatcher(conf: MetricWatcherConfig =
MetricWatcherConfig.def
extends Logger
with FlinkWatcher {
- private val trackTaskExecPool = Executors.newWorkStealingPool()
implicit private val trackTaskExecutor: ExecutionContextExecutorService =
- ExecutionContext.fromExecutorService(trackTaskExecPool)
+ ExecutionContext.fromExecutorService(watchExecutor)
- private val timerExec = Executors.newSingleThreadScheduledExecutor()
private var timerSchedule: ScheduledFuture[_] = _
/** start watcher process */
override def doStart(): Unit = {
- timerSchedule =
- timerExec.scheduleAtFixedRate(() => doWatch(), 0,
conf.requestIntervalSec, TimeUnit.SECONDS)
+ timerSchedule = watchExecutor.scheduleAtFixedRate(
+ () => doWatch(),
+ 0,
+ conf.requestIntervalSec,
+ TimeUnit.SECONDS)
logInfo("[flink-k8s] FlinkCheckpointWatcher started.")
}
/** stop watcher process */
override def doStop(): Unit = {
- timerSchedule.cancel(true)
+ if (!timerSchedule.isCancelled) {
+ timerSchedule.cancel(true)
+ }
logInfo("[flink-k8s] FlinkCheckpointWatcher stopped.")
}
/** closes resource, relinquishing any underlying resources. */
override def doClose(): Unit = {
- timerExec.shutdownNow()
- trackTaskExecutor.shutdownNow()
+ if (!timerSchedule.isCancelled) {
+ timerSchedule.cancel(true)
+ }
logInfo("[flink-k8s] FlinkCheckpointWatcher closed.")
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 72bb3c4be..632dc8b58 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -40,7 +40,7 @@ import javax.annotation.concurrent.ThreadSafe
import java.io.File
import java.nio.charset.StandardCharsets
-import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
import scala.collection.JavaConversions._
import scala.concurrent.{Await, ExecutionContext,
ExecutionContextExecutorService, Future}
@@ -60,30 +60,32 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
extends Logger
with FlinkWatcher {
- private val trackTaskExecPool = Executors.newWorkStealingPool()
implicit private val trackTaskExecutor: ExecutionContextExecutorService =
- ExecutionContext.fromExecutorService(trackTaskExecPool)
+ ExecutionContext.fromExecutorService(watchExecutor)
- private val timerExec = Executors.newSingleThreadScheduledExecutor()
private var timerSchedule: ScheduledFuture[_] = _
/** start watcher process */
override def doStart(): Unit = {
- timerSchedule =
- timerExec.scheduleAtFixedRate(() => doWatch(), 0,
conf.requestIntervalSec, TimeUnit.SECONDS)
+ timerSchedule = watchExecutor.scheduleAtFixedRate(
+ () => doWatch(),
+ 0,
+ conf.requestIntervalSec,
+ TimeUnit.SECONDS)
logInfo("[flink-k8s] FlinkJobStatusWatcher started.")
}
/** stop watcher process */
override def doStop(): Unit = {
// interrupt all running threads
- timerSchedule.cancel(true)
+ if (!timerSchedule.isCancelled) {
+ timerSchedule.cancel(true)
+ }
logInfo("[flink-k8s] FlinkJobStatusWatcher stopped.")
}
/** closes resource, relinquishing any underlying resources. */
override def doClose(): Unit = {
- timerExec.shutdownNow()
trackTaskExecutor.shutdownNow()
logInfo("[flink-k8s] FlinkJobStatusWatcher closed.")
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
index 7151f671c..080bb813b 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
@@ -27,7 +27,7 @@ import
org.apache.flink.kubernetes.kubeclient.resources.{CompatibleKubernetesWat
import javax.annotation.concurrent.ThreadSafe
-import scala.util.{Failure, Success, Try}
+import scala.util.{Failure, Try}
/**
* K8s Event Watcher for Flink Native-K8s Mode. Currently only
flink-native-application mode events
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index c8431a7dd..af42e1dd3 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -19,22 +19,20 @@ package org.apache.streampark.flink.kubernetes.watcher
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.{ChangeEventBus,
FlinkK8sWatchController, KubernetesRetriever, MetricWatcherConfig}
-import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
import
org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent
import org.apache.streampark.flink.kubernetes.model.{ClusterKey,
FlinkMetricCV, TrackId}
import org.apache.flink.configuration.{JobManagerOptions, MemorySize,
TaskManagerOptions}
import org.apache.hc.client5.http.fluent.Request
-import org.apache.hc.core5.util.Timeout
import org.json4s.{DefaultFormats, JArray}
import org.json4s.jackson.JsonMethods.parse
import javax.annotation.concurrent.ThreadSafe
import java.nio.charset.StandardCharsets
-import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
-import scala.concurrent.{Await, ExecutionContext,
ExecutionContextExecutorService, Future}
+import scala.concurrent.{Await, Future}
import scala.concurrent.duration.DurationLong
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
@@ -46,30 +44,31 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig =
MetricWatcherConfig.default
extends Logger
with FlinkWatcher {
- private val trackTaskExecPool = Executors.newWorkStealingPool()
- implicit private val trackTaskExecutor: ExecutionContextExecutorService =
- ExecutionContext.fromExecutorService(trackTaskExecPool)
-
- private val timerExec = Executors.newSingleThreadScheduledExecutor()
private var timerSchedule: ScheduledFuture[_] = _
/** start watcher process */
override def doStart(): Unit = {
- timerSchedule =
- timerExec.scheduleAtFixedRate(() => doWatch(), 0,
conf.requestIntervalSec, TimeUnit.SECONDS)
+ timerSchedule = watchExecutor.scheduleAtFixedRate(
+ () => doWatch(),
+ 0,
+ conf.requestIntervalSec,
+ TimeUnit.SECONDS)
logInfo("[flink-k8s] FlinkMetricWatcher started.")
}
/** stop watcher process */
override def doStop(): Unit = {
- timerSchedule.cancel(true)
+ if (!timerSchedule.isCancelled) {
+ timerSchedule.cancel(true)
+ }
logInfo("[flink-k8s] FlinkMetricWatcher stopped.")
}
/** closes resource, relinquishing any underlying resources. */
override def doClose(): Unit = {
- timerExec.shutdownNow()
- trackTaskExecutor.shutdownNow()
+ if (!timerSchedule.isCancelled) {
+ timerSchedule.cancel(true)
+ }
logInfo("[flink-k8s] FlinkMetricWatcher closed.")
}
@@ -82,27 +81,25 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig =
MetricWatcherConfig.default
)
// retrieve flink metrics in thread pool
val futures: Set[Future[Option[FlinkMetricCV]]] =
- trackIds
- .filter(_.executeMode == FlinkK8sExecuteMode.SESSION)
- .map(
- id => {
- val future = Future(collectMetrics(id))
- future.onComplete(_.getOrElse(None) match {
- case Some(metric) =>
- val clusterKey = id.toClusterKey
- // update current flink cluster metrics on cache
- watchController.flinkMetrics.put(clusterKey, metric)
- val isMetricChanged = {
- val preMetric = watchController.flinkMetrics.get(clusterKey)
- preMetric == null || !preMetric.equalsPayload(metric)
- }
- if (isMetricChanged) {
- eventBus.postAsync(FlinkClusterMetricChangeEvent(id, metric))
- }
- case _ =>
- })
- future
+ trackIds.map(
+ id => {
+ val future = Future(collectMetrics(id))
+ future.onComplete(_.getOrElse(None) match {
+ case Some(metric) =>
+ val clusterKey = id.toClusterKey
+ // update current flink cluster metrics on cache
+ watchController.flinkMetrics.put(clusterKey, metric)
+ val isMetricChanged = {
+ val preMetric = watchController.flinkMetrics.get(clusterKey)
+ preMetric == null || !preMetric.equalsPayload(metric)
+ }
+ if (isMetricChanged) {
+ eventBus.postAsync(FlinkClusterMetricChangeEvent(id, metric))
+ }
+ case _ =>
})
+ future
+ })
// blocking until all future are completed or timeout is reached
Try(Await.ready(Future.sequence(futures), conf.requestTimeoutSec
seconds)).failed.map {
_ =>
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
index 272227dd7..fbedc5287 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.kubernetes.watcher
+import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.atomic.AtomicBoolean
import scala.language.implicitConversions
@@ -25,6 +26,10 @@ trait FlinkWatcher extends AutoCloseable {
private[this] val started: AtomicBoolean = new AtomicBoolean(false)
+ private val CPU_NUM = Math.max(4, Runtime.getRuntime.availableProcessors * 2)
+
+ val watchExecutor = new ScheduledThreadPoolExecutor(CPU_NUM)
+
/**
* Start watcher process. This method should be a thread-safe implementation
of light locking and
* can be called idempotent.
@@ -50,6 +55,7 @@ trait FlinkWatcher extends AutoCloseable {
this.doStop()
}
doClose()
+ watchExecutor.shutdownNow()
}
/**