zentol commented on a change in pull request #9870: [FLINK-14350][metrics]
Introduce dedicated MetricScope
URL: https://github.com/apache/flink/pull/9870#discussion_r335889976
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
##########
@@ -30,28 +34,54 @@
*/
public class FrontMetricGroup<P extends AbstractMetricGroup<?>> extends
ProxyMetricGroup<P> {
- protected int reporterIndex;
+ private final ReporterIndexInjectingMetricScope scope;
public FrontMetricGroup(int reporterIndex, P reference) {
super(reference);
- this.reporterIndex = reporterIndex;
+ this.scope = new
ReporterIndexInjectingMetricScope(reporterIndex,
this.parentMetricGroup.getScope());
}
@Override
public String getMetricIdentifier(String metricName) {
- return parentMetricGroup.getMetricIdentifier(metricName, null,
this.reporterIndex);
+ return scope.getMetricIdentifier(metricName);
}
@Override
public String getMetricIdentifier(String metricName, CharacterFilter
filter) {
- return parentMetricGroup.getMetricIdentifier(metricName,
filter, this.reporterIndex);
+ return scope.getMetricIdentifier(metricName, filter);
}
public String getLogicalScope(CharacterFilter filter) {
return parentMetricGroup.getLogicalScope(filter);
}
public String getLogicalScope(CharacterFilter filter, char delimiter) {
- return parentMetricGroup.getLogicalScope(filter, delimiter,
this.reporterIndex);
+ return parentMetricGroup.getLogicalScope(filter, delimiter,
scope.reporterIndex);
+ }
+
+ private static final class ReporterIndexInjectingMetricScope implements
MetricScope {
+
+ private final int reporterIndex;
+ private final InternalMetricScope scope;
+
+ private ReporterIndexInjectingMetricScope(int reporterIndex,
InternalMetricScope scope) {
+ this.reporterIndex = reporterIndex;
+ this.scope = scope;
+ }
+
+ @Override
+ public Map<String, String> getAllVariables() {
+ return scope.getAllVariables();
+ }
+
+ @Override
+ public String getMetricIdentifier(String metricName) {
+ return scope.getMetricIdentifier(metricName, s -> s,
reporterIndex);
+ }
+
+ @Override
+ public String getMetricIdentifier(String metricName,
CharacterFilter filter) {
+ return scope.getMetricIdentifier(metricName, filter,
reporterIndex);
+ }
Review comment:
FLINK-14410 aims to address this problem, at least for the `MetricScope`. We
can't get rid of it for the time being because we have to preserve backwards
compatibility (i.e., retain the existing methods in the `MetricGroup`
interface that accept a `CharacterFilter`).
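
To illustrate the compatibility constraint, here is a minimal, self-contained sketch of the pattern used in the diff: the filter-accepting overload is retained, and the no-filter overload delegates to it with an identity filter (`s -> s`). The names `ScopeCompatSketch`, `SimpleScope`, and the nested `CharacterFilter` are simplified stand-ins invented for this example. They are not Flink's actual classes, and the real scope formatting may differ.

```java
public class ScopeCompatSketch {

    /** Simplified stand-in for a character filter; NOT Flink's actual interface. */
    @FunctionalInterface
    interface CharacterFilter {
        String filterCharacters(String input);
    }

    /** Hypothetical scope that keeps the filter-accepting overload for compatibility. */
    static final class SimpleScope {
        private final char delimiter;
        private final String[] components;

        SimpleScope(char delimiter, String... components) {
            this.delimiter = delimiter;
            this.components = components.clone();
        }

        /** No-filter variant: delegates with an identity filter, as in the diff. */
        String getMetricIdentifier(String metricName) {
            return getMetricIdentifier(metricName, s -> s);
        }

        /** Retained overload: existing callers that pass a filter keep working. */
        String getMetricIdentifier(String metricName, CharacterFilter filter) {
            StringBuilder sb = new StringBuilder();
            for (String component : components) {
                // Apply the filter to every scope component before joining.
                sb.append(filter.filterCharacters(component)).append(delimiter);
            }
            return sb.append(filter.filterCharacters(metricName)).toString();
        }
    }

    public static void main(String[] args) {
        SimpleScope scope = new SimpleScope('.', "host-1", "taskmanager");
        System.out.println(scope.getMetricIdentifier("numRecordsIn"));
        System.out.println(
            scope.getMetricIdentifier("numRecordsIn", s -> s.replace('-', '_')));
    }
}
```

Once no caller depends on the filter-accepting overload, it could be dropped from a sketch like this without touching the no-filter variant's signature; whether that is feasible in Flink itself depends on the outcome of FLINK-14410.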