1u0 commented on a change in pull request #9870: [FLINK-14350][metrics] 
Introduce dedicated MetricScope
URL: https://github.com/apache/flink/pull/9870#discussion_r335759928
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
 ##########
 @@ -30,28 +34,54 @@
  */
 public class FrontMetricGroup<P extends AbstractMetricGroup<?>> extends 
ProxyMetricGroup<P> {
 
-       protected int reporterIndex;
+       private final ReporterIndexInjectingMetricScope scope;
 
        public FrontMetricGroup(int reporterIndex, P reference) {
                super(reference);
-               this.reporterIndex = reporterIndex;
+               this.scope = new 
ReporterIndexInjectingMetricScope(reporterIndex, 
this.parentMetricGroup.getScope());
        }
 
        @Override
        public String getMetricIdentifier(String metricName) {
-               return parentMetricGroup.getMetricIdentifier(metricName, null, 
this.reporterIndex);
+               return scope.getMetricIdentifier(metricName);
        }
 
        @Override
        public String getMetricIdentifier(String metricName, CharacterFilter 
filter) {
-               return parentMetricGroup.getMetricIdentifier(metricName, 
filter, this.reporterIndex);
+               return scope.getMetricIdentifier(metricName, filter);
        }
 
        public String getLogicalScope(CharacterFilter filter) {
                return parentMetricGroup.getLogicalScope(filter);
        }
 
        public String getLogicalScope(CharacterFilter filter, char delimiter) {
-               return parentMetricGroup.getLogicalScope(filter, delimiter, 
this.reporterIndex);
+               return parentMetricGroup.getLogicalScope(filter, delimiter, 
scope.reporterIndex);
+       }
+
+       private static final class ReporterIndexInjectingMetricScope implements 
MetricScope {
+
+               private final int reporterIndex;
+               private final InternalMetricScope scope;
+
+               private ReporterIndexInjectingMetricScope(int reporterIndex, 
InternalMetricScope scope) {
+                       this.reporterIndex = reporterIndex;
+                       this.scope = scope;
+               }
+
+               @Override
+               public Map<String, String> getAllVariables() {
+                       return scope.getAllVariables();
+               }
+
+               @Override
+               public String getMetricIdentifier(String metricName) {
+                       return scope.getMetricIdentifier(metricName, s -> s, 
reporterIndex);
+               }
+
+               @Override
+               public String getMetricIdentifier(String metricName, 
CharacterFilter filter) {
+                       return scope.getMetricIdentifier(metricName, filter, 
reporterIndex);
+               }
 
 Review comment:
   **Side note:** this is potentially error-prone: `filter` may be a 
non-trivial `CharacterFilter` (i.e. one for which results differ between 
filters), and both methods may be called for the same metric reporter.
   In such a situation the cached value would be the one produced by whichever method was called first.
   
   That said, this error-proneness already existed in the old code.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to