This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4280291072 [Bugfix][zeta] Fixed the issue of duplicated metrics caused
by job fault tolerance or restore. (#5214)
4280291072 is described below
commit 4280291072289cd5dfc6ef87cc07433af273811b
Author: ic4y <[email protected]>
AuthorDate: Thu Aug 10 11:43:49 2023 +0800
[Bugfix][zeta] Fixed the issue of duplicated metrics caused by job fault
tolerance or restore. (#5214)
---
.../seatunnel/api/common/metrics/JobMetrics.java | 46 +++++++++++++++++++++-
1 file changed, 45 insertions(+), 1 deletion(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
index 0149ad2649..d39e8b9664 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
@@ -26,9 +26,11 @@ import lombok.Getter;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -72,6 +74,10 @@ public final class JobMetrics implements Serializable {
}
Map<String, List<Measurement>> metricsMap = new HashMap<>();
metrics.forEach((key, value) -> metricsMap.put(key, new ArrayList<>(value)));
+ //// Because if a job is restarted, the running node might change, so we need to remove the
+ // node information.
+ Set<String> keysToExclude =
+ new HashSet<>(Arrays.asList(MetricTags.MEMBER, MetricTags.ADDRESS));
jobMetrics.metrics.forEach(
(key, value) ->
metricsMap.merge(
@@ -82,7 +88,11 @@ public final class JobMetrics implements Serializable {
for (Measurement m1 : v1) {
if (v2.stream()
.noneMatch(
- m2 -> m2.getTags().equals(m1.getTags()))) {
+ m2 ->
+ areMapsEqualExcludingKeys(
+ m2.getTags(),
+ m1.getTags(),
+ keysToExclude))) {
ms.add(m1);
}
}
@@ -91,6 +101,40 @@ public final class JobMetrics implements Serializable {
return new JobMetrics(metricsMap);
}
+ /**
+ * Compares two Map objects excluding certain keys.
+ *
+ * @param map1 the first map
+ * @param map2 the second map
+ * @param keysToExclude the keys to be excluded during comparison
+ * @return true if the maps are equal excluding the specific keys, false otherwise
+ */
+ public static boolean areMapsEqualExcludingKeys(
+ Map<String, String> map1, Map<String, String> map2, Set<String> keysToExclude) {
+ // Return false if either of the maps is null
+ if (map1 == null || map2 == null) {
+ return false;
+ }
+
+ // Return false if the sizes of the maps are different
+ if (map1.size() != map2.size()) {
+ return false;
+ }
+
+ // Create copies of the maps to avoid modifying the original maps
+ Map<String, String> map1Copy = new HashMap<>(map1);
+ Map<String, String> map2Copy = new HashMap<>(map2);
+
+ // Remove specific keys from the copies
+ for (String key : keysToExclude) {
+ map1Copy.remove(key);
+ map2Copy.remove(key);
+ }
+
+ // Return whether the copies are equal
+ return map1Copy.equals(map2Copy);
+ }
+
/** Returns all metrics present. */
public Set<String> metrics() {
return Collections.unmodifiableSet(metrics.keySet());