This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new 81f54d4b083 [FLINK-35750][runtime/metrics] Fix that the latency marker metrics aren't updated after failover.
81f54d4b083 is described below
commit 81f54d4b08301df7564deb8a1e87529cfe98762b
Author: Roc Marshal <[email protected]>
AuthorDate: Thu Jul 4 13:08:51 2024 +0800
[FLINK-35750][runtime/metrics] Fix that the latency marker metrics aren't updated after failover.
(cherry picked from commit 615d19735b0691b57262a110e6078c3488349f5a)
---
.../flink/runtime/metrics/groups/InternalOperatorMetricGroup.java | 4 ++--
.../flink/streaming/api/operators/AbstractStreamOperator.java | 6 +++---
.../flink/streaming/api/operators/AbstractStreamOperatorV2.java | 6 +++---
3 files changed, 8 insertions(+), 8 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java
index d075675b80c..22bfecf1deb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java
@@ -64,8 +64,8 @@ public class InternalOperatorMetricGroup extends
ComponentMetricGroup<TaskMetric
return parent.getIOMetricGroup();
}
- public final MetricGroup getJobMetricGroup() {
- return parent.parent;
+ public final MetricGroup getTaskMetricGroup() {
+ return parent;
}
@Override
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 78fb35af4e0..6fe598ac37c 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -193,10 +193,10 @@ public abstract class AbstractStreamOperator<OUT>
MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
granularity);
}
- MetricGroup jobMetricGroup = this.metrics.getJobMetricGroup();
+ MetricGroup taskMetricGroup = this.metrics.getTaskMetricGroup();
this.latencyStats =
new LatencyStats(
- jobMetricGroup.addGroup("latency"),
+ taskMetricGroup.addGroup("latency"),
historySize,
container.getIndexInSubtaskGroup(),
getOperatorID(),
@@ -205,7 +205,7 @@ public abstract class AbstractStreamOperator<OUT>
LOG.warn("An error occurred while instantiating latency metrics.",
e);
this.latencyStats =
new LatencyStats(
-
UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup()
+
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()
.addGroup("latency"),
1,
0,
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index a10206ba0e5..884ea7079cb 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -157,9 +157,9 @@ public abstract class AbstractStreamOperatorV2<OUT>
MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
granularity);
}
- MetricGroup jobMetricGroup = this.metrics.getJobMetricGroup();
+ MetricGroup taskMetricGroup = this.metrics.getTaskMetricGroup();
return new LatencyStats(
- jobMetricGroup.addGroup("latency"),
+ taskMetricGroup.addGroup("latency"),
historySize,
indexInSubtaskGroup,
getOperatorID(),
@@ -167,7 +167,7 @@ public abstract class AbstractStreamOperatorV2<OUT>
} catch (Exception e) {
LOG.warn("An error occurred while instantiating latency metrics.",
e);
return new LatencyStats(
-
UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup()
+
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()
.addGroup("latency"),
1,
0,