This is an automated email from the ASF dual-hosted git repository.
cancai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 13ac2c660 [Bug] Fix the issue where metrics do not update upon
restart. (#4148)
13ac2c660 is described below
commit 13ac2c6603fdeb2fe774cfd28d52e8c31256a676
Author: zhangyuhang <[email protected]>
AuthorDate: Thu Dec 19 11:47:24 2024 +0800
[Bug] Fix the issue where metrics do not update upon restart. (#4148)
Co-authored-by: yuhang2.zhang <[email protected]>
---
.../streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index 3311d67f0..2802d6d00 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -89,14 +89,14 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig =
MetricWatcherConfig.default
future.onComplete(_.getOrElse(None) match {
case Some(metric) =>
val clusterKey = id.toClusterKey
- // update current flink cluster metrics on cache
- watchController.flinkMetrics.put(clusterKey, metric)
val isMetricChanged = {
val preMetric = watchController.flinkMetrics.get(clusterKey)
preMetric == null || !preMetric.equalsPayload(metric)
}
if (isMetricChanged) {
eventBus.postAsync(FlinkClusterMetricChangeEvent(id, metric))
+ // update current flink cluster metrics on cache
+ watchController.flinkMetrics.put(clusterKey, metric)
}
case _ =>
})