This is an automated email from the ASF dual-hosted git repository.

cancai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 13ac2c660 [Bug] Fix the issue where metrics do not update upon 
restart. (#4148)
13ac2c660 is described below

commit 13ac2c6603fdeb2fe774cfd28d52e8c31256a676
Author: zhangyuhang <[email protected]>
AuthorDate: Thu Dec 19 11:47:24 2024 +0800

    [Bug] Fix the issue where metrics do not update upon restart. (#4148)
    
    Co-authored-by: yuhang2.zhang <[email protected]>
---
 .../streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index 3311d67f0..2802d6d00 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -89,14 +89,14 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = 
MetricWatcherConfig.default
         future.onComplete(_.getOrElse(None) match {
           case Some(metric) =>
             val clusterKey = id.toClusterKey
-            // update current flink cluster metrics on cache
-            watchController.flinkMetrics.put(clusterKey, metric)
             val isMetricChanged = {
               val preMetric = watchController.flinkMetrics.get(clusterKey)
               preMetric == null || !preMetric.equalsPayload(metric)
             }
             if (isMetricChanged) {
               eventBus.postAsync(FlinkClusterMetricChangeEvent(id, metric))
+              // update current flink cluster metrics on cache
+              watchController.flinkMetrics.put(clusterKey, metric)
             }
           case _ =>
         })

Reply via email to