This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
new 729300e69 [Improve] flink metrics watch minor improve
729300e69 is described below
commit 729300e690bf3b4ea7a6069136fdfeca65be11ae
Author: benjobs <[email protected]>
AuthorDate: Fri Apr 5 14:33:33 2024 +0800
[Improve] flink metrics watch minor improve
---
.../streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index af42e1dd3..7201dec13 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -32,7 +32,7 @@ import javax.annotation.concurrent.ThreadSafe
import java.nio.charset.StandardCharsets
import java.util.concurrent.{ScheduledFuture, TimeUnit}
-import scala.concurrent.{Await, Future}
+import scala.concurrent.{Await, ExecutionContext,
ExecutionContextExecutorService, Future}
import scala.concurrent.duration.DurationLong
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
@@ -44,6 +44,9 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig =
MetricWatcherConfig.default
extends Logger
with FlinkWatcher {
+ implicit private val trackTaskExecutor: ExecutionContextExecutorService =
+ ExecutionContext.fromExecutorService(watchExecutor)
+
private var timerSchedule: ScheduledFuture[_] = _
/** start watcher process */