This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9964ab4bd1b [FLINK-39130][metrics] Allow native types in MetricConfig
9964ab4bd1b is described below
commit 9964ab4bd1b8334dec9388e1e4dac68c94488691
Author: Aleksandr Iushmanov <[email protected]>
AuthorDate: Fri Feb 20 16:16:19 2026 +0000
[FLINK-39130][metrics] Allow native types in MetricConfig
---
.../org/apache/flink/metrics/MetricConfig.java | 105 +++++++++++++------
.../org/apache/flink/metrics/MetricConfigTest.java | 114 +++++++++++++++++++++
2 files changed, 188 insertions(+), 31 deletions(-)
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java
index 2ff0426ad5a..b9fcafb5f2d 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java
@@ -22,7 +22,14 @@ import org.apache.flink.annotation.Public;
import java.util.Properties;
-/** A properties class with added utility method to extract primitives. */
+/**
+ * A properties class with added utility methods to extract primitives.
+ *
+ * <p>Values may be stored as strings via {@link #setProperty(String, String)} or as native Java
+ * types via {@link #put(Object, Object)} (e.g., when Flink's YAML configuration parser stores
+ * Integer, Long, or Boolean values directly). The getter methods handle both representations
+ * transparently.
+ */
@Public
public class MetricConfig extends Properties {
@@ -31,72 +38,108 @@ public class MetricConfig extends Properties {
}
/**
- * Searches for the property with the specified key in this property list. If the key is not
- * found in this property list, the default property list, and its defaults, recursively, are
- * then checked. The method returns the default value argument if the property is not found.
+ * Returns the value associated with the given key as an {@code int}.
+ *
+ * <p>If the value is a {@link Number}, its {@code intValue()} is returned directly. Otherwise,
+ * the value's string representation is parsed via {@link Integer#parseInt(String)}.
*
* @param key the hashtable key.
* @param defaultValue a default value.
- * @return the value in this property list with the specified key value parsed as an int.
+ * @return the value in this property list with the specified key value as an int.
*/
public int getInteger(String key, int defaultValue) {
- String argument = getProperty(key, null);
- return argument == null ? defaultValue : Integer.parseInt(argument);
+ final Object value = get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).intValue();
+ }
+ return Integer.parseInt(value.toString());
}
/**
- * Searches for the property with the specified key in this property list. If the key is not
- * found in this property list, the default property list, and its defaults, recursively, are
- * then checked. The method returns the default value argument if the property is not found.
+ * Returns the value associated with the given key as a {@code long}.
+ *
+ * <p>If the value is a {@link Number}, its {@code longValue()} is returned directly. Otherwise,
+ * the value's string representation is parsed via {@link Long#parseLong(String)}.
*
* @param key the hashtable key.
* @param defaultValue a default value.
- * @return the value in this property list with the specified key value parsed as a long.
+ * @return the value in this property list with the specified key value as a long.
*/
public long getLong(String key, long defaultValue) {
- String argument = getProperty(key, null);
- return argument == null ? defaultValue : Long.parseLong(argument);
+ final Object value = get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).longValue();
+ }
+ return Long.parseLong(value.toString());
}
/**
- * Searches for the property with the specified key in this property list. If the key is not
- * found in this property list, the default property list, and its defaults, recursively, are
- * then checked. The method returns the default value argument if the property is not found.
+ * Returns the value associated with the given key as a {@code float}.
+ *
+ * <p>If the value is a {@link Number}, its {@code floatValue()} is returned directly.
+ * Otherwise, the value's string representation is parsed via {@link Float#parseFloat(String)}.
*
* @param key the hashtable key.
* @param defaultValue a default value.
- * @return the value in this property list with the specified key value parsed as a float.
+ * @return the value in this property list with the specified key value as a float.
*/
public float getFloat(String key, float defaultValue) {
- String argument = getProperty(key, null);
- return argument == null ? defaultValue : Float.parseFloat(argument);
+ final Object value = get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).floatValue();
+ }
+ return Float.parseFloat(value.toString());
}
/**
- * Searches for the property with the specified key in this property list. If the key is not
- * found in this property list, the default property list, and its defaults, recursively, are
- * then checked. The method returns the default value argument if the property is not found.
+ * Returns the value associated with the given key as a {@code double}.
+ *
+ * <p>If the value is a {@link Number}, its {@code doubleValue()} is returned directly.
+ * Otherwise, the value's string representation is parsed via {@link
+ * Double#parseDouble(String)}.
*
* @param key the hashtable key.
* @param defaultValue a default value.
- * @return the value in this property list with the specified key value parsed as a double.
+ * @return the value in this property list with the specified key value as a double.
*/
public double getDouble(String key, double defaultValue) {
- String argument = getProperty(key, null);
- return argument == null ? defaultValue : Double.parseDouble(argument);
+ final Object value = get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).doubleValue();
+ }
+ return Double.parseDouble(value.toString());
}
/**
- * Searches for the property with the specified key in this property list. If the key is not
- * found in this property list, the default property list, and its defaults, recursively, are
- * then checked. The method returns the default value argument if the property is not found.
+ * Returns the value associated with the given key as a {@code boolean}.
+ *
+ * <p>If the value is a {@link Boolean}, it is returned directly. Otherwise, the value's string
+ * representation is parsed via {@link Boolean#parseBoolean(String)}.
*
* @param key the hashtable key.
* @param defaultValue a default value.
- * @return the value in this property list with the specified key value parsed as a boolean.
+ * @return the value in this property list with the specified key value as a boolean.
*/
public boolean getBoolean(String key, boolean defaultValue) {
- String argument = getProperty(key, null);
- return argument == null ? defaultValue : Boolean.parseBoolean(argument);
+ final Object value = get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ if (value instanceof Boolean) {
+ return (Boolean) value;
+ }
+ return Boolean.parseBoolean(value.toString());
}
}
diff --git a/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MetricConfigTest.java b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MetricConfigTest.java
new file mode 100644
index 00000000000..d3be33d9663
--- /dev/null
+++ b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MetricConfigTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link MetricConfig}. */
+class MetricConfigTest {
+
+ @ParameterizedTest
+ @MethodSource("fromStringCases")
+ void testGetFromString(
+ final String storedValue, final TypedGetter getter, final Object expected) {
+ final MetricConfig config = new MetricConfig();
+ config.setProperty("key", storedValue);
+ assertThat(getter.get(config, "key")).isEqualTo(expected);
+ }
+
+ @ParameterizedTest
+ @MethodSource("nativeTypeCases")
+ void testGetFromNativeType(
+ final Object storedValue, final TypedGetter getter, final Object expected) {
+ final MetricConfig config = new MetricConfig();
+ config.put("key", storedValue);
+ assertThat(getter.get(config, "key")).isEqualTo(expected);
+ }
+
+ @ParameterizedTest
+ @MethodSource("crossTypeCases")
+ void testGetCrossType(
+ final Object storedValue, final TypedGetter getter, final Object expected) {
+ final MetricConfig config = new MetricConfig();
+ config.put("key", storedValue);
+ assertThat(getter.get(config, "key")).isEqualTo(expected);
+ }
+
+ @ParameterizedTest
+ @MethodSource("defaultValueCases")
+ void testGetDefault(final TypedGetter getter, final Object expected) {
+ final MetricConfig config = new MetricConfig();
+ assertThat(getter.get(config, "missing")).isEqualTo(expected);
+ }
+
+ @FunctionalInterface
+ private interface TypedGetter {
+ Object get(MetricConfig config, String key);
+ }
+
+ private static Stream<Arguments> fromStringCases() {
+ return Stream.of(
+ Arguments.of("42", (TypedGetter) (c, k) -> c.getInteger(k, 0), 42),
+ Arguments.of(
+ "123456789012345",
+ (TypedGetter) (c, k) -> c.getLong(k, 0L),
+ 123456789012345L),
+ Arguments.of("3.14", (TypedGetter) (c, k) -> c.getFloat(k, 0.0f), 3.14f),
+ Arguments.of(
+ "2.718281828", (TypedGetter) (c, k) -> c.getDouble(k, 0.0), 2.718281828),
+ Arguments.of("true", (TypedGetter) (c, k) -> c.getBoolean(k, false), true));
+ }
+
+ private static Stream<Arguments> nativeTypeCases() {
+ return Stream.of(
+ Arguments.of(42, (TypedGetter) (c, k) -> c.getInteger(k, 0), 42),
+ Arguments.of(
+ 123456789012345L,
+ (TypedGetter) (c, k) -> c.getLong(k, 0L),
+ 123456789012345L),
+ Arguments.of(3.14f, (TypedGetter) (c, k) -> c.getFloat(k, 0.0f), 3.14f),
+ Arguments.of(2.718281828, (TypedGetter) (c, k) -> c.getDouble(k, 0.0), 2.718281828),
+ Arguments.of(true, (TypedGetter) (c, k) -> c.getBoolean(k, false), true));
+ }
+
+ private static Stream<Arguments> crossTypeCases() {
+ return Stream.of(
+ Arguments.of(42, (TypedGetter) (c, k) -> c.getLong(k, 0L), 42L),
+ Arguments.of(100L, (TypedGetter) (c, k) -> c.getInteger(k, 0), 100),
+ Arguments.of(42, (TypedGetter) (c, k) -> c.getDouble(k, 0.0), 42.0),
+ Arguments.of(42, (TypedGetter) (c, k) -> c.getFloat(k, 0.0f), 42.0f));
+ }
+
+ private static Stream<Arguments> defaultValueCases() {
+ return Stream.of(
+ Arguments.of((TypedGetter) (c, k) -> c.getInteger(k, 99), 99),
+ Arguments.of((TypedGetter) (c, k) -> c.getLong(k, 99L), 99L),
+ Arguments.of((TypedGetter) (c, k) -> c.getFloat(k, 1.5f), 1.5f),
+ Arguments.of((TypedGetter) (c, k) -> c.getDouble(k, 1.5), 1.5),
+ Arguments.of((TypedGetter) (c, k) -> c.getBoolean(k, true), true),
+ Arguments.of((TypedGetter) (c, k) -> c.getBoolean(k, false), false),
+ Arguments.of((TypedGetter) (c, k) -> c.getString(k, "default"), "default"));
+ }
+}