This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 9cbf656  [FLINK-23539][metrics][influxdb] Filter '\n' in tags
9cbf656 is described below

commit 9cbf65692aa388995cfded8429f76536d12445ec
Author: yulei0824 <yulei0...@163.com>
AuthorDate: Wed Aug 4 15:01:23 2021 +0800

    [FLINK-23539][metrics][influxdb] Filter '\n' in tags
---
 .../flink/metrics/influxdb/MeasurementInfoProvider.java      |  9 ++++++++-
 .../flink/metrics/influxdb/MeasurementInfoProviderTest.java  | 12 ++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/MeasurementInfoProvider.java b/flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/MeasurementInfoProvider.java
index d4dab1b..f5968cf 100644
--- a/flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/MeasurementInfoProvider.java
+++ b/flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/MeasurementInfoProvider.java
@@ -30,6 +30,7 @@ import java.util.regex.Pattern;
 
 class MeasurementInfoProvider implements MetricInfoProvider<MeasurementInfo> {
     @VisibleForTesting static final char SCOPE_SEPARATOR = '_';
+    private static final String POINT_DELIMITER = "\n";
 
     private static final CharacterFilter CHARACTER_FILTER =
             new CharacterFilter() {
@@ -53,7 +54,9 @@ class MeasurementInfoProvider implements MetricInfoProvider<MeasurementInfo> {
         Map<String, String> tags = new HashMap<>();
         for (Map.Entry<String, String> variable : group.getAllVariables().entrySet()) {
             String name = variable.getKey();
-            tags.put(name.substring(1, name.length() - 1), variable.getValue());
+            tags.put(
+                    normalize(name.substring(1, name.length() - 1)),
+                    normalize(variable.getValue()));
         }
         return tags;
     }
@@ -66,4 +69,8 @@ class MeasurementInfoProvider implements MetricInfoProvider<MeasurementInfo> {
         return ((FrontMetricGroup<AbstractMetricGroup<?>>) group)
                 .getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
     }
+
+    private static String normalize(String value) {
+        return value.replace(POINT_DELIMITER, "");
+    }
 }
diff --git a/flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/MeasurementInfoProviderTest.java b/flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/MeasurementInfoProviderTest.java
index 0001157..669033e 100644
--- a/flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/MeasurementInfoProviderTest.java
+++ b/flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/MeasurementInfoProviderTest.java
@@ -66,4 +66,16 @@ public class MeasurementInfoProviderTest extends TestLogger {
         assertThat(info.getTags(), hasEntry("C", "c"));
         assertEquals(3, info.getTags().size());
     }
+
+    @Test
+    public void testNormalizingTags() {
+        Map<String, String> variables = new HashMap<>();
+        variables.put("<A\n>", "a\n");
+
+        final MetricGroup metricGroup =
+                TestMetricGroup.newBuilder().setVariables(variables).build();
+
+        MeasurementInfo info = provider.getMetricInfo("m1", metricGroup);
+        assertThat(info.getTags(), hasEntry("A", "a"));
+    }
 }