This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4280291072 [Bugfix][zeta] Fixed the issue of duplicated metrics caused by job fault tolerance or restore. (#5214)
4280291072 is described below

commit 4280291072289cd5dfc6ef87cc07433af273811b
Author: ic4y <[email protected]>
AuthorDate: Thu Aug 10 11:43:49 2023 +0800

    [Bugfix][zeta] Fixed the issue of duplicated metrics caused by job fault tolerance or restore. (#5214)
---
 .../seatunnel/api/common/metrics/JobMetrics.java   | 46 +++++++++++++++++++++-
 1 file changed, 45 insertions(+), 1 deletion(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
index 0149ad2649..d39e8b9664 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
@@ -26,9 +26,11 @@ import lombok.Getter;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -72,6 +74,10 @@ public final class JobMetrics implements Serializable {
         }
         Map<String, List<Measurement>> metricsMap = new HashMap<>();
         metrics.forEach((key, value) -> metricsMap.put(key, new 
ArrayList<>(value)));
+        // A restarted job may run on a different node, so the node-identifying tags
+        // (member, address) are left out when matching measurements.
+        Set<String> keysToExclude =
+                new HashSet<>(Arrays.asList(MetricTags.MEMBER, 
MetricTags.ADDRESS));
         jobMetrics.metrics.forEach(
                 (key, value) ->
                         metricsMap.merge(
@@ -82,7 +88,11 @@ public final class JobMetrics implements Serializable {
                                     for (Measurement m1 : v1) {
                                         if (v2.stream()
                                                 .noneMatch(
-                                                        m2 -> 
m2.getTags().equals(m1.getTags()))) {
+                                                        m2 ->
+                                                                
areMapsEqualExcludingKeys(
+                                                                        
m2.getTags(),
+                                                                        
m1.getTags(),
+                                                                        
keysToExclude))) {
                                             ms.add(m1);
                                         }
                                     }
@@ -91,6 +101,40 @@ public final class JobMetrics implements Serializable {
         return new JobMetrics(metricsMap);
     }
 
+    /**
+     * Compares two maps for equality while ignoring the entries stored under the given keys.
+     *
+     * @param map1 the first map; a null map makes the result false
+     * @param map2 the second map; a null map makes the result false
+     * @param keysToExclude the keys removed from copies of both maps before they are compared
+     * @return true if both maps are non-null, equal in size and equal once the keys are removed
+     */
+    public static boolean areMapsEqualExcludingKeys(
+            Map<String, String> map1, Map<String, String> map2, Set<String> 
keysToExclude) {
+        // A null map never matches, not even another null map
+        if (map1 == null || map2 == null) {
+            return false;
+        }
+
+        // NOTE(review): sizes are compared before exclusion, so maps of different raw size never match
+        if (map1.size() != map2.size()) {
+            return false;
+        }
+
+        // Work on copies so the maps passed in by the caller are left untouched
+        Map<String, String> map1Copy = new HashMap<>(map1);
+        Map<String, String> map2Copy = new HashMap<>(map2);
+
+        // Drop the excluded keys from both copies
+        for (String key : keysToExclude) {
+            map1Copy.remove(key);
+            map2Copy.remove(key);
+        }
+
+        // Equal only if the remaining entries match exactly
+        return map1Copy.equals(map2Copy);
+    }
+
     /** Returns all metrics present. */
     public Set<String> metrics() {
         return Collections.unmodifiableSet(metrics.keySet());

Reply via email to