zentol commented on a change in pull request #17007:
URL: https://github.com/apache/flink/pull/17007#discussion_r697438721



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
##########
@@ -116,6 +117,34 @@ public ReporterSetup(
         }
     }
 
+    public Map<String, String> tryGetAdditionalVariables() {
+        String additionalVariablesMap =
+                configuration.getString(
+                        ConfigConstants.METRICS_REPORTER_ADDITIONAL_VARIABLES, null);
+        if (additionalVariablesMap == null) {
+            return Collections.emptyMap();
+        } else {
+            try {
+                final Map<String, String> additionalVariables = new HashMap<>();
+                for (String variableEntry : additionalVariablesMap.split(",")) {
+                    String[] tagEntry = variableEntry.split(":");

Review comment:
       Instead of this custom parsing, add & use this:
   
   ```
       private static final ConfigOption<Map<String, String>> ADDITIONAL_VARIABLES =
               ConfigOptions.key(ConfigConstants.METRICS_REPORTER_ADDITIONAL_VARIABLES)
                       .mapType()
                       .defaultValue(Collections.emptyMap());
   ```
   Add it to the ReporterSetup for the time being.
   
   We will still need to replace all keys however, as unfortunate as it may be.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
##########
@@ -72,8 +72,15 @@ public MetricGroup getWrappedMetricGroup() {
 
     @Override
     public Map<String, String> getAllVariables() {
-        return parentMetricGroup.getAllVariables(
-                this.settings.getReporterIndex(), this.settings.getExcludedVariables());
+        Map<String, String> allVariables =
+                parentMetricGroup.getAllVariables(
+                        this.settings.getReporterIndex(), this.settings.getExcludedVariables());
+
+        for (Map.Entry<String, String> entry : this.settings.getAdditionalVariables().entrySet()) {
+            allVariables.putIfAbsent(entry.getKey(), entry.getValue());

Review comment:
       This mutates the variables map in the `parentMetricGroup`, which mustn't happen: it is rather subtle behavior that could easily cause bugs down the line. (`MetricGroup.getAllVariables` shouldn't return a mutable map in the first place, but that's another story.)
   
   Ideally we just wrap both maps without copying them, like a `CompositeMap`. The map of additional variables should come last. Naturally, we can skip this step if it is empty.
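
A minimal sketch of what such a wrapper could look like, using only the JDK. `CompositeMapView` and its `of` factory are hypothetical names for illustration, not existing Flink classes. The sketch assumes the parent group's variables take precedence over the additional ones, which matches the `putIfAbsent` semantics in the diff.

```java
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

/**
 * Hypothetical read-only view over two maps. Neither map is copied or modified;
 * on a key conflict the entry from 'primary' wins, so 'fallback' effectively comes last.
 */
final class CompositeMapView<K, V> extends AbstractMap<K, V> {
    private final Map<K, V> primary;
    private final Map<K, V> fallback;

    private CompositeMapView(Map<K, V> primary, Map<K, V> fallback) {
        this.primary = primary;
        this.fallback = fallback;
    }

    /** Skips the wrapper entirely when there is nothing to add. */
    static <K, V> Map<K, V> of(Map<K, V> primary, Map<K, V> fallback) {
        return fallback.isEmpty() ? primary : new CompositeMapView<>(primary, fallback);
    }

    @Override
    public V get(Object key) {
        return primary.containsKey(key) ? primary.get(key) : fallback.get(key);
    }

    @Override
    public boolean containsKey(Object key) {
        return primary.containsKey(key) || fallback.containsKey(key);
    }

    @Override
    public Set<Map.Entry<K, V>> entrySet() {
        return new AbstractSet<Map.Entry<K, V>>() {
            @Override
            public Iterator<Map.Entry<K, V>> iterator() {
                // All primary entries, then fallback entries whose key is not shadowed.
                // Entries are re-wrapped as immutable so setValue cannot reach the backing maps.
                return Stream.concat(
                                primary.entrySet().stream(),
                                fallback.entrySet().stream()
                                        .filter(e -> !primary.containsKey(e.getKey())))
                        .<Map.Entry<K, V>>map(
                                e -> new AbstractMap.SimpleImmutableEntry<K, V>(
                                        e.getKey(), e.getValue()))
                        .iterator();
            }

            @Override
            public int size() {
                int size = primary.size();
                for (K key : fallback.keySet()) {
                    if (!primary.containsKey(key)) {
                        size++;
                    }
                }
                return size;
            }
        };
    }
}
```

With something like this, `getAllVariables()` could return `CompositeMapView.of(parentVariables, additionalVariables)` instead of calling `putIfAbsent` on the parent's map. Because `AbstractMap.put` throws `UnsupportedOperationException`, attempts to add new keys through the view fail loudly rather than silently changing the parent group's state.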
   

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
##########
@@ -116,6 +117,34 @@ public ReporterSetup(
         }
     }
 
+    public Map<String, String> tryGetAdditionalVariables() {

Review comment:
       ```suggestion
       public Map<String, String> getAdditionalVariables() {
   ```



