This is an automated email from the ASF dual-hosted git repository.

zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new b8bf7253 [AURON #1854] Introduce FlinkAuronConfiguration (#2055)
b8bf7253 is described below

commit b8bf7253d950e6ce0a514423f6243ddfd19dae5a
Author: zhangmang <[email protected]>
AuthorDate: Mon Mar 2 20:31:26 2026 +0800

    [AURON #1854] Introduce FlinkAuronConfiguration (#2055)
    
    <!--
    - Start the PR title with the related issue ID, e.g. '[AURON #XXXX]
    Short summary...'.
    -->
    # Which issue does this PR close?
    
    Closes #1854
    
    # Rationale for this change
    
    Introduce FlinkAuronConfiguration to unify access operations to
    FlinkConfiguration within Auron.
    
    # What changes are included in this PR?
    
    * add FlinkAuronConfiguration
    * add FlinkAuronConfigurationTest
    * add CommonTestUtils
    
    
    # Are there any user-facing changes?
    * NO
    # How was this patch tested?
    * Tested via UT FlinkAuronConfigurationTest#testGetConfigFromFlinkConfig
---
 .../configuration/FlinkAuronConfiguration.java     | 92 ++++++++++++++++++++++
 .../configuration/FlinkAuronConfigurationTest.java | 50 ++++++++++++
 .../auron/flink/testutils/CommonTestUtils.java     | 68 ++++++++++++++++
 .../src/test/resources/flink-conf.yaml             | 17 ++++
 4 files changed, 227 insertions(+)

diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java
new file mode 100644
index 00000000..26a81d3b
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.configuration;
+
+import java.io.File;
+import java.util.List;
+import java.util.Optional;
+import org.apache.auron.configuration.AuronConfiguration;
+import org.apache.auron.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+
+/**
+ * Flink configuration proxy for Auron.
+ *
+ * <p>Every Auron option is looked up in the loaded Flink configuration under the {@code flink.}
+ * prefix. The prefix is added automatically to keys (and alternative keys) that do not already
+ * start with it.
+ */
+public class FlinkAuronConfiguration extends AuronConfiguration {
+
+    // When using getOptional, the prefix will be automatically completed. If you only need to
+    // print the Option key, please manually add the prefix.
+    public static final String FLINK_PREFIX = "flink.";
+    private final Configuration flinkConfig;
+
+    /**
+     * Loads the Flink configuration that backs this instance.
+     *
+     * <p>If the directory named by the {@code PWD} environment variable contains
+     * {@link GlobalConfiguration#FLINK_CONF_FILENAME}, the configuration is loaded from that
+     * directory; otherwise {@link GlobalConfiguration#loadConfiguration()} is used.
+     */
+    public FlinkAuronConfiguration() {
+        String pwd = System.getenv("PWD");
+        // Resolve the file as a child of the directory. Plain string concatenation would drop the
+        // path separator ("/work/dir" + "flink-conf.yaml") and the file would never be found.
+        // The null check keeps an unset PWD on the fallback branch.
+        if (pwd != null && new File(pwd, GlobalConfiguration.FLINK_CONF_FILENAME).exists()) {
+            // flink on yarn
+            flinkConfig = GlobalConfiguration.loadConfiguration(pwd);
+        } else {
+            // flink on k8s
+            flinkConfig = GlobalConfiguration.loadConfiguration();
+        }
+    }
+
+    /**
+     * Looks up the given Auron option in the Flink configuration.
+     *
+     * @param configOption the Auron option to resolve
+     * @return the configured value, or an empty {@link Optional} when the lookup yields {@code null}
+     * @throws IllegalArgumentException if the option's value class is not one of String, Integer,
+     *     Long, Boolean, Float or Double
+     */
+    @Override
+    public <T> Optional<T> getOptional(ConfigOption<T> configOption) {
+        return Optional.ofNullable(
+                getFromFlinkConfig(configOption.key(), configOption.altKeys(), configOption.getValueClass()));
+    }
+
+    /**
+     * Builds a Flink option without a default value for the prefixed key and reads it from the
+     * Flink configuration. The prefixed alternative keys are registered as deprecated keys.
+     *
+     * @param key the Auron option key, with or without the {@code flink.} prefix
+     * @param altKeys alternative keys of the option, each with or without the prefix
+     * @param valueClass the boxed type the value is read as
+     * @return whatever {@code flinkConfig.get} returns for the option; the caller treats
+     *     {@code null} as "not set"
+     */
+    @SuppressWarnings("unchecked")
+    private <T> T getFromFlinkConfig(String key, List<String> altKeys, Class<T> valueClass) {
+        ConfigOptions.OptionBuilder flinkOptionBuilder = ConfigOptions.key(withFlinkPrefix(key));
+        org.apache.flink.configuration.ConfigOption<T> flinkOption;
+        // The casts are safe: each branch builds an option whose value type equals valueClass.
+        if (valueClass == String.class) {
+            flinkOption = (org.apache.flink.configuration.ConfigOption<T>)
+                    flinkOptionBuilder.stringType().noDefaultValue();
+        } else if (valueClass == Integer.class) {
+            flinkOption = (org.apache.flink.configuration.ConfigOption<T>)
+                    flinkOptionBuilder.intType().noDefaultValue();
+        } else if (valueClass == Long.class) {
+            flinkOption = (org.apache.flink.configuration.ConfigOption<T>)
+                    flinkOptionBuilder.longType().noDefaultValue();
+        } else if (valueClass == Boolean.class) {
+            flinkOption = (org.apache.flink.configuration.ConfigOption<T>)
+                    flinkOptionBuilder.booleanType().noDefaultValue();
+        } else if (valueClass == Float.class) {
+            flinkOption = (org.apache.flink.configuration.ConfigOption<T>)
+                    flinkOptionBuilder.floatType().noDefaultValue();
+        } else if (valueClass == Double.class) {
+            flinkOption = (org.apache.flink.configuration.ConfigOption<T>)
+                    flinkOptionBuilder.doubleType().noDefaultValue();
+        } else {
+            throw new IllegalArgumentException("Unsupported value class: " + valueClass);
+        }
+        if (!altKeys.isEmpty()) {
+            String[] prefixedAltKeys =
+                    altKeys.stream().map(FlinkAuronConfiguration::withFlinkPrefix).toArray(String[]::new);
+            flinkOption = flinkOption.withDeprecatedKeys(prefixedAltKeys);
+        }
+        return flinkConfig.get(flinkOption);
+    }
+
+    /** Returns {@code key} unchanged if it already starts with the Flink prefix, else prepends it. */
+    private static String withFlinkPrefix(String key) {
+        return key.startsWith(FLINK_PREFIX) ? key : FLINK_PREFIX + key;
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/configuration/FlinkAuronConfigurationTest.java
 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/configuration/FlinkAuronConfigurationTest.java
new file mode 100644
index 00000000..fe98b927
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/configuration/FlinkAuronConfigurationTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.configuration;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.io.File;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.auron.configuration.AuronConfiguration;
+import org.apache.auron.flink.testutils.CommonTestUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit test for {@link FlinkAuronConfiguration}.
+ */
+public class FlinkAuronConfigurationTest {
+
+    /**
+     * Points the Flink configuration directory environment variable at the directory holding the
+     * test resource and checks that values are read from it, with the option default applied for
+     * a key the file does not set.
+     */
+    @Test
+    public void testGetConfigFromFlinkConfig() {
+        URL flinkConfigFileUrl =
+                FlinkAuronConfigurationTest.class.getClassLoader().getResource(GlobalConfiguration.FLINK_CONF_FILENAME);
+        // Fail with a readable message instead of a NullPointerException when the resource is missing.
+        assertNotNull(
+                flinkConfigFileUrl,
+                "test resource " + GlobalConfiguration.FLINK_CONF_FILENAME + " not found on the classpath");
+        Map<String, String> originalEnv = new HashMap<>(System.getenv());
+        Map<String, String> env = new HashMap<>(originalEnv);
+        env.put(
+                ConfigConstants.ENV_FLINK_CONF_DIR,
+                new File(flinkConfigFileUrl.getFile()).getParentFile().getAbsolutePath());
+        CommonTestUtils.setEnv(env);
+        try {
+            FlinkAuronConfiguration config = new FlinkAuronConfiguration();
+            // JUnit convention: expected value first, actual value second.
+            assertEquals(9999, config.get(AuronConfiguration.BATCH_SIZE));
+            assertEquals("DEBUG", config.get(AuronConfiguration.NATIVE_LOG_LEVEL));
+            assertEquals(0.6, config.get(AuronConfiguration.MEMORY_FRACTION)); // default value
+        } finally {
+            // Restore the environment so the override does not leak into other tests in this JVM.
+            CommonTestUtils.setEnv(originalEnv);
+        }
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/testutils/CommonTestUtils.java
 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/testutils/CommonTestUtils.java
new file mode 100644
index 00000000..abe1c158
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/testutils/CommonTestUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.testutils;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
+/**
+ * Utilities shared by tests.
+ */
+public class CommonTestUtils {
+
+    // ------------------------------------------------------------------------
+    //  Manipulation of environment
+    // ------------------------------------------------------------------------
+
+    /**
+     * Replaces the environment of this JVM with {@code newenv}, clearing all existing entries
+     * first. Equivalent to {@code setEnv(newenv, true)}.
+     *
+     * @param newenv the variables to install
+     */
+    public static void setEnv(Map<String, String> newenv) {
+        setEnv(newenv, true);
+    }
+
+    /**
+     * Writes {@code newenv} into the maps backing {@link System#getenv()} via reflection.
+     *
+     * <p>This code is taken slightly modified from: http://stackoverflow.com/a/7201825/568695.
+     * It changes the environment variables of this JVM. Use only for testing purposes!
+     *
+     * @param newenv the variables to install
+     * @param clearExisting whether existing entries are removed before {@code newenv} is added
+     * @throws RuntimeException wrapping any exception raised by the reflective access
+     */
+    @SuppressWarnings("unchecked")
+    public static void setEnv(Map<String, String> newenv, boolean clearExisting) {
+        try {
+            Map<String, String> jvmEnv = System.getenv();
+            Field backingField = jvmEnv.getClass().getDeclaredField("m");
+            backingField.setAccessible(true);
+            overwrite((Map<String, String>) backingField.get(jvmEnv), newenv, clearExisting);
+
+            // only for Windows
+            Class<?> processEnvironment = Class.forName("java.lang.ProcessEnvironment");
+            try {
+                Field caseInsensitiveField = processEnvironment.getDeclaredField("theCaseInsensitiveEnvironment");
+                caseInsensitiveField.setAccessible(true);
+                overwrite((Map<String, String>) caseInsensitiveField.get(null), newenv, clearExisting);
+            } catch (NoSuchFieldException ignored) {
+                // ignored
+            }
+
+        } catch (Exception cause) {
+            throw new RuntimeException(cause);
+        }
+    }
+
+    /** Optionally empties {@code target}, then copies every entry of {@code newenv} into it. */
+    private static void overwrite(Map<String, String> target, Map<String, String> newenv, boolean clearExisting) {
+        if (clearExisting) {
+            target.clear();
+        }
+        target.putAll(newenv);
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/test/resources/flink-conf.yaml 
b/auron-flink-extension/auron-flink-runtime/src/test/resources/flink-conf.yaml
new file mode 100644
index 00000000..16c2ea07
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/test/resources/flink-conf.yaml
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+flink.auron.batchSize: 9999
+flink.auron.native.log.level: DEBUG

Reply via email to