This is an automated email from the ASF dual-hosted git repository.
zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new b8bf7253 [AURON #1854] Introduce FlinkAuronConfiguration (#2055)
b8bf7253 is described below
commit b8bf7253d950e6ce0a514423f6243ddfd19dae5a
Author: zhangmang <[email protected]>
AuthorDate: Mon Mar 2 20:31:26 2026 +0800
[AURON #1854] Introduce FlinkAuronConfiguration (#2055)
<!--
- Start the PR title with the related issue ID, e.g. '[AURON #XXXX]
Short summary...'.
-->
# Which issue does this PR close?
Closes #1854
# Rationale for this change
Introduce FlinkAuronConfiguration to unify access operations to
FlinkConfiguration within Auron.
# What changes are included in this PR?
* add FlinkAuronConfiguration
* add FlinkAuronConfigurationTest
* add CommonTestUtils
# Are there any user-facing changes?
* NO
# How was this patch tested?
* Tested via UT FlinkAuronConfigurationTest#testGetConfigFromFlinkConfig
---
.../configuration/FlinkAuronConfiguration.java | 92 ++++++++++++++++++++++
.../configuration/FlinkAuronConfigurationTest.java | 50 ++++++++++++
.../auron/flink/testutils/CommonTestUtils.java | 68 ++++++++++++++++
.../src/test/resources/flink-conf.yaml | 17 ++++
4 files changed, 227 insertions(+)
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java
new file mode 100644
index 00000000..26a81d3b
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.configuration;
+
+import java.io.File;
+import java.util.List;
+import java.util.Optional;
+import org.apache.auron.configuration.AuronConfiguration;
+import org.apache.auron.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+
+/**
+ * Flink configuration proxy for Auron.
+ *
+ * <p>Values are read from the Flink {@link Configuration} loaded when the instance is
+ * constructed. Every option key and alternative key is looked up under the {@code flink.}
+ * prefix.
+ */
+public class FlinkAuronConfiguration extends AuronConfiguration {
+
+    // getOptional completes this prefix automatically. If you only need to print an option key,
+    // add the prefix manually.
+    public static final String FLINK_PREFIX = "flink.";
+    private final Configuration flinkConfig;
+
+    /**
+     * Loads the Flink configuration.
+     *
+     * <p>If the directory named by the {@code PWD} environment variable contains the Flink
+     * configuration file, the configuration is loaded from that directory; otherwise
+     * {@link GlobalConfiguration#loadConfiguration()} is used.
+     */
+    public FlinkAuronConfiguration() {
+        String pwd = System.getenv("PWD");
+        // Resolve the file as a child of PWD. Plain string concatenation drops the path
+        // separator ("/work/dir" + "flink-conf.yaml"), so the check would never match, and an
+        // unset PWD would be turned into the literal text "null".
+        if (pwd != null && new File(pwd, GlobalConfiguration.FLINK_CONF_FILENAME).exists()) {
+            // flink on yarn
+            flinkConfig = GlobalConfiguration.loadConfiguration(pwd);
+        } else {
+            // flink on k8s
+            flinkConfig = GlobalConfiguration.loadConfiguration();
+        }
+    }
+
+    /**
+     * Looks up the given Auron option in the Flink configuration.
+     *
+     * @param configOption the Auron option to resolve
+     * @return the configured value, or an empty {@link Optional} when the lookup yields
+     *     {@code null}
+     * @throws IllegalArgumentException if the option's value class is not one of String,
+     *     Integer, Long, Boolean, Float or Double
+     */
+    @Override
+    public <T> Optional<T> getOptional(ConfigOption<T> configOption) {
+        return Optional.ofNullable(
+                getFromFlinkConfig(configOption.key(), configOption.altKeys(), configOption.getValueClass()));
+    }
+
+    /**
+     * Builds an equivalent Flink option (prefixed key, matching type, no default value, alternative
+     * keys registered as deprecated keys) and reads it from the loaded Flink configuration.
+     */
+    @SuppressWarnings("unchecked")
+    private <T> T getFromFlinkConfig(String key, List<String> altKeys, Class<T> valueClass) {
+        ConfigOptions.OptionBuilder flinkOptionBuilder = ConfigOptions.key(withFlinkPrefix(key));
+        org.apache.flink.configuration.ConfigOption<T> flinkOption;
+        // Each cast is safe because the branch condition pins T to the built option's type.
+        if (valueClass == String.class) {
+            flinkOption = (org.apache.flink.configuration.ConfigOption<T>)
+                    flinkOptionBuilder.stringType().noDefaultValue();
+        } else if (valueClass == Integer.class) {
+            flinkOption = (org.apache.flink.configuration.ConfigOption<T>)
+                    flinkOptionBuilder.intType().noDefaultValue();
+        } else if (valueClass == Long.class) {
+            flinkOption = (org.apache.flink.configuration.ConfigOption<T>)
+                    flinkOptionBuilder.longType().noDefaultValue();
+        } else if (valueClass == Boolean.class) {
+            flinkOption = (org.apache.flink.configuration.ConfigOption<T>)
+                    flinkOptionBuilder.booleanType().noDefaultValue();
+        } else if (valueClass == Float.class) {
+            flinkOption = (org.apache.flink.configuration.ConfigOption<T>)
+                    flinkOptionBuilder.floatType().noDefaultValue();
+        } else if (valueClass == Double.class) {
+            flinkOption = (org.apache.flink.configuration.ConfigOption<T>)
+                    flinkOptionBuilder.doubleType().noDefaultValue();
+        } else {
+            throw new IllegalArgumentException("Unsupported value class: " + valueClass);
+        }
+        if (!altKeys.isEmpty()) {
+            String[] altKeysArray = new String[altKeys.size()];
+            for (int i = 0; i < altKeys.size(); i++) {
+                altKeysArray[i] = withFlinkPrefix(altKeys.get(i));
+            }
+            flinkOption = flinkOption.withDeprecatedKeys(altKeysArray);
+        }
+        return flinkConfig.get(flinkOption);
+    }
+
+    /** Returns {@code key} unchanged if it already starts with the Flink prefix, else prefixed. */
+    private static String withFlinkPrefix(String key) {
+        return key.startsWith(FLINK_PREFIX) ? key : FLINK_PREFIX + key;
+    }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/configuration/FlinkAuronConfigurationTest.java
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/configuration/FlinkAuronConfigurationTest.java
new file mode 100644
index 00000000..fe98b927
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/configuration/FlinkAuronConfigurationTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.configuration;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.io.File;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.auron.configuration.AuronConfiguration;
+import org.apache.auron.flink.testutils.CommonTestUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link FlinkAuronConfiguration}.
+ */
+public class FlinkAuronConfigurationTest {
+
+    /**
+     * Points the Flink configuration directory at the test resources and checks that values
+     * present in the file are returned, while an absent key yields the option's default.
+     */
+    @Test
+    public void testGetConfigFromFlinkConfig() {
+        URL flinkConfigFileUrl =
+                FlinkAuronConfigurationTest.class.getClassLoader().getResource(GlobalConfiguration.FLINK_CONF_FILENAME);
+        // Fail with a clear message instead of a NullPointerException if the resource is missing.
+        assertNotNull(flinkConfigFileUrl, GlobalConfiguration.FLINK_CONF_FILENAME + " must be on the test classpath");
+
+        // Snapshot the environment so it can be restored; setEnv replaces it for the whole JVM.
+        Map<String, String> originalEnv = new HashMap<>(System.getenv());
+        Map<String, String> env = new HashMap<>(originalEnv);
+        env.put(
+                ConfigConstants.ENV_FLINK_CONF_DIR,
+                new File(flinkConfigFileUrl.getFile()).getParentFile().getAbsolutePath());
+        try {
+            CommonTestUtils.setEnv(env);
+            FlinkAuronConfiguration config = new FlinkAuronConfiguration();
+            // JUnit's assertEquals takes (expected, actual).
+            assertEquals(9999, config.get(AuronConfiguration.BATCH_SIZE));
+            assertEquals("DEBUG", config.get(AuronConfiguration.NATIVE_LOG_LEVEL));
+            assertEquals(0.6, config.get(AuronConfiguration.MEMORY_FRACTION)); // default value
+        } finally {
+            CommonTestUtils.setEnv(originalEnv);
+        }
+    }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/testutils/CommonTestUtils.java
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/testutils/CommonTestUtils.java
new file mode 100644
index 00000000..abe1c158
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/testutils/CommonTestUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.testutils;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
+/**
+ * Helpers shared by tests.
+ */
+public class CommonTestUtils {
+
+    // ------------------------------------------------------------------------
+    // Manipulation of environment
+    // ------------------------------------------------------------------------
+
+    /**
+     * Replaces the environment variables of this JVM with {@code newenv}, discarding the
+     * existing entries.
+     */
+    public static void setEnv(Map<String, String> newenv) {
+        setEnv(newenv, true);
+    }
+
+    /**
+     * Changes the environment variables of this JVM through reflection. Use only for testing
+     * purposes! Taken, slightly modified, from http://stackoverflow.com/a/7201825/568695
+     *
+     * @param newenv entries to put into the environment
+     * @param clearExisting whether the current entries are removed first
+     * @throws RuntimeException wrapping any exception raised while accessing the internals
+     */
+    @SuppressWarnings("unchecked")
+    public static void setEnv(Map<String, String> newenv, boolean clearExisting) {
+        try {
+            // The map returned by System.getenv() keeps its backing map in a field named "m".
+            Map<String, String> visibleEnv = System.getenv();
+            Field backingField = visibleEnv.getClass().getDeclaredField("m");
+            backingField.setAccessible(true);
+            overwrite((Map<String, String>) backingField.get(visibleEnv), newenv, clearExisting);
+
+            // only for Windows
+            Class<?> processEnvironment = Class.forName("java.lang.ProcessEnvironment");
+            Field caseInsensitiveField;
+            try {
+                caseInsensitiveField = processEnvironment.getDeclaredField("theCaseInsensitiveEnvironment");
+            } catch (NoSuchFieldException ignored) {
+                // The field does not exist on this JVM, so there is nothing more to update.
+                return;
+            }
+            caseInsensitiveField.setAccessible(true);
+            overwrite((Map<String, String>) caseInsensitiveField.get(null), newenv, clearExisting);
+        } catch (Exception e1) {
+            throw new RuntimeException(e1);
+        }
+    }
+
+    /** Optionally empties {@code target}, then copies every entry of {@code source} into it. */
+    private static void overwrite(Map<String, String> target, Map<String, String> source, boolean clearFirst) {
+        if (clearFirst) {
+            target.clear();
+        }
+        target.putAll(source);
+    }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/test/resources/flink-conf.yaml
b/auron-flink-extension/auron-flink-runtime/src/test/resources/flink-conf.yaml
new file mode 100644
index 00000000..16c2ea07
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/test/resources/flink-conf.yaml
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+flink.auron.batchSize: 9999
+flink.auron.native.log.level: DEBUG