This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 9cbf656  [FLINK-23539][metrics][influxdb] Filter '\n' in tags
9cbf656 is described below

commit 9cbf65692aa388995cfded8429f76536d12445ec
Author: yulei0824 <yulei0...@163.com>
AuthorDate: Wed Aug 4 15:01:23 2021 +0800

    [FLINK-23539][metrics][influxdb] Filter '\n' in tags
---
 .../flink/metrics/influxdb/MeasurementInfoProvider.java      |  9 ++++++++-
 .../flink/metrics/influxdb/MeasurementInfoProviderTest.java  | 12 ++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/MeasurementInfoProvider.java b/flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/MeasurementInfoProvider.java
index d4dab1b..f5968cf 100644
--- a/flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/MeasurementInfoProvider.java
+++ b/flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/MeasurementInfoProvider.java
@@ -30,6 +30,7 @@ import java.util.regex.Pattern;
 
 class MeasurementInfoProvider implements MetricInfoProvider<MeasurementInfo> {
     @VisibleForTesting static final char SCOPE_SEPARATOR = '_';
+    private static final String POINT_DELIMITER = "\n";
 
     private static final CharacterFilter CHARACTER_FILTER =
             new CharacterFilter() {
@@ -53,7 +54,9 @@ class MeasurementInfoProvider implements MetricInfoProvider<MeasurementInfo> {
         Map<String, String> tags = new HashMap<>();
         for (Map.Entry<String, String> variable : group.getAllVariables().entrySet()) {
             String name = variable.getKey();
-            tags.put(name.substring(1, name.length() - 1), variable.getValue());
+            tags.put(
+                    normalize(name.substring(1, name.length() - 1)),
+                    normalize(variable.getValue()));
         }
         return tags;
     }
@@ -66,4 +69,8 @@ class MeasurementInfoProvider implements MetricInfoProvider<MeasurementInfo> {
         return ((FrontMetricGroup<AbstractMetricGroup<?>>) group)
                 .getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
     }
+
+    private static String normalize(String value) {
+        return value.replace(POINT_DELIMITER, "");
+    }
 }
diff --git a/flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/MeasurementInfoProviderTest.java b/flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/MeasurementInfoProviderTest.java
index 0001157..669033e 100644
--- a/flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/MeasurementInfoProviderTest.java
+++ b/flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/MeasurementInfoProviderTest.java
@@ -66,4 +66,16 @@ public class MeasurementInfoProviderTest extends TestLogger {
         assertThat(info.getTags(), hasEntry("C", "c"));
         assertEquals(3, info.getTags().size());
     }
+
+    @Test
+    public void testNormalizingTags() {
+        Map<String, String> variables = new HashMap<>();
+        variables.put("<A\n>", "a\n");
+
+        final MetricGroup metricGroup =
+                TestMetricGroup.newBuilder().setVariables(variables).build();
+
+        MeasurementInfo info = provider.getMetricInfo("m1", metricGroup);
+        assertThat(info.getTags(), hasEntry("A", "a"));
+    }
 }

Reply via email to