This is an automated email from the ASF dual-hosted git repository.

cancai pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.6 by this push:
     new e3c6c46ad [Bug] Fix the issue where metrics do not update upon restart. (#4149)
e3c6c46ad is described below

commit e3c6c46adf58c461430130ae51f7560ba99d4dcd
Author: zhangyuhang <[email protected]>
AuthorDate: Thu Dec 19 18:50:41 2024 +0800

    [Bug] Fix the issue where metrics do not update upon restart. (#4149)
    
    Co-authored-by: yuhang2.zhang <[email protected]>
---
 .../streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index 569c5a2fc..269ca6bc3 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -90,14 +90,14 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
           future.onComplete(_.getOrElse(None) match {
             case Some(metric) =>
               val clusterKey = id.toClusterKey
-              // update current flink cluster metrics on cache
-              watchController.flinkMetrics.put(clusterKey, metric)
               val isMetricChanged = {
                 val preMetric = watchController.flinkMetrics.get(clusterKey)
                 preMetric == null || !preMetric.equalsPayload(metric)
               }
               if (isMetricChanged) {
                 eventBus.postAsync(FlinkClusterMetricChangeEvent(id, metric))
+                // update current flink cluster metrics on cache
+                watchController.flinkMetrics.put(clusterKey, metric)
               }
             case _ =>
           })

Reply via email to