This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch dev-spark-auron-conf
in repository https://gitbox.apache.org/repos/asf/auron.git

commit 83ccab8a36b1206052fd046c2dc8356b65f0b977
Author: zhangli20 <[email protected]>
AuthorDate: Tue Jan 27 17:43:25 2026 +0800

    Refactor SparkAuronConfiguration and remove deprecated AuronConf classes
---
 .../auron/configuration/AuronConfiguration.java    |  92 +--
 .../apache/auron/configuration/ConfigOption.java   | 109 ++-
 .../apache/auron/configuration/ConfigOptions.java  | 174 -----
 .../main/java/org/apache/auron/jni/JniBridge.java  |  34 +
 .../configuration/AuronConfigurationTest.java      |  54 --
 .../auron/configuration/ConfigOptionTest.java      |  51 --
 .../configuration/MockAuronConfiguration.java      |  64 +-
 native-engine/auron-jni-bridge/src/conf.rs         |  10 +-
 native-engine/auron-jni-bridge/src/jni_bridge.rs   |  95 +--
 .../ForceApplyShuffledHashJoinInterceptor.java     |   3 +-
 .../org/apache/spark/sql/auron/ShimsImpl.scala     |  14 +-
 .../scala/org/apache/auron/AuronQuerySuite.scala   |   6 +-
 .../org/apache/auron/NativeConvertersSuite.scala   |  10 +-
 .../org/apache/auron/jni/SparkAuronAdaptor.java    |   3 +-
 .../configuration/SparkAuronConfiguration.java     | 826 ++++++++++++++-------
 .../SparkAuronConfigurationDocGenerator.java       |  72 ++
 .../java/org/apache/spark/sql/auron/AuronConf.java | 197 -----
 .../java/org/apache/spark/sql/auron/JniBridge.java | 146 ----
 .../spark/sql/auron/AuronCallNativeWrapper.scala   | 238 ------
 .../apache/spark/sql/auron/AuronConverters.scala   |  76 +-
 .../sql/auron/AuronSparkSessionExtension.scala     |  11 +-
 .../apache/spark/sql/auron/NativeConverters.scala  |  34 +-
 .../configuration/SparkAuronConfigurationTest.java |  77 --
 .../hive/auron/paimon/PaimonConvertProvider.scala  |   6 +-
 24 files changed, 895 insertions(+), 1507 deletions(-)

diff --git 
a/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java
 
b/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java
index d6acc489..31017d8b 100644
--- 
a/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java
+++ 
b/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java
@@ -23,26 +23,24 @@ import java.util.Optional;
  */
 public abstract class AuronConfiguration {
 
-    public static final ConfigOption<Integer> BATCH_SIZE = 
ConfigOptions.key("auron.batchSize")
-            .description("Suggested batch size for arrow batches.")
-            .intType()
-            .defaultValue(10000);
-
-    public static final ConfigOption<Double> MEMORY_FRACTION = 
ConfigOptions.key("auron.memoryFraction")
-            .description("Suggested fraction of off-heap memory used in native 
execution. "
+    public static final ConfigOption<Integer> BATCH_SIZE = new 
ConfigOption<>(Integer.class)
+            .withKey("auron.batchSize")
+            .withDescription("Suggested batch size for arrow batches.")
+            .withDefaultValue(10000);
+
+    public static final ConfigOption<Double> MEMORY_FRACTION = new 
ConfigOption<>(Double.class)
+            .withKey("auron.memoryFraction")
+            .withDescription("Suggested fraction of off-heap memory used in 
native execution. "
                     + "actual off-heap memory usage is expected to be 
spark.executor.memoryOverhead * fraction.")
-            .doubleType()
-            .defaultValue(0.6);
+            .withDefaultValue(0.6);
 
-    public static final ConfigOption<String> NATIVE_LOG_LEVEL = 
ConfigOptions.key("auron.native.log.level")
-            .description("Log level for native execution.")
-            .stringType()
-            .defaultValue("info");
+    public static final ConfigOption<String> NATIVE_LOG_LEVEL = new 
ConfigOption<>(String.class)
+            .withKey("auron.native.log.level")
+            .withDescription("Log level for native execution.")
+            .withDefaultValue("info");
 
     public abstract <T> Optional<T> getOptional(ConfigOption<T> option);
 
-    public abstract <T> Optional<T> getOptional(String key);
-
     public <T> T get(ConfigOption<T> option) {
         return getOptional(option).orElseGet(() -> 
getOptionDefaultValue(option));
     }
@@ -57,18 +55,6 @@ public abstract class AuronConfiguration {
         return getOptional(configOption).orElseGet(() -> 
getOptionDefaultValue(configOption));
     }
 
-    /**
-     * Returns the value associated with the given config option as a string. 
If no value is mapped
-     * under any key of the option, it returns the specified default instead 
of the option's default
-     * value.
-     *
-     * @param configOption The configuration option
-     * @return the (default) value associated with the given config option
-     */
-    public String getString(ConfigOption<String> configOption, String 
overrideDefault) {
-        return getOptional(configOption).orElse(overrideDefault);
-    }
-
     /**
      * Returns the value associated with the given config option as an integer.
      *
@@ -79,19 +65,6 @@ public abstract class AuronConfiguration {
         return getOptional(configOption).orElseGet(() -> 
getOptionDefaultValue(configOption));
     }
 
-    /**
-     * Returns the value associated with the given config option as an 
integer. If no value is
-     * mapped under any key of the option, it returns the specified default 
instead of the option's
-     * default value.
-     *
-     * @param configOption The configuration option
-     * @param overrideDefault The value to return if no value was mapped for 
any key of the option
-     * @return the configured value associated with the given config option, 
or the overrideDefault
-     */
-    public int getInteger(ConfigOption<Integer> configOption, int 
overrideDefault) {
-        return getOptional(configOption).orElse(overrideDefault);
-    }
-
     /**
      * Returns the value associated with the given config option as a long 
integer.
      *
@@ -102,19 +75,6 @@ public abstract class AuronConfiguration {
         return getOptional(configOption).orElseGet(() -> 
getOptionDefaultValue(configOption));
     }
 
-    /**
-     * Returns the value associated with the given config option as a long 
integer. If no value is
-     * mapped under any key of the option, it returns the specified default 
instead of the option's
-     * default value.
-     *
-     * @param configOption The configuration option
-     * @param overrideDefault The value to return if no value was mapped for 
any key of the option
-     * @return the configured value associated with the given config option, 
or the overrideDefault
-     */
-    public long getLong(ConfigOption<Long> configOption, long overrideDefault) 
{
-        return getOptional(configOption).orElse(overrideDefault);
-    }
-
     /**
      * Returns the value associated with the given config option as a boolean.
      *
@@ -148,19 +108,6 @@ public abstract class AuronConfiguration {
         return getOptional(configOption).orElseGet(() -> 
getOptionDefaultValue(configOption));
     }
 
-    /**
-     * Returns the value associated with the given config option as a float. 
If no value is mapped
-     * under any key of the option, it returns the specified default instead 
of the option's default
-     * value.
-     *
-     * @param configOption The configuration option
-     * @param overrideDefault The value to return if no value was mapped for 
any key of the option
-     * @return the configured value associated with the given config option, 
or the overrideDefault
-     */
-    public float getFloat(ConfigOption<Float> configOption, float 
overrideDefault) {
-        return getOptional(configOption).orElse(overrideDefault);
-    }
-
     /**
      * Returns the value associated with the given config option as a {@code 
double}.
      *
@@ -171,19 +118,6 @@ public abstract class AuronConfiguration {
         return getOptional(configOption).orElseGet(() -> 
getOptionDefaultValue(configOption));
     }
 
-    /**
-     * Returns the value associated with the given config option as a {@code 
double}. If no value is
-     * mapped under any key of the option, it returns the specified default 
instead of the option's
-     * default value.
-     *
-     * @param configOption The configuration option
-     * @param overrideDefault The value to return if no value was mapped for 
any key of the option
-     * @return the configured value associated with the given config option, 
or the overrideDefault
-     */
-    public double getDouble(ConfigOption<Double> configOption, double 
overrideDefault) {
-        return getOptional(configOption).orElse(overrideDefault);
-    }
-
     /**
      * Returns the value associated with the given config option as a {@code 
double}.
      *
diff --git 
a/auron-core/src/main/java/org/apache/auron/configuration/ConfigOption.java 
b/auron-core/src/main/java/org/apache/auron/configuration/ConfigOption.java
index 925f3ddd..04a4e0f2 100644
--- a/auron-core/src/main/java/org/apache/auron/configuration/ConfigOption.java
+++ b/auron-core/src/main/java/org/apache/auron/configuration/ConfigOption.java
@@ -16,9 +16,10 @@
  */
 package org.apache.auron.configuration;
 
-import static org.apache.auron.util.Preconditions.checkNotNull;
-
+import java.util.ArrayList;
+import java.util.List;
 import java.util.function.Function;
+import org.apache.auron.jni.AuronAdaptor;
 
 /**
  * A {@code ConfigOption} describes a configuration parameter. It encapsulates 
the configuration
@@ -35,16 +36,22 @@ public class ConfigOption<T> {
     public static final String EMPTY_DESCRIPTION = "";
 
     /** The current key for that config option. */
-    private final String key;
+    private String key;
+
+    /** The current key for that config option. */
+    private List<String> altKeys = new ArrayList<>();
 
     /** The default value for this option. */
-    private final T defaultValue;
+    private T defaultValue;
+
+    /** The current category for that config option. */
+    private String category = "Uncategorized";
 
     /** The description for this option. */
-    private final String description;
+    private String description;
 
     /** The function to compute the default value. */
-    private final Function<AuronConfiguration, T> dynamicDefaultValueFunction;
+    private Function<AuronConfiguration, T> dynamicDefaultValueFunction;
 
     /**
      * Type of the value that this ConfigOption describes.
@@ -55,27 +62,40 @@ public class ConfigOption<T> {
      *   <li>typeClass == atomic class and isList == true for {@code 
ConfigOption<List<Integer>>}
      * </ul>
      */
-    private final Class<?> clazz;
+    private Class<T> clazz;
 
-    /**
-     * Creates a new config option with fallback keys.
-     *
-     * @param key The current key for that config option
-     * @param clazz describes type of the ConfigOption, see description of the 
clazz field
-     * @param description Description for that option
-     * @param defaultValue The default value for this option
-     */
-    ConfigOption(
-            String key,
-            Class<?> clazz,
-            T defaultValue,
-            String description,
-            Function<AuronConfiguration, T> dynamicDefaultValueFunction) {
-        this.key = checkNotNull(key);
-        this.description = description;
+    public ConfigOption(Class<T> clazz) {
+        this.clazz = clazz;
+    }
+
+    public ConfigOption<T> withKey(String key) {
+        this.key = key;
+        return this;
+    }
+
+    public ConfigOption<T> addAltKey(String altKey) {
+        this.altKeys.add(altKey);
+        return this;
+    }
+
+    public ConfigOption<T> withDefaultValue(T defaultValue) {
         this.defaultValue = defaultValue;
-        this.clazz = checkNotNull(clazz);
+        return this;
+    }
+
+    public ConfigOption<T> withCategory(String category) {
+        this.category = category;
+        return this;
+    }
+
+    public ConfigOption<T> withDescription(String description) {
+        this.description = description;
+        return this;
+    }
+
+    public ConfigOption<T> 
withDynamicDefaultValue(Function<AuronConfiguration, T> 
dynamicDefaultValueFunction) {
         this.dynamicDefaultValueFunction = dynamicDefaultValueFunction;
+        return this;
     }
 
     /**
@@ -88,21 +108,24 @@ public class ConfigOption<T> {
     }
 
     /**
-     * Gets the description of configuration key
-     *
-     * @return
+     * Gets the alternative configuration keys.
      */
-    public String description() {
-        return description;
+    public List<String> altKeys() {
+        return altKeys;
     }
 
     /**
-     * Checks if this option has a default value.
-     *
-     * @return True if it has a default value, false if not.
+     * Gets the category of configuration key
      */
-    public boolean hasDefaultValue() {
-        return defaultValue != null;
+    public String category() {
+        return category;
+    }
+
+    /**
+     * Gets the description of configuration key
+     */
+    public String description() {
+        return description;
     }
 
     /**
@@ -131,4 +154,22 @@ public class ConfigOption<T> {
     public Function<AuronConfiguration, T> dynamicDefaultValueFunction() {
         return dynamicDefaultValueFunction;
     }
+
+    /**
+     * Gets the type class of the value that this ConfigOption describes.
+     *
+     * @return The type class of the value that this ConfigOption describes.
+     */
+    public Class<T> getValueClass() {
+        return clazz;
+    }
+
+    /**
+     * Retrieves the current value of this configuration option.
+     *
+     * @return the current value associated with this configuration option.
+     */
+    public T get() {
+        return AuronAdaptor.getInstance().getAuronConfiguration().get(this);
+    }
 }
diff --git 
a/auron-core/src/main/java/org/apache/auron/configuration/ConfigOptions.java 
b/auron-core/src/main/java/org/apache/auron/configuration/ConfigOptions.java
deleted file mode 100644
index 47efc98e..00000000
--- a/auron-core/src/main/java/org/apache/auron/configuration/ConfigOptions.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.auron.configuration;
-
-import static org.apache.auron.util.Preconditions.checkNotNull;
-
-import java.util.function.Function;
-
-/**
- * Refer to the design of the Flink engine.
- * {@code ConfigOptions} are used to build a {@link ConfigOption}. The option 
is typically built in
- * one of the following pattern:
- *
- * <pre>{@code
- * // simple string-valued option with a default value
- * ConfigOption<String> tempDirs = ConfigOptions
- *     .key("tmp.dir")
- *     .stringType()
- *     .defaultValue("/tmp");
- *
- * // simple string-valued option with a default value and with the description
- * ConfigOption<String> tempDirs = ConfigOptions
- *     .key("tmp.dir")
- *     .description("this is a example of string")
- *     .stringType()
- *     .defaultValue("/tmp");
- *
- * // simple integer-valued option with a default value
- * ConfigOption<Integer> batchSize = ConfigOptions
- *     .key("batch.size")
- *     .intType()
- *     .defaultValue(100);
- *
- * // option with no default value
- * ConfigOption<String> userName = ConfigOptions
- *     .key("user.name")
- *     .stringType()
- *     .noDefaultValue();
- * }</pre>
- */
-public class ConfigOptions {
-
-    /**
-     * Starts building a new {@link ConfigOption}.
-     *
-     * @param key The key for the config option.
-     * @return The builder for the config option with the given key.
-     */
-    public static OptionBuilder key(String key) {
-        checkNotNull(key);
-        return new OptionBuilder(key);
-    }
-
-    // ------------------------------------------------------------------------
-
-    /**
-     * The option builder is used to create a {@link ConfigOption}. It is 
instantiated via {@link
-     * ConfigOptions#key(String)}.
-     */
-    public static final class OptionBuilder {
-
-        /** The key for the config option. */
-        private final String key;
-
-        private String description = ConfigOption.EMPTY_DESCRIPTION;
-
-        /**
-         * Creates a new OptionBuilder.
-         * @param key The key for the config option
-         */
-        OptionBuilder(String key) {
-            this.key = key;
-        }
-
-        OptionBuilder(String key, String description) {
-            this.key = key;
-            this.description = description;
-        }
-
-        public OptionBuilder description(String description) {
-            return new OptionBuilder(key, description);
-        }
-
-        /** Defines that the value of the option should be of {@link Boolean} 
type. */
-        public TypedConfigOptionBuilder<Boolean> booleanType() {
-            return new TypedConfigOptionBuilder<>(key, Boolean.class, 
description);
-        }
-
-        /** Defines that the value of the option should be of {@link Integer} 
type. */
-        public TypedConfigOptionBuilder<Integer> intType() {
-            return new TypedConfigOptionBuilder<>(key, Integer.class, 
description);
-        }
-
-        /** Defines that the value of the option should be of {@link Long} 
type. */
-        public TypedConfigOptionBuilder<Long> longType() {
-            return new TypedConfigOptionBuilder<>(key, Long.class, 
description);
-        }
-
-        /** Defines that the value of the option should be of {@link Float} 
type. */
-        public TypedConfigOptionBuilder<Float> floatType() {
-            return new TypedConfigOptionBuilder<>(key, Float.class, 
description);
-        }
-
-        /** Defines that the value of the option should be of {@link Double} 
type. */
-        public TypedConfigOptionBuilder<Double> doubleType() {
-            return new TypedConfigOptionBuilder<>(key, Double.class, 
description);
-        }
-
-        /** Defines that the value of the option should be of {@link String} 
type. */
-        public TypedConfigOptionBuilder<String> stringType() {
-            return new TypedConfigOptionBuilder<>(key, String.class, 
description);
-        }
-    }
-
-    /**
-     * Builder for {@link ConfigOption} with a defined atomic type.
-     *
-     * @param <T> atomic type of the option
-     */
-    public static class TypedConfigOptionBuilder<T> {
-        private final String key;
-        private final Class<T> clazz;
-
-        private final String description;
-
-        TypedConfigOptionBuilder(String key, Class<T> clazz, String 
description) {
-            this.key = key;
-            this.clazz = clazz;
-            this.description = description;
-        }
-
-        /**
-         * Creates a ConfigOption with the given default value.
-         *
-         * @param value The default value for the config option
-         * @return The config option with the default value.
-         */
-        public ConfigOption<T> defaultValue(T value) {
-            return new ConfigOption<>(key, clazz, value, description, null);
-        }
-
-        /**
-         * Creates a ConfigOption without a default value.
-         *
-         * @return The config option without a default value.
-         */
-        public ConfigOption<T> noDefaultValue() {
-            return new ConfigOption<>(key, clazz, null, description, null);
-        }
-
-        public ConfigOption<T> 
dynamicDefaultValue(Function<AuronConfiguration, T> 
dynamicDefaultValueFunction) {
-            return new ConfigOption<>(key, clazz, null, description, 
dynamicDefaultValueFunction);
-        }
-    }
-
-    // ------------------------------------------------------------------------
-
-    /** Not intended to be instantiated. */
-    private ConfigOptions() {}
-}
diff --git a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java 
b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
index 4d7edbf8..121970c2 100644
--- a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
+++ b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
@@ -23,10 +23,13 @@ import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.auron.configuration.AuronConfiguration;
+import org.apache.auron.configuration.ConfigOption;
 import org.apache.auron.functions.AuronUDFWrapperContext;
 import org.apache.auron.hadoop.fs.FSDataInputWrapper;
 import org.apache.auron.hadoop.fs.FSDataOutputWrapper;
 import org.apache.auron.memory.OnHeapSpillManager;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -106,4 +109,35 @@ public class JniBridge {
     public static AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer 
udfSerialized) {
         return 
AuronAdaptor.getInstance().getAuronUDFWrapperContext(udfSerialized);
     }
+
+    public static int intConf(String confKey) {
+        return getConfValue(confKey);
+    }
+
+    public static long longConf(String confKey) {
+        return getConfValue(confKey);
+    }
+
+    public static double doubleConf(String confKey) {
+        return getConfValue(confKey);
+    }
+
+    public static boolean booleanConf(String confKey) {
+        return getConfValue(confKey);
+    }
+
+    public static String stringConf(String confKey) {
+        return getConfValue(confKey);
+    }
+
+    static <T> T getConfValue(String confKey) {
+        Class<? extends AuronConfiguration> confClass =
+                AuronAdaptor.getInstance().getAuronConfiguration().getClass();
+        try {
+            ConfigOption<T> configOption = (ConfigOption<T>) 
FieldUtils.readStaticField(confClass, confKey);
+            return configOption.get();
+        } catch (IllegalAccessException | ClassCastException e) {
+            throw new RuntimeException("error reading conf value: " + confKey, 
e);
+        }
+    }
 }
diff --git 
a/auron-core/src/test/java/org/apache/auron/configuration/AuronConfigurationTest.java
 
b/auron-core/src/test/java/org/apache/auron/configuration/AuronConfigurationTest.java
deleted file mode 100644
index afee2f59..00000000
--- 
a/auron-core/src/test/java/org/apache/auron/configuration/AuronConfigurationTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.auron.configuration;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/**
- * This is a test class for {@link AuronConfiguration}.
- */
-public class AuronConfigurationTest {
-
-    private MockAuronConfiguration config;
-
-    @BeforeEach
-    public void setUp() {
-        config = new MockAuronConfiguration();
-        
config.addConfig(MockAuronConfiguration.STRING_WITHOUT_DEFAULT_CONFIG_OPTION.key(),
 "str1");
-        config.addConfig(MockAuronConfiguration.INT_CONFIG_OPTION.key(), 100);
-        config.addConfig(MockAuronConfiguration.BOOLEAN_CONFIG_OPTION.key(), 
false);
-        config.addConfig(MockAuronConfiguration.DOUBLE_CONFIG_OPTION.key(), 
99.9);
-        config.addConfig(MockAuronConfiguration.LONG_CONFIG_OPTION.key(), 
10000000000L);
-        config.addConfig(MockAuronConfiguration.FLOAT_CONFIG_OPTION.key(), 
1.2f);
-    }
-
-    @Test
-    public void testGetConfig() {
-        assertEquals("str1", 
config.get(MockAuronConfiguration.STRING_WITHOUT_DEFAULT_CONFIG_OPTION));
-        assertEquals("zm", 
config.get(MockAuronConfiguration.STRING_CONFIG_OPTION));
-        assertEquals(100, 
config.getInteger(MockAuronConfiguration.INT_CONFIG_OPTION));
-        assertEquals(false, 
config.get(MockAuronConfiguration.BOOLEAN_CONFIG_OPTION));
-        assertEquals(99.9, 
config.get(MockAuronConfiguration.DOUBLE_CONFIG_OPTION), 0.0000000001);
-        assertEquals(10000000000L, 
config.getLong(MockAuronConfiguration.LONG_CONFIG_OPTION));
-        assertEquals(1.2f, 
config.get(MockAuronConfiguration.FLOAT_CONFIG_OPTION), 0.0000000001);
-        // test dynamic default value
-        assertEquals(500, 
config.getInteger(MockAuronConfiguration.INT_WITH_DYNAMIC_DEFAULT_CONFIG_OPTION));
-    }
-}
diff --git 
a/auron-core/src/test/java/org/apache/auron/configuration/ConfigOptionTest.java 
b/auron-core/src/test/java/org/apache/auron/configuration/ConfigOptionTest.java
deleted file mode 100644
index 0242974a..00000000
--- 
a/auron-core/src/test/java/org/apache/auron/configuration/ConfigOptionTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.auron.configuration;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import org.junit.jupiter.api.Test;
-
-/** Tests for the {@link ConfigOption}. */
-public class ConfigOptionTest {
-
-    @Test
-    public void testConfigOption() {
-        ConfigOption<String> keyOption = 
ConfigOptions.key("key").stringType().noDefaultValue();
-        assertEquals("key", keyOption.key());
-        assertEquals(null, keyOption.defaultValue());
-        assertEquals(false, keyOption.hasDefaultValue());
-        ConfigOption<Boolean> booleanOption =
-                ConfigOptions.key("boolean").booleanType().defaultValue(true);
-        assertEquals(true, booleanOption.defaultValue());
-    }
-
-    @Test
-    public void testConfigOptionAddDesc() {
-        ConfigOption<String> keyOption = ConfigOptions.key("key")
-                .description("this is a description of the key")
-                .stringType()
-                .noDefaultValue();
-        assertEquals("key", keyOption.key());
-        assertEquals(null, keyOption.defaultValue());
-        assertEquals(false, keyOption.hasDefaultValue());
-        ConfigOption<Boolean> booleanOption =
-                ConfigOptions.key("boolean").booleanType().defaultValue(true);
-        assertEquals(true, booleanOption.defaultValue());
-        assertEquals("this is a description of the key", 
keyOption.description());
-    }
-}
diff --git 
a/auron-core/src/test/java/org/apache/auron/configuration/MockAuronConfiguration.java
 
b/auron-core/src/test/java/org/apache/auron/configuration/MockAuronConfiguration.java
index eb14c38c..ec6098cc 100644
--- 
a/auron-core/src/test/java/org/apache/auron/configuration/MockAuronConfiguration.java
+++ 
b/auron-core/src/test/java/org/apache/auron/configuration/MockAuronConfiguration.java
@@ -16,53 +16,51 @@
  */
 package org.apache.auron.configuration;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Optional;
 
 public class MockAuronConfiguration extends AuronConfiguration {
 
-    public static final ConfigOption<String> STRING_CONFIG_OPTION =
-            ConfigOptions.key("string").stringType().defaultValue("zm");
+    // Basic configuration options with descriptions
+    public static final ConfigOption<String> STRING_CONFIG_OPTION = new 
ConfigOption<>(String.class)
+            .withKey("string")
+            .withDescription("A string configuration option for testing.")
+            .withDefaultValue("zm");
 
-    public static final ConfigOption<String> 
STRING_WITHOUT_DEFAULT_CONFIG_OPTION =
-            
ConfigOptions.key("string_without_default").stringType().noDefaultValue();
+    public static final ConfigOption<Integer> INT_CONFIG_OPTION = new 
ConfigOption<>(Integer.class)
+            .withKey("int")
+            .withDescription("An integer configuration option for testing.")
+            .withDefaultValue(1);
 
-    public static final ConfigOption<Integer> INT_CONFIG_OPTION =
-            ConfigOptions.key("int").intType().defaultValue(1);
+    public static final ConfigOption<Long> LONG_CONFIG_OPTION = new 
ConfigOption<>(Long.class)
+            .withKey("long")
+            .withDescription("A long configuration option for testing.")
+            .withDefaultValue(1L);
 
-    public static final ConfigOption<Long> LONG_CONFIG_OPTION =
-            ConfigOptions.key("long").longType().defaultValue(1L);
+    public static final ConfigOption<Boolean> BOOLEAN_CONFIG_OPTION = new 
ConfigOption<>(Boolean.class)
+            .withKey("boolean")
+            .withDescription("A boolean configuration option for testing.")
+            .withDefaultValue(true);
 
-    public static final ConfigOption<Boolean> BOOLEAN_CONFIG_OPTION =
-            ConfigOptions.key("boolean").booleanType().defaultValue(true);
+    public static final ConfigOption<Double> DOUBLE_CONFIG_OPTION = new 
ConfigOption<>(Double.class)
+            .withKey("double")
+            .withDescription("A double configuration option for testing.")
+            .withDefaultValue(1.0);
 
-    public static final ConfigOption<Double> DOUBLE_CONFIG_OPTION =
-            ConfigOptions.key("double").doubleType().defaultValue(1.0);
+    public static final ConfigOption<Float> FLOAT_CONFIG_OPTION = new 
ConfigOption<>(Float.class)
+            .withKey("float")
+            .withDescription("A float configuration option for testing.")
+            .withDefaultValue(1.0f);
 
-    public static final ConfigOption<Float> FLOAT_CONFIG_OPTION =
-            ConfigOptions.key("float").floatType().defaultValue(1.0f);
-
-    public static final ConfigOption<Integer> 
INT_WITH_DYNAMIC_DEFAULT_CONFIG_OPTION = ConfigOptions.key(
-                    "int_with_dynamic_default")
-            .intType()
-            .dynamicDefaultValue(config -> 
config.getInteger(INT_CONFIG_OPTION) * 5);
-
-    private Map<String, Object> configMap = new HashMap<>();
+    public static final ConfigOption<Integer> 
INT_WITH_DYNAMIC_DEFAULT_CONFIG_OPTION = new ConfigOption<>(Integer.class)
+            .withKey("int_with_dynamic_default")
+            .withDescription("An integer configuration option with dynamic 
default value.")
+            .withDynamicDefaultValue(
+                    config -> config.getOptional(INT_CONFIG_OPTION).orElse(1) 
* 5);
 
     public MockAuronConfiguration() {}
 
-    public void addConfig(String key, Object value) {
-        configMap.put(key, value);
-    }
-
     @Override
     public <T> Optional<T> getOptional(ConfigOption<T> option) {
-        return Optional.ofNullable((T) configMap.getOrDefault(option.key(), 
getOptionDefaultValue(option)));
-    }
-
-    @Override
-    public <T> Optional<T> getOptional(String key) {
-        return Optional.ofNullable((T) configMap.get(key));
+        return Optional.empty(); // always use default value
     }
 }
diff --git a/native-engine/auron-jni-bridge/src/conf.rs 
b/native-engine/auron-jni-bridge/src/conf.rs
index 383596d6..351432f9 100644
--- a/native-engine/auron-jni-bridge/src/conf.rs
+++ b/native-engine/auron-jni-bridge/src/conf.rs
@@ -67,7 +67,7 @@ pub trait BooleanConf {
     fn value(&self) -> Result<bool> {
         ensure_jni_bridge_inited()?;
         let key = jni_new_string!(self.key())?;
-        jni_call_static!(AuronConf.booleanConf(key.as_obj()) -> bool)
+        jni_call_static!(JniBridge.booleanConf(key.as_obj()) -> bool)
     }
 }
 
@@ -76,7 +76,7 @@ pub trait IntConf {
     fn value(&self) -> Result<i32> {
         ensure_jni_bridge_inited()?;
         let key = jni_new_string!(self.key())?;
-        jni_call_static!(AuronConf.intConf(key.as_obj()) -> i32)
+        jni_call_static!(JniBridge.intConf(key.as_obj()) -> i32)
     }
 }
 
@@ -85,7 +85,7 @@ pub trait LongConf {
     fn value(&self) -> Result<i64> {
         ensure_jni_bridge_inited()?;
         let key = jni_new_string!(self.key())?;
-        jni_call_static!(AuronConf.longConf(key.as_obj()) -> i64)
+        jni_call_static!(JniBridge.longConf(key.as_obj()) -> i64)
     }
 }
 
@@ -94,7 +94,7 @@ pub trait DoubleConf {
     fn value(&self) -> Result<f64> {
         ensure_jni_bridge_inited()?;
         let key = jni_new_string!(self.key())?;
-        jni_call_static!(AuronConf.doubleConf(key.as_obj()) -> f64)
+        jni_call_static!(JniBridge.doubleConf(key.as_obj()) -> f64)
     }
 }
 
@@ -104,7 +104,7 @@ pub trait StringConf {
         ensure_jni_bridge_inited()?;
         let key = jni_new_string!(self.key())?;
         let value = jni_get_string!(
-            jni_call_static!(AuronConf.stringConf(key.as_obj()) -> JObject)?
+            jni_call_static!(JniBridge.stringConf(key.as_obj()) -> JObject)?
                 .as_obj()
                 .into()
         )?;
diff --git a/native-engine/auron-jni-bridge/src/jni_bridge.rs 
b/native-engine/auron-jni-bridge/src/jni_bridge.rs
index 5b76d829..f058901a 100644
--- a/native-engine/auron-jni-bridge/src/jni_bridge.rs
+++ b/native-engine/auron-jni-bridge/src/jni_bridge.rs
@@ -447,7 +447,6 @@ pub struct JavaClasses<'a> {
     pub cSparkUDAFWrapperContext: SparkUDAFWrapperContext<'a>,
     pub cSparkUDTFWrapperContext: SparkUDTFWrapperContext<'a>,
     pub cSparkUDAFMemTracker: SparkUDAFMemTracker<'a>,
-    pub cAuronConf: AuronConf<'a>,
     pub cAuronRssPartitionWriterBase: AuronRssPartitionWriterBase<'a>,
     pub cAuronCallNativeWrapper: AuronCallNativeWrapper<'a>,
     pub cAuronOnHeapSpillManager: AuronOnHeapSpillManager<'a>,
@@ -513,7 +512,6 @@ impl JavaClasses<'static> {
                 cSparkUDAFWrapperContext: SparkUDAFWrapperContext::new(env)?,
                 cSparkUDTFWrapperContext: SparkUDTFWrapperContext::new(env)?,
                 cSparkUDAFMemTracker: SparkUDAFMemTracker::new(env)?,
-                cAuronConf: AuronConf::new(env)?,
                 cAuronRssPartitionWriterBase: 
AuronRssPartitionWriterBase::new(env)?,
                 cAuronCallNativeWrapper: AuronCallNativeWrapper::new(env)?,
                 cAuronOnHeapSpillManager: AuronOnHeapSpillManager::new(env)?,
@@ -577,9 +575,18 @@ pub struct JniBridge<'a> {
     pub method_getTotalMemoryLimited_ret: ReturnType,
     pub method_getDirectWriteSpillToDiskFile: JStaticMethodID,
     pub method_getDirectWriteSpillToDiskFile_ret: ReturnType,
-
     pub method_getAuronUDFWrapperContext: JStaticMethodID,
     pub method_getAuronUDFWrapperContext_ret: ReturnType,
+    pub method_intConf: JStaticMethodID,
+    pub method_intConf_ret: ReturnType,
+    pub method_longConf: JStaticMethodID,
+    pub method_longConf_ret: ReturnType,
+    pub method_doubleConf: JStaticMethodID,
+    pub method_doubleConf_ret: ReturnType,
+    pub method_booleanConf: JStaticMethodID,
+    pub method_booleanConf_ret: ReturnType,
+    pub method_stringConf: JStaticMethodID,
+    pub method_stringConf_ret: ReturnType,
 }
 impl<'a> JniBridge<'a> {
     pub const SIG_TYPE: &'static str = "org/apache/auron/jni/JniBridge";
@@ -663,6 +670,36 @@ impl<'a> JniBridge<'a> {
                 
"(Ljava/nio/ByteBuffer;)Lorg/apache/auron/functions/AuronUDFWrapperContext;",
             )?,
             method_getAuronUDFWrapperContext_ret: ReturnType::Object,
+            method_intConf: env.get_static_method_id(
+                class,
+                "intConf",
+                "(Ljava/lang/String;)I",
+            )?,
+            method_intConf_ret: ReturnType::Primitive(Primitive::Int),
+            method_longConf: env.get_static_method_id(
+                class,
+                "longConf",
+                "(Ljava/lang/String;)J",
+            )?,
+            method_longConf_ret: ReturnType::Primitive(Primitive::Long),
+            method_doubleConf: env.get_static_method_id(
+                class,
+                "doubleConf",
+                "(Ljava/lang/String;)D",
+            )?,
+            method_doubleConf_ret: ReturnType::Primitive(Primitive::Double),
+            method_booleanConf: env.get_static_method_id(
+                class,
+                "booleanConf",
+                "(Ljava/lang/String;)Z",
+            )?,
+            method_booleanConf_ret: ReturnType::Primitive(Primitive::Boolean),
+            method_stringConf: env.get_static_method_id(
+                class,
+                "stringConf",
+                "(Ljava/lang/String;)Ljava/lang/String;",
+            )?,
+            method_stringConf_ret: ReturnType::Object,
         })
     }
 }
@@ -1121,58 +1158,6 @@ impl<'a> SparkMetricNode<'a> {
     }
 }
 
-#[allow(non_snake_case)]
-pub struct AuronConf<'a> {
-    pub class: JClass<'a>,
-    pub method_booleanConf: JStaticMethodID,
-    pub method_booleanConf_ret: ReturnType,
-    pub method_intConf: JStaticMethodID,
-    pub method_intConf_ret: ReturnType,
-    pub method_longConf: JStaticMethodID,
-    pub method_longConf_ret: ReturnType,
-    pub method_doubleConf: JStaticMethodID,
-    pub method_doubleConf_ret: ReturnType,
-    pub method_stringConf: JStaticMethodID,
-    pub method_stringConf_ret: ReturnType,
-}
-
-impl<'a> AuronConf<'_> {
-    pub const SIG_TYPE: &'static str = "org/apache/spark/sql/auron/AuronConf";
-
-    pub fn new(env: &JNIEnv<'a>) -> JniResult<AuronConf<'a>> {
-        let class = get_global_jclass(env, Self::SIG_TYPE)?;
-        Ok(AuronConf {
-            class,
-            method_booleanConf: env.get_static_method_id(
-                class,
-                "booleanConf",
-                "(Ljava/lang/String;)Z",
-            )?,
-            method_booleanConf_ret: ReturnType::Primitive(Primitive::Boolean),
-            method_intConf: env.get_static_method_id(class, "intConf", 
"(Ljava/lang/String;)I")?,
-            method_intConf_ret: ReturnType::Primitive(Primitive::Int),
-            method_longConf: env.get_static_method_id(
-                class,
-                "longConf",
-                "(Ljava/lang/String;)J",
-            )?,
-            method_longConf_ret: ReturnType::Primitive(Primitive::Long),
-            method_doubleConf: env.get_static_method_id(
-                class,
-                "doubleConf",
-                "(Ljava/lang/String;)D",
-            )?,
-            method_doubleConf_ret: ReturnType::Primitive(Primitive::Double),
-            method_stringConf: env.get_static_method_id(
-                class,
-                "stringConf",
-                "(Ljava/lang/String;)Ljava/lang/String;",
-            )?,
-            method_stringConf_ret: ReturnType::Object,
-        })
-    }
-}
-
 #[allow(non_snake_case)]
 pub struct AuronRssPartitionWriterBase<'a> {
     pub class: JClass<'a>,
diff --git 
a/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java
 
b/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java
index 8d7d0dd5..3cc6cbd1 100644
--- 
a/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java
+++ 
b/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java
@@ -18,6 +18,7 @@ package org.apache.spark.sql.auron;
 
 import net.bytebuddy.implementation.bind.annotation.Argument;
 import net.bytebuddy.implementation.bind.annotation.RuntimeType;
+import org.apache.auron.spark.configuration.SparkAuronConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,6 +28,6 @@ public class ForceApplyShuffledHashJoinInterceptor {
     @RuntimeType
     public static Object intercept(@Argument(0) Object conf) {
         logger.debug("calling JoinSelectionHelper.forceApplyShuffledHashJoin() 
intercepted by auron");
-        return AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf();
+        return SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN.get();
     }
 }
diff --git 
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
 
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
index c3a1861d..6353d7cb 100644
--- 
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
+++ 
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
@@ -107,6 +107,7 @@ import org.apache.spark.storage.FileSegment
 import org.apache.auron.{protobuf => pb, sparkver}
 import org.apache.auron.common.AuronBuildInfo
 import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
 import org.apache.auron.spark.ui.AuronBuildInfoEvent
 
 class ShimsImpl extends Shims with Logging {
@@ -128,7 +129,7 @@ class ShimsImpl extends Shims with Logging {
   override def initExtension(): Unit = {
     ValidateSparkPlanInjector.inject()
 
-    if (AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf()) {
+    if (SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN.get()) {
       ForceApplyShuffledHashJoinInjector.inject()
     }
 
@@ -141,19 +142,18 @@ class ShimsImpl extends Shims with Logging {
 
   @sparkver("3.0 / 3.1")
   override def initExtension(): Unit = {
-    if (AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf()) {
-      logWarning(s"${AuronConf.FORCE_SHUFFLED_HASH_JOIN.key} is not supported 
in $shimVersion")
+    if (SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN.get()) {
+      logWarning(
+        s"${SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN.key} is not 
supported in $shimVersion")
     }
 
   }
 
   // set Auron spark ui if spark.auron.ui.enabled is true
   override def onApplyingExtension(): Unit = {
-    logInfo(
-      " onApplyingExtension get ui_enabled : " + SparkEnv.get.conf
-        .get(AuronConf.UI_ENABLED.key, "true"))
+    logInfo(s"onApplyingExtension get ui_enabled: 
${SparkAuronConfiguration.UI_ENABLED.get()}")
 
-    if (SparkEnv.get.conf.get(AuronConf.UI_ENABLED.key, 
"true").equals("true")) {
+    if (SparkAuronConfiguration.UI_ENABLED.get()) {
       val sparkContext = SparkContext.getActive.getOrElse {
         throw new IllegalStateException("No active spark context found that 
should not happen")
       }
diff --git 
a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala
 
b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala
index 3e990978..e82eb78f 100644
--- 
a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala
+++ 
b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala
@@ -17,9 +17,9 @@
 package org.apache.auron
 
 import org.apache.spark.sql.{AuronQueryTest, Row}
-import org.apache.spark.sql.auron.AuronConf
 import org.apache.spark.sql.execution.joins.auron.plan.NativeBroadcastJoinExec
 
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
 import org.apache.auron.util.AuronTestUtils
 
 class AuronQuerySuite extends AuronQueryTest with BaseAuronSQLSuite with 
AuronSQLTestHelper {
@@ -197,7 +197,7 @@ class AuronQuerySuite extends AuronQueryTest with 
BaseAuronSQLSuite with AuronSQ
     if (AuronTestUtils.isSparkV32OrGreater) {
       Seq(true, false).foreach { forcePositionalEvolution =>
         withEnvConf(
-          AuronConf.ORC_FORCE_POSITIONAL_EVOLUTION.key -> 
forcePositionalEvolution.toString) {
+          SparkAuronConfiguration.ORC_FORCE_POSITIONAL_EVOLUTION.key -> 
forcePositionalEvolution.toString) {
           withTempPath { f =>
             val path = f.getCanonicalPath
             Seq[(Integer, Integer)]((1, 2), (3, 4), (5, 6), (null, null))
@@ -220,7 +220,7 @@ class AuronQuerySuite extends AuronQueryTest with 
BaseAuronSQLSuite with AuronSQ
     if (AuronTestUtils.isSparkV32OrGreater) {
       Seq(true, false).foreach { forcePositionalEvolution =>
         withEnvConf(
-          AuronConf.ORC_FORCE_POSITIONAL_EVOLUTION.key -> 
forcePositionalEvolution.toString) {
+          SparkAuronConfiguration.ORC_FORCE_POSITIONAL_EVOLUTION.key -> 
forcePositionalEvolution.toString) {
           withTempPath { f =>
             val path = f.getCanonicalPath
             Seq[(Integer, Integer, Integer)]((1, 2, 1), (3, 4, 2), (5, 6, 3), 
(null, null, 4))
diff --git 
a/spark-extension-shims-spark/src/test/scala/org/apache/auron/NativeConvertersSuite.scala
 
b/spark-extension-shims-spark/src/test/scala/org/apache/auron/NativeConvertersSuite.scala
index dc96731c..8bc93190 100644
--- 
a/spark-extension-shims-spark/src/test/scala/org/apache/auron/NativeConvertersSuite.scala
+++ 
b/spark-extension-shims-spark/src/test/scala/org/apache/auron/NativeConvertersSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.auron
 
 import org.apache.spark.sql.AuronQueryTest
-import org.apache.spark.sql.auron.{AuronConf, NativeConverters}
+import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, 
StringType}
 
@@ -51,25 +51,25 @@ class NativeConvertersSuite
   }
 
   test("cast from string to numeric adds trim wrapper before native cast when 
enabled") {
-    withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "true") {
+    withSQLConf("spark.auron.cast.trimString" -> "true") {
       assertTrimmedCast(" 42 ", IntegerType)
     }
   }
 
   test("cast from string to boolean adds trim wrapper before native cast when 
enabled") {
-    withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "true") {
+    withSQLConf("spark.auron.cast.trimString" -> "true") {
       assertTrimmedCast(" true ", BooleanType)
     }
   }
 
   test("cast trim disabled via auron conf") {
-    withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "false") {
+    withSQLConf("spark.auron.cast.trimString" -> "false") {
       assertNonTrimmedCast(" 42 ", IntegerType)
     }
   }
 
   test("cast trim disabled via auron conf for boolean cast") {
-    withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "false") {
+    withSQLConf("spark.auron.cast.trimString" -> "false") {
       assertNonTrimmedCast(" true ", BooleanType)
     }
   }
diff --git 
a/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java 
b/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java
index c88f8457..005e9753 100644
--- a/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java
+++ b/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java
@@ -28,7 +28,6 @@ import org.apache.auron.memory.OnHeapSpillManager;
 import org.apache.auron.spark.configuration.SparkAuronConfiguration;
 import org.apache.auron.spark.sql.SparkAuronUDFWrapperContext;
 import org.apache.spark.SparkEnv;
-import org.apache.spark.SparkEnv$;
 import org.apache.spark.TaskContext;
 import org.apache.spark.TaskContext$;
 import org.apache.spark.sql.auron.NativeHelper$;
@@ -88,7 +87,7 @@ public class SparkAuronAdaptor extends AuronAdaptor {
 
     @Override
     public AuronConfiguration getAuronConfiguration() {
-        return new SparkAuronConfiguration(SparkEnv$.MODULE$.get().conf());
+        return new SparkAuronConfiguration();
     }
 
     @Override
diff --git 
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
 
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
index 8e66efb1..14ee644e 100644
--- 
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
+++ 
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
@@ -16,15 +16,16 @@
  */
 package org.apache.auron.spark.configuration;
 
-import static org.apache.auron.util.Preconditions.checkNotNull;
-
+import java.util.List;
 import java.util.Optional;
+import java.util.function.Supplier;
 import org.apache.auron.configuration.AuronConfiguration;
 import org.apache.auron.configuration.ConfigOption;
-import org.apache.auron.configuration.ConfigOptions;
-import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.SparkEnv;
 import org.apache.spark.internal.config.ConfigEntry;
-import org.apache.spark.internal.config.ConfigEntryWithDefault;
+import org.apache.spark.internal.config.ConfigEntryWithDefaultFunction;
+import org.apache.spark.sql.internal.SQLConf;
 import scala.Option;
 import scala.collection.immutable.List$;
 
@@ -38,283 +39,586 @@ public class SparkAuronConfiguration extends 
AuronConfiguration {
     // please manually add the prefix.
     public static final String SPARK_PREFIX = "spark.";
 
-    public static final ConfigOption<Boolean> UI_ENABLED = 
ConfigOptions.key("auron.ui.enabled")
-            .description("support spark.auron.ui.enabled.")
-            .booleanType()
-            .defaultValue(true);
-
-    public static final ConfigOption<Double> PROCESS_MEMORY_FRACTION = 
ConfigOptions.key(
-                    "auron.process.vmrss.memoryFraction")
-            .description("suggested fraction of process total memory (on-heap 
and off-heap). "
-                    + "this limit is for process's resident memory usage.")
-            .doubleType()
-            .defaultValue(0.9);
-
-    public static final ConfigOption<Boolean> CASE_CONVERT_FUNCTIONS_ENABLE = 
ConfigOptions.key(
-                    "auron.enable.caseconvert.functions")
-            .description("enable converting upper/lower functions to native, 
special cases may provide different, "
-                    + "outputs from spark due to different unicode versions. ")
-            .booleanType()
-            .defaultValue(true);
-
-    public static final ConfigOption<Boolean> INPUT_BATCH_STATISTICS_ENABLE = 
ConfigOptions.key(
-                    "auron.enableInputBatchStatistics")
-            .description("enable extra metrics of input batch statistics. ")
-            .booleanType()
-            .defaultValue(true);
-
-    public static final ConfigOption<Boolean> UDAF_FALLBACK_ENABLE = 
ConfigOptions.key("auron.udafFallback.enable")
-            .description("supports UDAF and other aggregate functions not 
implemented. ")
-            .booleanType()
-            .defaultValue(true);
-
-    public static final ConfigOption<Integer> SUGGESTED_UDAF_ROW_MEM_USAGE = 
ConfigOptions.key(
-                    "auron.suggested.udaf.memUsedSize")
-            .description("TypedImperativeAggregate one row mem use size. ")
-            .intType()
-            .defaultValue(64);
-
-    public static final ConfigOption<Integer> 
UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG = ConfigOptions.key(
-                    "auron.udafFallback.num.udafs.trigger.sortAgg")
-            .description(
-                    "number of udafs to trigger sort-based aggregation, by 
default, all aggs containing udafs are converted to sort-based.")
-            .intType()
-            .defaultValue(1);
-
-    public static final ConfigOption<Integer> UDAF_FALLBACK_ESTIM_ROW_SIZE = 
ConfigOptions.key(
-                    "auron.udafFallback.typedImperativeEstimatedRowSize")
-            .description("TypedImperativeAggregate one row mem use size.")
-            .intType()
-            .defaultValue(256);
-
-    public static final ConfigOption<Boolean> CAST_STRING_TRIM_ENABLE = 
ConfigOptions.key("auron.cast.trimString")
-            .description("enable trimming string inputs before casting to 
numeric/boolean types. ")
-            .booleanType()
-            .defaultValue(true);
-
-    public static final ConfigOption<Boolean> IGNORE_CORRUPTED_FILES = 
ConfigOptions.key("files.ignoreCorruptFiles")
-            .description("ignore corrupted input files. ")
-            .booleanType()
-            .defaultValue(false);
-
-    public static final ConfigOption<Boolean> PARTIAL_AGG_SKIPPING_ENABLE = 
ConfigOptions.key(
-                    "auron.partialAggSkipping.enable")
-            .description("enable partial aggregate skipping (see 
https://github.com/apache/auron/issues/327). ")
-            .booleanType()
-            .defaultValue(true);
-
-    public static final ConfigOption<Double> PARTIAL_AGG_SKIPPING_RATIO = 
ConfigOptions.key(
-                    "auron.partialAggSkipping.ratio")
-            .description("partial aggregate skipping ratio. ")
-            .doubleType()
-            .defaultValue(0.9);
-
-    public static final ConfigOption<Integer> PARTIAL_AGG_SKIPPING_MIN_ROWS = 
ConfigOptions.key(
-                    "auron.partialAggSkipping.minRows")
-            .description("minimum number of rows to trigger partial aggregate 
skipping.")
-            .intType()
-            .dynamicDefaultValue(
+    public static final ConfigOption<Boolean> AURON_ENABLED = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enabled")
+            .addAltKey("auron.enable")
+            .withCategory("Runtime Configuration")
+            .withDescription("Enable Spark Auron support to accelerate query 
execution with native implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> UI_ENABLED = new 
ConfigOption<>(Boolean.class)
+            .withKey("auron.ui.enabled")
+            .addAltKey("auron.ui.enable")
+            .withCategory("Runtime Configuration")
+            .withDescription(
+                    "Enable Spark Auron UI support to display Auron-specific 
metrics and statistics in Spark UI.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Double> PROCESS_MEMORY_FRACTION = new 
ConfigOption<>(Double.class)
+            .withKey("auron.process.vmrss.memoryFraction")
+            .withCategory("Runtime Configuration")
+            .withDescription(
+                    "Suggested fraction of process total memory (on-heap and 
off-heap) to use for resident memory. "
+                            + "This controls the memory limit for the 
process's virtual memory resident set size (VMRSS).")
+            .withDefaultValue(0.9);
+
+    public static final ConfigOption<Boolean> CASE_CONVERT_FUNCTIONS_ENABLE = 
new ConfigOption<>(Boolean.class)
+            .withKey("auron.enable.caseconvert.functions")
+            .withCategory("Expression/Function Supports")
+            .withDescription(
+                    "Enable converting UPPER/LOWER string functions to native 
implementations for better performance. "
+                            + "Note: May produce different outputs from Spark 
in special cases due to different Unicode versions.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> INPUT_BATCH_STATISTICS_ENABLE = 
new ConfigOption<>(Boolean.class)
+            .withKey("auron.enableInputBatchStatistics")
+            .withCategory("Runtime Configuration")
+            .withDescription(
+                    "Enable collection of additional metrics for input batch 
statistics to monitor data processing performance.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> UDAF_FALLBACK_ENABLE = new 
ConfigOption<>(Boolean.class)
+            .withKey("auron.udafFallback.enable")
+            .withCategory("UDAF Fallback")
+            .withDescription(
+                    "Enable fallback support for UDAF and other aggregate 
functions that are not implemented in Auron, "
+                            + "allowing them to be executed using Spark's 
native implementation.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Integer> SUGGESTED_UDAF_ROW_MEM_USAGE = 
new ConfigOption<>(Integer.class)
+            .withKey("auron.suggested.udaf.memUsedSize")
+            .withCategory("UDAF Fallback")
+            .withDescription("Suggested memory usage size per row for 
TypedImperativeAggregate functions in bytes. "
+                    + "This helps in memory allocation planning for UDAF 
operations.")
+            .withDefaultValue(64);
+
+    public static final ConfigOption<Integer> 
UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG = new ConfigOption<>(
+                    Integer.class)
+            .withKey("auron.udafFallback.num.udafs.trigger.sortAgg")
+            .withCategory("UDAF Fallback")
+            .withDescription(
+                    "Number of UDAFs to trigger sort-based aggregation, by 
default, all aggs containing udafs are converted to sort-based.")
+            .withDefaultValue(1);
+
+    public static final ConfigOption<Integer> UDAF_FALLBACK_ESTIM_ROW_SIZE = 
new ConfigOption<>(Integer.class)
+            .withKey("auron.udafFallback.typedImperativeEstimatedRowSize")
+            .withCategory("UDAF Fallback")
+            .withDescription("Estimated memory size per row for 
TypedImperativeAggregate functions in bytes. "
+                    + "This estimation is used for memory planning and 
allocation during UDAF fallback operations.")
+            .withDefaultValue(256);
+
+    public static final ConfigOption<Boolean> CAST_STRING_TRIM_ENABLE = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.cast.trimString")
+            .withCategory("Expression/Function Supports")
+            .withDescription(
+                    "Enable automatic trimming of whitespace from string 
inputs before casting to numeric or boolean types. "
+                            + "This helps prevent casting errors due to 
leading/trailing whitespace.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> IGNORE_CORRUPTED_FILES = new 
ConfigOption<>(Boolean.class)
+            .withKey("auron.files.ignoreCorruptFiles")
+            .withCategory("Data Sources")
+            .withDescription("Ignore corrupted input files, defaults to 
spark.sql.files.ignoreCorruptFiles")
+            .withDynamicDefaultValue(
+                    conf -> 
SparkEnv.get().conf().getBoolean("spark.sql.files.ignoreCorruptFiles", false));
+
+    public static final ConfigOption<Boolean> PARTIAL_AGG_SKIPPING_ENABLE = 
new ConfigOption<>(Boolean.class)
+            .withKey("auron.partialAggSkipping.enable")
+            .withCategory("Partial Aggregate Skipping")
+            .withDescription(
+                    "Enable partial aggregate skipping optimization to improve 
performance by skipping unnecessary "
+                            + "partial aggregation stages when certain 
conditions are met. See issue #327 for detailed implementation.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Double> PARTIAL_AGG_SKIPPING_RATIO = new 
ConfigOption<>(Double.class)
+            .withKey("auron.partialAggSkipping.ratio")
+            .withCategory("Partial Aggregate Skipping")
+            .withDescription(
+                    "Threshold ratio for partial aggregate skipping 
optimization. When the ratio of unique keys to total rows "
+                            + "exceeds this value, partial aggregation may be 
skipped to improve performance.")
+            .withDefaultValue(0.9);
+
+    public static final ConfigOption<Integer> PARTIAL_AGG_SKIPPING_MIN_ROWS = 
new ConfigOption<>(Integer.class)
+            .withKey("auron.partialAggSkipping.minRows")
+            .withCategory("Partial Aggregate Skipping")
+            .withDescription("Minimum number of rows required to trigger 
partial aggregate skipping optimization. "
+                    + "This prevents the optimization from being applied to 
very small datasets where it may not be beneficial. "
+                    + "Defaults to spark.auron.batchSize * 5")
+            .withDynamicDefaultValue(
                     config -> 
config.getOptional(AuronConfiguration.BATCH_SIZE).get() * 5);
 
-    public static final ConfigOption<Boolean> PARTIAL_AGG_SKIPPING_SKIP_SPILL 
= ConfigOptions.key(
-                    "auron.partialAggSkipping.skipSpill")
-            .description("always skip partial aggregate when triggered 
spilling. ")
-            .booleanType()
-            .defaultValue(false);
-
-    public static final ConfigOption<Boolean> PARQUET_ENABLE_PAGE_FILTERING = 
ConfigOptions.key(
-                    "auron.parquet.enable.pageFiltering")
-            .description("parquet enable page filtering. ")
-            .booleanType()
-            .defaultValue(false);
-
-    public static final ConfigOption<Boolean> PARQUET_ENABLE_BLOOM_FILTER = 
ConfigOptions.key(
-                    "auron.parquet.enable.bloomFilter")
-            .description("parquet enable bloom filter. ")
-            .booleanType()
-            .defaultValue(false);
-
-    public static final ConfigOption<Integer> PARQUET_MAX_OVER_READ_SIZE = 
ConfigOptions.key(
-                    "auron.parquet.maxOverReadSize")
-            .description("parquet max over read size.")
-            .intType()
-            .defaultValue(16384);
-
-    public static final ConfigOption<Integer> PARQUET_METADATA_CACHE_SIZE = 
ConfigOptions.key(
-                    "auron.parquet.metadataCacheSize")
-            .description("parquet metadata cache size.")
-            .intType()
-            .defaultValue(5);
-
-    public static final ConfigOption<String> SPARK_IO_COMPRESSION_CODEC = 
ConfigOptions.key("io.compression.codec")
-            .description("spark io compression codec.")
-            .stringType()
-            .defaultValue("lz4");
-
-    public static final ConfigOption<Integer> SPARK_IO_COMPRESSION_ZSTD_LEVEL 
= ConfigOptions.key(
-                    "io.compression.zstd.level")
-            .description("spark io compression zstd level.")
-            .intType()
-            .defaultValue(1);
-
-    public static final ConfigOption<Integer> TOKIO_WORKER_THREADS_PER_CPU = 
ConfigOptions.key(
-                    "auron.tokio.worker.threads.per.cpu")
-            .description("tokio worker threads per cpu (spark.task.cpus), 0 
for auto detection.")
-            .intType()
-            .defaultValue(0);
-
-    public static final ConfigOption<Integer> SPARK_TASK_CPUS = 
ConfigOptions.key("task.cpus")
-            .description("number of cpus per task.")
-            .intType()
-            .defaultValue(1);
-
-    public static final ConfigOption<Boolean> FORCE_SHUFFLED_HASH_JOIN = 
ConfigOptions.key(
-                    "auron.forceShuffledHashJoin")
-            .description("replace all sort-merge join to shuffled-hash join, 
only used for benchmarking. ")
-            .booleanType()
-            .defaultValue(false);
-
-    public static final ConfigOption<Integer> 
SHUFFLE_COMPRESSION_TARGET_BUF_SIZE = ConfigOptions.key(
-                    "auron.shuffle.compression.targetBufSize")
-            .description("shuffle compression target buffer size, default is 
4MB.")
-            .intType()
-            .defaultValue(4194304);
-
-    public static final ConfigOption<String> SPILL_COMPRESSION_CODEC = 
ConfigOptions.key(
-                    "auron.spill.compression.codec")
-            .description("spark spill compression codec.")
-            .stringType()
-            .defaultValue("lz4");
-
-    public static final ConfigOption<Boolean> SMJ_FALLBACK_ENABLE = 
ConfigOptions.key("auron.smjfallback.enable")
-            .description("enable hash join falling back to sort merge join 
when hash table is too big. ")
-            .booleanType()
-            .defaultValue(false);
-
-    public static final ConfigOption<Integer> SMJ_FALLBACK_ROWS_THRESHOLD = 
ConfigOptions.key(
-                    "auron.smjfallback.rows.threshold")
-            .description("smj fallback threshold.")
-            .intType()
-            .defaultValue(10000000);
-
-    public static final ConfigOption<Integer> SMJ_FALLBACK_MEM_SIZE_THRESHOLD 
= ConfigOptions.key(
-                    "auron.smjfallback.mem.threshold")
-            .description("smj fallback mem threshold.")
-            .intType()
-            .defaultValue(134217728);
-
-    public static final ConfigOption<Double> ON_HEAP_SPILL_MEM_FRACTION = 
ConfigOptions.key(
-                    "auron.onHeapSpill.memoryFraction")
-            .description("max memory fraction of on-heap spills. ")
-            .doubleType()
-            .defaultValue(0.9);
-
-    public static final ConfigOption<Integer> SUGGESTED_BATCH_MEM_SIZE = 
ConfigOptions.key(
-                    "auron.suggested.batch.memSize")
-            .description("suggested memory size for record batch.")
-            .intType()
-            .defaultValue(8388608);
-
-    public static final ConfigOption<Boolean> PARSE_JSON_ERROR_FALLBACK = 
ConfigOptions.key(
-                    "auron.parseJsonError.fallback")
-            .description("fallback to UDFJson when error parsing json in 
native implementation. ")
-            .booleanType()
-            .defaultValue(true);
-
-    public static final ConfigOption<Integer> 
SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE = ConfigOptions.key(
-                    "auron.suggested.batch.memSize.multiwayMerging")
-            .description("suggested memory size for k-way merging use smaller 
batch memory size for "
-                    + "k-way merging since there will be multiple batches in 
memory at the same time.")
-            .intType()
-            .defaultValue(1048576);
-    public static final ConfigOption<Boolean> ORC_FORCE_POSITIONAL_EVOLUTION = 
ConfigOptions.key(
-                    "auron.orc.force.positional.evolution")
-            .description("orc force positional evolution. ")
-            .booleanType()
-            .defaultValue(false);
-    public static final ConfigOption<Boolean> ORC_TIMESTAMP_USE_MICROSECOND = 
ConfigOptions.key(
-                    "auron.orc.timestamp.use.microsecond")
-            .description("use microsecond precision when reading ORC timestamp 
columns. ")
-            .booleanType()
-            .defaultValue(false);
-    public static final ConfigOption<Boolean> ORC_SCHEMA_CASE_SENSITIVE = 
ConfigOptions.key(
-                    "auron.orc.schema.caseSensitive.enable")
-            .description("whether ORC file schema matching distinguishes 
between uppercase and lowercase. ")
-            .booleanType()
-            .defaultValue(false);
-
-    public static final ConfigOption<Boolean> FORCE_SHORT_CIRCUIT_AND_OR = 
ConfigOptions.key(
-                    "auron.forceShortCircuitAndOr")
-            .description("force using short-circuit evaluation 
(PhysicalSCAndExprNode/PhysicalSCOrExprNode) "
-                    + "for And/Or expressions, regardless of whether rhs 
contains HiveUDF. ")
-            .booleanType()
-            .defaultValue(false);
-
-    private final SparkConf sparkConf;
-
-    public SparkAuronConfiguration(SparkConf conf) {
-        this.sparkConf = checkNotNull(conf, "spark conf cannot be null");
-    }
+    public static final ConfigOption<Boolean> PARTIAL_AGG_SKIPPING_SKIP_SPILL 
= new ConfigOption<>(Boolean.class)
+            .withKey("auron.partialAggSkipping.skipSpill")
+            .withCategory("Partial Aggregate Skipping")
+            .withDescription("Always skip partial aggregation when spilling is 
triggered to prevent memory pressure. "
+                    + "When enabled, the system will bypass partial 
aggregation stages if memory spilling occurs, "
+                    + "potentially trading off some optimization for memory 
stability.")
+            .withDefaultValue(false);
+
+    public static final ConfigOption<Boolean> PARQUET_ENABLE_PAGE_FILTERING = 
new ConfigOption<>(Boolean.class)
+            .withKey("auron.parquet.enable.pageFiltering")
+            .withCategory("Data Sources")
+            .withDescription(
+                    "Enable Parquet page-level filtering to skip reading 
unnecessary data pages during query execution. "
+                            + "This optimization can significantly improve 
read performance by avoiding I/O for pages that don't match filter predicates.")
+            .withDefaultValue(false);
+
+    public static final ConfigOption<Boolean> PARQUET_ENABLE_BLOOM_FILTER = 
new ConfigOption<>(Boolean.class)
+            .withKey("auron.parquet.enable.bloomFilter")
+            .withCategory("Data Sources")
+            .withDescription(
+                    "Enable Parquet bloom filter support for efficient 
equality predicate filtering. "
+                            + "Bloom filters can quickly determine if a value 
might exist in a data block, reducing unnecessary I/O operations.")
+            .withDefaultValue(false);
+
+    public static final ConfigOption<Integer> PARQUET_MAX_OVER_READ_SIZE = new 
ConfigOption<>(Integer.class)
+            .withKey("auron.parquet.maxOverReadSize")
+            .withCategory("Data Sources")
+            .withDescription(
+                    "Maximum over-read size in bytes for Parquet file 
operations. This controls how much extra data "
+                            + "can be read beyond the required data to 
optimize I/O operations and improve read performance.")
+            .withDefaultValue(16384);
+
+    public static final ConfigOption<Integer> PARQUET_METADATA_CACHE_SIZE = 
new ConfigOption<>(Integer.class)
+            .withKey("auron.parquet.metadataCacheSize")
+            .withCategory("Data Sources")
+            .withDescription("Size of the Parquet metadata cache in number of 
entries. This cache stores file metadata "
+                    + "to avoid repeated metadata reads and improve query 
performance for frequently accessed files.")
+            .withDefaultValue(5);
+
+    public static final ConfigOption<String> SPARK_IO_COMPRESSION_CODEC = new 
ConfigOption<>(String.class)
+            .withKey("io.compression.codec")
+            .withCategory("Runtime Configuration")
+            .withDescription(
+                    "Compression codec used for Spark I/O operations. Common 
options include lz4, snappy, gzip, and zstd. "
+                            + "The choice of codec affects both compression 
ratio and decompression speed.")
+            .withDynamicDefaultValue(_conf -> 
SparkEnv.get().conf().get("spark.io.compression.codec", "lz4"));
+
+    public static final ConfigOption<Integer> SPARK_IO_COMPRESSION_ZSTD_LEVEL 
= new ConfigOption<>(Integer.class)
+            .withKey("io.compression.zstd.level")
+            .withCategory("Runtime Configuration")
+            .withDescription("Compression level for Zstandard (zstd) 
compression codec used in Spark I/O operations. "
+                    + "Valid values range from 1 (fastest) to 22 (highest 
compression). Higher levels provide better compression "
+                    + "but require more CPU time and memory.")
+            .withDynamicDefaultValue(_conf -> 
SparkEnv.get().conf().getInt("spark.io.compression.zstd.level", 1));
+
+    public static final ConfigOption<Integer> TOKIO_WORKER_THREADS_PER_CPU = 
new ConfigOption<>(Integer.class)
+            .withKey("auron.tokio.worker.threads.per.cpu")
+            .withCategory("Runtime Configuration")
+            .withDescription(
+                    "Number of Tokio worker threads to create per CPU core 
(spark.task.cpus). Set to 0 for automatic detection "
+                            + "based on available CPU cores. This setting 
controls the thread pool size for Tokio-based asynchronous operations.")
+            .withDefaultValue(0);
+
+    public static final ConfigOption<Integer> SPARK_TASK_CPUS = new 
ConfigOption<>(Integer.class)
+            .withKey("task.cpus")
+            .withCategory("Runtime Configuration")
+            .withDescription(
+                    "Number of CPU cores allocated per Spark task. This 
setting determines the parallelism level "
+                            + "for individual tasks and affects resource 
allocation and task scheduling. "
+                            + "Defaults to spark.task.cpus.")
+            .withDynamicDefaultValue(_conf -> 
SparkEnv.get().conf().getInt("spark.task.cpus", 1));
+
+    public static final ConfigOption<Boolean> FORCE_SHUFFLED_HASH_JOIN = new 
ConfigOption<>(Boolean.class)
+            .withKey("auron.forceShuffledHashJoin")
+            .withCategory("Operator Supports")
+            .withDescription(
+                    "Force replacement of all sort-merge joins with 
shuffled-hash joins for performance comparison and benchmarking. "
+                            + "This setting is primarily used for testing and 
performance analysis, as different join strategies may be optimal "
+                            + "for different data distributions and query 
patterns.")
+            .withDefaultValue(false);
+
+    public static final ConfigOption<Integer> 
SHUFFLE_COMPRESSION_TARGET_BUF_SIZE = new ConfigOption<>(Integer.class)
+            .withKey("auron.shuffle.compression.targetBufSize")
+            .withCategory("Runtime Configuration")
+            .withDescription(
+                    "Target buffer size in bytes for shuffle compression 
operations. This setting controls the buffer size "
+                            + "used during shuffle data compression, affecting 
both compression efficiency and memory usage. Default is 4MB (4,194,304 
bytes).")
+            .withDefaultValue(4194304);
+
+    public static final ConfigOption<String> SPILL_COMPRESSION_CODEC = new 
ConfigOption<>(String.class)
+            .withKey("auron.spill.compression.codec")
+            .withCategory("Runtime Configuration")
+            .withDescription(
+                    "Compression codec used for Spark spill operations when 
data is written to disk due to memory pressure. "
+                            + "Common options include lz4, snappy, and gzip. 
The choice affects both spill performance and disk space usage.")
+            .withDefaultValue("lz4");
+
+    public static final ConfigOption<Boolean> SMJ_FALLBACK_ENABLE = new 
ConfigOption<>(Boolean.class)
+            .withKey("auron.smjfallback.enable")
+            .withCategory("Operator Supports")
+            .withDescription(
+                    "Enable fallback from hash join to sort-merge join when 
the hash table becomes too large to fit in memory. "
+                            + "This prevents out-of-memory errors by switching 
to a more memory-efficient join strategy when necessary.")
+            .withDefaultValue(false);
+
+    public static final ConfigOption<Integer> SMJ_FALLBACK_ROWS_THRESHOLD = 
new ConfigOption<>(Integer.class)
+            .withKey("auron.smjfallback.rows.threshold")
+            .withCategory("Operator Supports")
+            .withDescription(
+                    "Row count threshold that triggers fallback from hash join 
to sort-merge join. When the number of rows "
+                            + "in the hash table exceeds this threshold, the 
system will switch to sort-merge join to avoid memory issues.")
+            .withDefaultValue(10000000);
+
+    public static final ConfigOption<Integer> SMJ_FALLBACK_MEM_SIZE_THRESHOLD 
= new ConfigOption<>(Integer.class)
+            .withKey("auron.smjfallback.mem.threshold")
+            .withCategory("Operator Supports")
+            .withDescription("Memory size threshold in bytes that triggers 
fallback from hash join to sort-merge join. "
+                    + "When the hash table memory usage exceeds this threshold 
(128MB by default), the system switches "
+                    + "to sort-merge join to prevent memory overflow.")
+            .withDefaultValue(134217728);
+
+    public static final ConfigOption<Double> ON_HEAP_SPILL_MEM_FRACTION = new 
ConfigOption<>(Double.class)
+            .withKey("auron.onHeapSpill.memoryFraction")
+            .withCategory("Runtime Configuration")
+            .withDescription(
+                    "Maximum memory fraction allocated for on-heap spilling 
operations. This controls what portion "
+                            + "of the available on-heap memory can be used for 
spilling data to disk when memory pressure occurs.")
+            .withDefaultValue(0.9);
+
+    public static final ConfigOption<Integer> SUGGESTED_BATCH_MEM_SIZE = new 
ConfigOption<>(Integer.class)
+            .withKey("auron.suggested.batch.memSize")
+            .withCategory("Runtime Configuration")
+            .withDescription(
+                    "Suggested memory size in bytes for record batches. This 
setting controls the target memory allocation "
+                            + "for individual data batches to optimize memory 
usage and processing efficiency. Default is 8MB (8,388,608 bytes).")
+            .withDefaultValue(8388608);
+
+    public static final ConfigOption<Boolean> PARSE_JSON_ERROR_FALLBACK = new 
ConfigOption<>(Boolean.class)
+            .withKey("auron.parseJsonError.fallback")
+            .withCategory("Expression/Function Supports")
+            .withDescription(
+                    "Enable fallback to UDFJson implementation when native 
JSON parsing encounters errors. "
+                            + "This ensures query execution continues even 
when the native JSON parser fails, at the cost of potentially slower 
performance.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Integer> 
SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE = new ConfigOption<>(Integer.class)
+            .withKey("auron.suggested.batch.memSize.multiwayMerging")
+            .withCategory("Runtime Configuration")
+            .withDescription(
+                    "Suggested memory size in bytes for k-way merging 
operations. This uses a smaller batch memory size "
+                            + "compared to regular operations since multiple 
batches are kept in memory simultaneously during k-way merging. "
+                            + "Default is 1MB (1,048,576 bytes).")
+            .withDefaultValue(1048576);
+
+    public static final ConfigOption<Boolean> ORC_FORCE_POSITIONAL_EVOLUTION = 
new ConfigOption<>(Boolean.class)
+            .withKey("auron.orc.force.positional.evolution")
+            .withCategory("Data Sources")
+            .withDescription(
+                    "Force ORC positional evolution mode for schema evolution 
operations. When enabled, column mapping "
+                            + "will be based on column position rather than 
column name, which can be useful for certain schema evolution scenarios.")
+            .withDefaultValue(false);
+
+    public static final ConfigOption<Boolean> ORC_TIMESTAMP_USE_MICROSECOND = 
new ConfigOption<>(Boolean.class)
+            .withKey("auron.orc.timestamp.use.microsecond")
+            .withCategory("Data Sources")
+            .withDescription(
+                    "Use microsecond precision when reading ORC timestamp 
columns instead of the default millisecond precision. "
+                            + "This provides higher temporal resolution for 
timestamp data but may require more storage space.")
+            .withDefaultValue(false);
+
+    public static final ConfigOption<Boolean> ORC_SCHEMA_CASE_SENSITIVE = new 
ConfigOption<>(Boolean.class)
+            .withKey("auron.orc.schema.caseSensitive.enable")
+            .withCategory("Data Sources")
+            .withDescription(
+                    "Enable case-sensitive schema matching for ORC files. When 
true, column names in the schema must match "
+                            + "the case of columns in the ORC file exactly. 
When false, column name matching is case-insensitive.")
+            .withDefaultValue(false);
+
+    public static final ConfigOption<Boolean> FORCE_SHORT_CIRCUIT_AND_OR = new 
ConfigOption<>(Boolean.class)
+            .withKey("auron.forceShortCircuitAndOr")
+            .withCategory("Expression/Function Supports")
+            .withDescription(
+                    "Force the use of short-circuit evaluation 
(PhysicalSCAndExprNode/PhysicalSCOrExprNode) for AND/OR expressions, "
+                            + "regardless of whether the right-hand side 
contains Hive UDFs. This can improve performance by avoiding unnecessary "
+                            + "evaluation of expressions when the result is 
already determined.")
+            .withDefaultValue(false);
+
+    public static final ConfigOption<Boolean> ENABLE_SCAN = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.scan")
+            .withCategory("Operator Supports")
+            .withDescription("Enable ScanExec operation conversion to native 
Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_PAIMON_SCAN = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.paimon.scan")
+            .withCategory("Operator Supports")
+            .withDescription("Enable PaimonScanExec operation conversion to 
native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_PROJECT = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.project")
+            .withCategory("Operator Supports")
+            .withDescription("Enable ProjectExec operation conversion to 
native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_FILTER = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.filter")
+            .withCategory("Operator Supports")
+            .withDescription("Enable FilterExec operation conversion to native 
Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_SORT = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.sort")
+            .withCategory("Operator Supports")
+            .withDescription("Enable SortExec operation conversion to native 
Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_UNION = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.union")
+            .withCategory("Operator Supports")
+            .withDescription("Enable UnionExec operation conversion to native 
Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_SMJ = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.smj")
+            .withCategory("Operator Supports")
+            .withDescription("Enable SortMergeJoinExec operation conversion to 
native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_SHJ = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.shj")
+            .withCategory("Operator Supports")
+            .withDescription("Enable ShuffledHashJoinExec operation conversion 
to native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_BHJ = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.bhj")
+            .withCategory("Operator Supports")
+            .withDescription("Enable BroadcastHashJoinExec operation 
conversion to native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_BNLJ = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.bnlj")
+            .withCategory("Operator Supports")
+            .withDescription("Enable BroadcastNestedLoopJoinExec operation 
conversion to native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_LOCAL_LIMIT = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.local.limit")
+            .withCategory("Operator Supports")
+            .withDescription("Enable LocalLimitExec operation conversion to 
native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_GLOBAL_LIMIT = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.global.limit")
+            .withCategory("Operator Supports")
+            .withDescription("Enable GlobalLimitExec operation conversion to 
native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_TAKE_ORDERED_AND_PROJECT 
= new SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.take.ordered.and.project")
+            .withCategory("Operator Supports")
+            .withDescription("Enable TakeOrderedAndProjectExec operation 
conversion to native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_COLLECT_LIMIT = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.collectLimit")
+            .withCategory("Operator Supports")
+            .withDescription("Enable CollectLimitExec operation conversion to 
native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_AGGR = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.aggr")
+            .withCategory("Operator Supports")
+            .withDescription("Enable AggregateExec operation conversion to 
native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_EXPAND = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.expand")
+            .withCategory("Operator Supports")
+            .withDescription("Enable ExpandExec operation conversion to native 
Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_WINDOW = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.window")
+            .withCategory("Operator Supports")
+            .withDescription("Enable WindowExec operation conversion to native 
Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_WINDOW_GROUP_LIMIT = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.window.group.limit")
+            .withCategory("Operator Supports")
+            .withDescription("Enable WindowGroupLimitExec operation conversion 
to native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_GENERATE = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.generate")
+            .withCategory("Operator Supports")
+            .withDescription("Enable GenerateExec operation conversion to 
native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_LOCAL_TABLE_SCAN = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.local.table.scan")
+            .withCategory("Operator Supports")
+            .withDescription("Enable LocalTableScanExec operation conversion 
to native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_DATA_WRITING = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.data.writing")
+            .withCategory("Operator Supports")
+            .withDescription("Enable DataWritingExec operation conversion to 
native Auron implementations.")
+            .withDefaultValue(false);
+
+    public static final ConfigOption<Boolean> ENABLE_SCAN_PARQUET = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.scan.parquet")
+            .withCategory("Data Sources")
+            .withDescription("Enable ParquetScanExec operation conversion to 
native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_SCAN_PARQUET_TIMESTAMP = 
new SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.scan.parquet.timestamp")
+            .withCategory("Data Sources")
+            .withDescription(
+                    "Enable ParquetScanExec operation conversion with 
timestamp fields to native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_SCAN_ORC = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.scan.orc")
+            .withCategory("Data Sources")
+            .withDescription("Enable OrcScanExec operation conversion to 
native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_SCAN_ORC_TIMESTAMP = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.scan.orc.timestamp")
+            .withCategory("Data Sources")
+            .withDescription(
+                    "Enable OrcScanExec operation conversion with timestamp 
fields to native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_BROADCAST_EXCHANGE = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.broadcastExchange")
+            .withCategory("Operator Supports")
+            .withDescription("Enable BroadcastExchangeExec operation 
conversion to native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> ENABLE_SHUFFLE_EXCHANGE = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.shuffleExchange")
+            .withCategory("Operator Supports")
+            .withDescription("Enable ShuffleExchangeExec operation conversion 
to native Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> UDF_JSON_ENABLED = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.udf.UDFJson.enabled")
+            .withCategory("Expression/Function Supports")
+            .withDescription("Enable UDFJson function conversion to native 
Auron implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> UDF_BRICKHOUSE_ENABLED = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.udf.brickhouse.enabled")
+            .withCategory("Expression/Function Supports")
+            .withDescription("Enable Brickhouse UDF conversion to native Auron 
implementations.")
+            .withDefaultValue(true);
+
+    public static final ConfigOption<Boolean> DECIMAL_ARITH_OP_ENABLED = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.decimal.arithOp.enabled")
+            .withCategory("Expression/Function Supports")
+            .withDescription("Enable decimal arithmetic operations conversion 
to native Auron implementations.")
+            .withDefaultValue(false);
+
+    public static final ConfigOption<Boolean> DATETIME_EXTRACT_ENABLED = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.datetime.extract.enabled")
+            .withCategory("Expression/Function Supports")
+            .withDescription("Enable datetime extract operations conversion to 
native Auron implementations.")
+            .withDefaultValue(false);
 
     @Override
     public <T> Optional<T> getOptional(ConfigOption<T> option) {
-        if (option.key().startsWith(SPARK_PREFIX)) {
-            return Optional.ofNullable(getSparkConf(option.key(), 
getOptionDefaultValue(option)));
-        } else {
-            return Optional.ofNullable(getSparkConf(SPARK_PREFIX + 
option.key(), getOptionDefaultValue(option)));
-        }
+        GetFromSparkType getFromSparkType = option instanceof SQLConfOption
+                ? GetFromSparkType.FROM_SQL_CONF
+                : option instanceof SparkContextOption
+                        ? GetFromSparkType.FROM_SPARK_CONTEXT
+                        : GetFromSparkType.FROM_SPARK_ENV;
+        return Optional.ofNullable(getFromSpark(
+                option.key(),
+                option.altKeys(),
+                option.getValueClass(),
+                () -> getOptionDefaultValue(option),
+                getFromSparkType));
     }
 
-    @Override
-    public <T> Optional<T> getOptional(String key) {
-        if (key.startsWith(SPARK_PREFIX)) {
-            return Optional.ofNullable(getSparkConf(key, null));
-        } else {
-            return Optional.ofNullable(getSparkConf(SPARK_PREFIX + key, null));
-        }
+    enum GetFromSparkType {
+        FROM_SPARK_ENV,
+        FROM_SPARK_CONTEXT,
+        FROM_SQL_CONF;
     }
 
-    private synchronized <T> T getSparkConf(String key, T defaultValue) {
-        // Use synchronized to avoid issues with multiple threads.
-        synchronized (ConfigEntry.class) {
-            ConfigEntry<T> entry = (ConfigEntry<T>) ConfigEntry.findEntry(key);
-            if (entry == null) {
-                entry = new ConfigEntryWithDefault<>(
-                        key,
-                        Option.<String>empty(),
+    @SuppressWarnings("unchecked")
+    private <T> T getFromSpark(
+            String key,
+            List<String> altKeys,
+            Class<T> valueClass,
+            Supplier<T> defaultValueSupplier,
+            GetFromSparkType getFromSparkType) {
+        Object configEntry;
+
+        synchronized (SparkAuronConfiguration.class) {
+            String sparkConfKey = key.startsWith(SPARK_PREFIX) ? key : 
SPARK_PREFIX + key;
+            configEntry = ConfigEntry.findEntry(sparkConfKey);
+            for (String altKey : altKeys) {
+                String sparkConfAltKey = altKey.startsWith(SPARK_PREFIX) ? 
altKey : SPARK_PREFIX + altKey;
+                if (configEntry != null) {
+                    break;
+                }
+                configEntry = ConfigEntry.findEntry(sparkConfAltKey);
+            }
+
+            if (configEntry == null) {
+                configEntry = new ConfigEntryWithDefaultFunction<>(
+                        sparkConfKey,
+                        Option.empty(),
                         "",
                         List$.MODULE$.empty(),
-                        defaultValue,
-                        (val) -> valueConverter(val, defaultValue, 
defaultValue == null),
+                        defaultValueSupplier::get,
+                        val -> valueConverter(val, valueClass),
                         String::valueOf,
                         null,
                         true,
                         null);
             }
-            return sparkConf.get(entry);
+        }
+
+        if (getFromSparkType == GetFromSparkType.FROM_SPARK_ENV) {
+            return SparkEnv.get().conf().get((ConfigEntry<T>) configEntry);
+
+        } else if (getFromSparkType == GetFromSparkType.FROM_SPARK_CONTEXT) {
+            return SparkContext.getOrCreate().getConf().get((ConfigEntry<T>) 
configEntry);
+
+        } else if (getFromSparkType == GetFromSparkType.FROM_SQL_CONF) {
+            return ((ConfigEntry<T>) 
configEntry).readFrom(SQLConf.get().reader());
+
+        } else {
+            throw new IllegalArgumentException("unknown getFromSparkType: " + 
getFromSparkType);
         }
     }
 
-    private <T> T valueConverter(String value, T defaultValue, boolean 
defaultValueIsNull) {
-        if (defaultValueIsNull) {
+    private <T> T valueConverter(String value, Class<T> valueClass) {
+        if (valueClass == Integer.class) {
+            return (T) Integer.valueOf(value);
+        } else if (valueClass == Long.class) {
+            return (T) Long.valueOf(value);
+        } else if (valueClass == Boolean.class) {
+            return (T) Boolean.valueOf(value);
+        } else if (valueClass == Float.class) {
+            return (T) Float.valueOf(value);
+        } else if (valueClass == Double.class) {
+            return (T) Double.valueOf(value);
+        } else if (valueClass == String.class) {
             return (T) value;
         } else {
-            if (defaultValue instanceof Integer) {
-                return (T) Integer.valueOf(value);
-            } else if (defaultValue instanceof Long) {
-                return (T) Long.valueOf(value);
-            } else if (defaultValue instanceof Boolean) {
-                return (T) Boolean.valueOf(value);
-            } else if (defaultValue instanceof Float) {
-                return (T) Float.valueOf(value);
-            } else if (defaultValue instanceof Double) {
-                return (T) Double.valueOf(value);
-            } else if (defaultValue instanceof String) {
-                return (T) String.valueOf(value);
-            } else {
-                throw new IllegalArgumentException("Unsupported default value 
type: "
-                        + defaultValue.getClass().getName());
-            }
+            throw new IllegalArgumentException("Unsupported default value 
type: " + valueClass.getName());
         }
     }
 }
+
+class SparkContextOption<T> extends ConfigOption<T> {
+    SparkContextOption(Class<T> clazz) {
+        super(clazz);
+    }
+}
+
+class SQLConfOption<T> extends ConfigOption<T> {
+    SQLConfOption(Class<T> clazz) {
+        super(clazz);
+    }
+}
diff --git 
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfigurationDocGenerator.java
 
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfigurationDocGenerator.java
new file mode 100644
index 00000000..a379bbfb
--- /dev/null
+++ 
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfigurationDocGenerator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.spark.configuration;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.auron.configuration.ConfigOption;
+
+public class SparkAuronConfigurationDocGenerator {
+
+    // Generate documentation for SparkAuronConfiguration
+    public static void main(String[] args) {
+        // Categories array based on SparkAuronConfiguration categories
+        String[] categories = {
+            "Runtime Configuration",
+            "Operator Supports",
+            "Data Sources",
+            "Expression/Function Supports",
+            "UDAF Fallback",
+            "Partial Aggregate Skipping"
+        };
+        Class<SparkAuronConfiguration> auronConfigurationClass = 
SparkAuronConfiguration.class;
+        List<ConfigOption<?>> configOptions = new ArrayList<>();
+
+        for (Field field : auronConfigurationClass.getFields()) {
+            try {
+                configOptions.add((ConfigOption<?>) field.get(null));
+            } catch (IllegalAccessException | ClassCastException e) {
+                // this is not a config option
+            }
+        }
+        configOptions.sort(Comparator.comparing(option -> option.category() + 
option.key()));
+
+        System.out.println("| Conf Key | Type | Default Value | Description 
|");
+        System.out.println("| -------- | ---- | ------------- | ----------- 
|");
+        for (String category : categories) {
+            System.out.println("| __" + category + "__ |");
+            for (ConfigOption<?> configOption : configOptions) {
+                if (!configOption.category().equals(category)) {
+                    continue;
+                }
+                String sparkConfKey =
+                        configOption.key().startsWith("spark.") ? 
configOption.key() : "spark." + configOption.key();
+                for (String altKey : configOption.altKeys()) {
+                    String sparkConfAltKey = altKey.startsWith("spark.") ? 
altKey : "spark." + altKey;
+                    sparkConfKey += "<br/>" + sparkConfAltKey;
+                }
+                String sparkConfDesc = configOption.description();
+                Class<?> sparkConfValueClass = configOption.getValueClass();
+                Object sparkConfDefaultValue = configOption.defaultValue() == 
null ? "-" : configOption.defaultValue();
+                System.out.println("| " + sparkConfKey + " | " + 
sparkConfValueClass.getSimpleName() + " | "
+                        + sparkConfDefaultValue + " | " + sparkConfDesc + " 
|");
+            }
+        }
+    }
+}
diff --git 
a/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java 
b/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
deleted file mode 100644
index 2943d4b2..00000000
--- a/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.auron;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkEnv$;
-
-/**
- * This class has been deprecated and migrated to {@link 
org.apache.auron.spark.configuration.SparkAuronConfiguration}.
- * Will be removed in the future.
- */
-@Deprecated
-@SuppressWarnings("unused")
-public enum AuronConf {
-    // support spark.auron.ui.enabled
-    UI_ENABLED("spark.auron.ui.enabled", true),
-
-    /// suggested batch size for arrow batches.
-    BATCH_SIZE("spark.auron.batchSize", 10000),
-
-    /// suggested fraction of off-heap memory used in native execution.
-    /// actual off-heap memory usage is expected to be 
spark.executor.memoryOverhead * fraction.
-    MEMORY_FRACTION("spark.auron.memoryFraction", 0.6),
-
-    /// suggested fraction of process total memory (on-heap and off-heap).
-    /// this limit is for process's resident memory usage
-    PROCESS_MEMORY_FRACTION("spark.auron.process.vmrss.memoryFraction", 0.9),
-
-    /// enable converting upper/lower functions to native, special cases may 
provide different
-    /// outputs from spark due to different unicode versions.
-    CASE_CONVERT_FUNCTIONS_ENABLE("spark.auron.enable.caseconvert.functions", 
true),
-
-    /// enable extra metrics of input batch statistics
-    INPUT_BATCH_STATISTICS_ENABLE("spark.auron.enableInputBatchStatistics", 
true),
-
-    /// supports UDAF and other aggregate functions not implemented
-    UDAF_FALLBACK_ENABLE("spark.auron.udafFallback.enable", true),
-
-    // TypedImperativeAggregate one row mem use size
-    SUGGESTED_UDAF_ROW_MEM_USAGE("spark.auron.suggested.udaf.memUsedSize", 64),
-
-    /// number of udafs to trigger sort-based aggregation
-    /// by default, all aggs containing udafs are converted to sort-based
-    
UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG("spark.auron.udafFallback.num.udafs.trigger.sortAgg",
 1),
-
-    // TypedImperativeAggregate one row mem use size
-    
UDAF_FALLBACK_ESTIM_ROW_SIZE("spark.auron.udafFallback.typedImperativeEstimatedRowSize",
 256),
-
-    /// enable trimming string inputs before casting to numeric/boolean types
-    CAST_STRING_TRIM_ENABLE("spark.auron.cast.trimString", true),
-
-    /// ignore corrupted input files
-    IGNORE_CORRUPTED_FILES("spark.files.ignoreCorruptFiles", false),
-
-    /// enable partial aggregate skipping (see 
https://github.com/apache/auron/issues/327)
-    PARTIAL_AGG_SKIPPING_ENABLE("spark.auron.partialAggSkipping.enable", true),
-
-    /// partial aggregate skipping ratio
-    PARTIAL_AGG_SKIPPING_RATIO("spark.auron.partialAggSkipping.ratio", 0.9),
-
-    /// minimum number of rows to trigger partial aggregate skipping
-    PARTIAL_AGG_SKIPPING_MIN_ROWS("spark.auron.partialAggSkipping.minRows", 
BATCH_SIZE.intConf() * 5),
-
-    /// always skip partial aggregate when triggered spilling
-    
PARTIAL_AGG_SKIPPING_SKIP_SPILL("spark.auron.partialAggSkipping.skipSpill", 
false),
-
-    // parquet enable page filtering
-    PARQUET_ENABLE_PAGE_FILTERING("spark.auron.parquet.enable.pageFiltering", 
false),
-
-    // parquet enable bloom filter
-    PARQUET_ENABLE_BLOOM_FILTER("spark.auron.parquet.enable.bloomFilter", 
false),
-
-    // parquet max over read size
-    PARQUET_MAX_OVER_READ_SIZE("spark.auron.parquet.maxOverReadSize", 16384),
-
-    // parquet metadata cache size
-    PARQUET_METADATA_CACHE_SIZE("spark.auron.parquet.metadataCacheSize", 5),
-
-    // spark io compression codec
-    SPARK_IO_COMPRESSION_CODEC("spark.io.compression.codec", "lz4"),
-
-    // spark io compression zstd level
-    SPARK_IO_COMPRESSION_ZSTD_LEVEL("spark.io.compression.zstd.level", 1),
-
-    // tokio worker threads per cpu (spark.task.cpus), 0 for auto detection
-    TOKIO_WORKER_THREADS_PER_CPU("spark.auron.tokio.worker.threads.per.cpu", 
0),
-
-    // number of cpus per task
-    SPARK_TASK_CPUS("spark.task.cpus", 1),
-
-    // replace all sort-merge join to shuffled-hash join, only used for 
benchmarking
-    FORCE_SHUFFLED_HASH_JOIN("spark.auron.forceShuffledHashJoin", false),
-
-    // shuffle compression target buffer size, default is 4MB
-    
SHUFFLE_COMPRESSION_TARGET_BUF_SIZE("spark.auron.shuffle.compression.targetBufSize",
 4194304),
-
-    // spark spill compression codec
-    SPILL_COMPRESSION_CODEC("spark.auron.spill.compression.codec", "lz4"),
-
-    // enable hash join falling back to sort merge join when hash table is too 
big
-    SMJ_FALLBACK_ENABLE("spark.auron.smjfallback.enable", false),
-
-    // smj fallback threshold
-    SMJ_FALLBACK_ROWS_THRESHOLD("spark.auron.smjfallback.rows.threshold", 
10000000),
-
-    // smj fallback threshold
-    SMJ_FALLBACK_MEM_SIZE_THRESHOLD("spark.auron.smjfallback.mem.threshold", 
134217728),
-
-    // max memory fraction of on-heap spills
-    ON_HEAP_SPILL_MEM_FRACTION("spark.auron.onHeapSpill.memoryFraction", 0.9),
-
-    // suggested memory size for record batch
-    SUGGESTED_BATCH_MEM_SIZE("spark.auron.suggested.batch.memSize", 8388608),
-
-    // fallback to UDFJson when error parsing json in native implementation
-    PARSE_JSON_ERROR_FALLBACK("spark.auron.parseJsonError.fallback", true),
-
-    // suggested memory size for k-way merging
-    // use smaller batch memory size for kway merging since there will be 
multiple
-    // batches in memory at the same time
-    
SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE("spark.auron.suggested.batch.memSize.multiwayMerging",
 1048576),
-
-    
ORC_FORCE_POSITIONAL_EVOLUTION("spark.auron.orc.force.positional.evolution", 
false),
-
-    // use microsecond precision when reading ORC timestamp columns
-    ORC_TIMESTAMP_USE_MICROSECOND("spark.auron.orc.timestamp.use.microsecond", 
false),
-
-    ORC_SCHEMA_CASE_SENSITIVE("spark.auron.orc.schema.caseSensitive.enable", 
false),
-
-    NATIVE_LOG_LEVEL("spark.auron.native.log.level", "info");
-
-    public final String key;
-    private final Object defaultValue;
-
-    AuronConf(String key, Object defaultValue) {
-        this.key = key;
-        this.defaultValue = defaultValue;
-    }
-
-    public boolean booleanConf() {
-        return conf().getBoolean(key, (boolean) defaultValue);
-    }
-
-    public int intConf() {
-        return conf().getInt(key, (int) defaultValue);
-    }
-
-    public long longConf() {
-        return conf().getLong(key, (long) defaultValue);
-    }
-
-    public double doubleConf() {
-        return conf().getDouble(key, (double) defaultValue);
-    }
-
-    public String stringConf() {
-        return conf().get(key, (String) defaultValue);
-    }
-
-    public static boolean booleanConf(String confName) {
-        return AuronConf.valueOf(confName).booleanConf();
-    }
-
-    public static int intConf(String confName) {
-        return AuronConf.valueOf(confName).intConf();
-    }
-
-    public static long longConf(String confName) {
-        return AuronConf.valueOf(confName).longConf();
-    }
-
-    public static double doubleConf(String confName) {
-        return AuronConf.valueOf(confName).doubleConf();
-    }
-
-    public static String stringConf(String confName) {
-        return AuronConf.valueOf(confName).stringConf();
-    }
-
-    private static SparkConf conf() {
-        return SparkEnv$.MODULE$.get().conf();
-    }
-}
diff --git 
a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java 
b/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
deleted file mode 100644
index 3bea6e17..00000000
--- a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.auron;
-
-import java.lang.management.BufferPoolMXBean;
-import java.lang.management.ManagementFactory;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.auron.functions.AuronUDFWrapperContext;
-import org.apache.auron.hadoop.fs.FSDataInputWrapper;
-import org.apache.auron.hadoop.fs.FSDataInputWrapper$;
-import org.apache.auron.hadoop.fs.FSDataOutputWrapper;
-import org.apache.auron.hadoop.fs.FSDataOutputWrapper$;
-import org.apache.auron.memory.OnHeapSpillManager;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.SparkEnv;
-import org.apache.spark.TaskContext;
-import org.apache.spark.TaskContext$;
-import org.apache.spark.sql.auron.memory.SparkOnHeapSpillManager$;
-import org.apache.spark.sql.auron.util.TaskContextHelper$;
-
-/**
- * This class has been deprecated and migrated to {@link 
org.apache.auron.jni.JniBridge}.
- * Will be removed in the future.
- */
-@Deprecated
-@SuppressWarnings("unused")
-public class JniBridge {
-
-    @Deprecated
-    public static final ConcurrentHashMap<String, Object> resourcesMap = new 
ConcurrentHashMap<>();
-
-    @Deprecated
-    public static native long callNative(long initNativeMemory, String 
logLevel, AuronCallNativeWrapper wrapper);
-
-    @Deprecated
-    public static native boolean nextBatch(long ptr);
-
-    @Deprecated
-    public static native void finalizeNative(long ptr);
-
-    @Deprecated
-    public static native void onExit();
-
-    @Deprecated
-    public static ClassLoader getContextClassLoader() {
-        return Thread.currentThread().getContextClassLoader();
-    }
-
-    @Deprecated
-    public static void setContextClassLoader(ClassLoader cl) {
-        Thread.currentThread().setContextClassLoader(cl);
-    }
-
-    @Deprecated
-    public static Object getResource(String key) {
-        return resourcesMap.remove(key);
-    }
-
-    @Deprecated
-    public static TaskContext getTaskContext() {
-        return TaskContext$.MODULE$.get();
-    }
-
-    @Deprecated
-    public static OnHeapSpillManager getTaskOnHeapSpillManager() {
-        return SparkOnHeapSpillManager$.MODULE$.current();
-    }
-
-    @Deprecated
-    public static boolean isTaskRunning() {
-        TaskContext tc = getTaskContext();
-        if (tc == null) { // driver is always running
-            return true;
-        }
-        return !tc.isCompleted() && !tc.isInterrupted();
-    }
-
-    @Deprecated
-    public static FSDataInputWrapper openFileAsDataInputWrapper(FileSystem fs, 
String path) throws Exception {
-        // the path is a URI string, so we need to convert it to a URI object, 
ref:
-        // org.apache.spark.paths.SparkPath.toPath
-        return FSDataInputWrapper$.MODULE$.wrap(fs.open(new Path(new 
URI(path))));
-    }
-
-    @Deprecated
-    public static FSDataOutputWrapper createFileAsDataOutputWrapper(FileSystem 
fs, String path) throws Exception {
-        return FSDataOutputWrapper$.MODULE$.wrap(fs.create(new Path(new 
URI(path))));
-    }
-
-    @Deprecated
-    private static final List<BufferPoolMXBean> directMXBeans =
-            ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
-
-    @Deprecated
-    public static long getTotalMemoryLimited() {
-        return NativeHelper$.MODULE$.totalMemory();
-    }
-
-    @Deprecated
-    public static long getDirectMemoryUsed() {
-        return directMXBeans.stream()
-                .mapToLong(BufferPoolMXBean::getTotalCapacity)
-                .sum();
-    }
-
-    @Deprecated
-    public static String getDirectWriteSpillToDiskFile() {
-        return SparkEnv.get()
-                .blockManager()
-                .diskBlockManager()
-                .createTempLocalBlock()
-                ._2
-                .getPath();
-    }
-
-    @Deprecated
-    public static void initNativeThread(ClassLoader cl, TaskContext tc) {
-        setContextClassLoader(cl);
-        TaskContext$.MODULE$.setTaskContext(tc);
-        TaskContextHelper$.MODULE$.setNativeThreadName();
-        TaskContextHelper$.MODULE$.setHDFSCallerContext();
-    }
-
-    @Deprecated
-    public static AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer 
udfSerialized) {
-        throw new UnsupportedOperationException("This API is designed to 
support next-generation multi-engine.");
-    }
-}
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
deleted file mode 100644
index b4028c1e..00000000
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.auron
-
-import java.io.File
-import java.io.IOException
-import java.nio.file.Files
-import java.nio.file.StandardCopyOption
-import java.util.concurrent.atomic.AtomicReference
-
-import scala.annotation.nowarn
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.arrow.c.ArrowArray
-import org.apache.arrow.c.ArrowSchema
-import org.apache.arrow.c.CDataDictionaryProvider
-import org.apache.arrow.c.Data
-import org.apache.arrow.vector.VectorSchemaRoot
-import org.apache.arrow.vector.types.pojo.Schema
-import org.apache.spark.Partition
-import org.apache.spark.TaskContext
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.auron.util.Using
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
-import org.apache.spark.sql.execution.auron.arrowio.util.ArrowUtils
-import 
org.apache.spark.sql.execution.auron.arrowio.util.ArrowUtils.ROOT_ALLOCATOR
-import org.apache.spark.sql.execution.auron.columnar.ColumnarHelper
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.CompletionIterator
-import org.apache.spark.util.ShutdownHookManager
-import org.apache.spark.util.Utils
-
-import org.apache.auron.metric.{MetricNode, SparkMetricNode}
-import org.apache.auron.protobuf.PartitionId
-import org.apache.auron.protobuf.PhysicalPlanNode
-import org.apache.auron.protobuf.TaskDefinition
-
-/**
- * This class has been deprecated and migrated to {@link
- * org.apache.auron.jni.AuronCallNativeWrapper}. Will be removed in the future.
- */
-@nowarn("cat=deprecation") // JniBridge is temporarily used (deprecated)
-@Deprecated
-case class AuronCallNativeWrapper(
-    nativePlan: PhysicalPlanNode,
-    partition: Partition,
-    context: Option[TaskContext],
-    metrics: SparkMetricNode)
-    extends Logging {
-
-  AuronCallNativeWrapper.initNative()
-
-  private val error: AtomicReference[Throwable] = new AtomicReference(null)
-  private val dictionaryProvider = new CDataDictionaryProvider()
-  private var arrowSchema: Schema = _
-  private var schema: StructType = _
-  private var toUnsafe: UnsafeProjection = _
-  private val batchRows: ArrayBuffer[InternalRow] = ArrayBuffer()
-  private var batchCurRowIdx = 0
-
-  logInfo(s"Start executing native plan ${nativePlan.getPhysicalPlanTypeCase}")
-  private var nativeRuntimePtr =
-    JniBridge.callNative(NativeHelper.nativeMemory, 
AuronConf.NATIVE_LOG_LEVEL.stringConf(), this)
-
-  private lazy val rowIterator = new Iterator[InternalRow] {
-    override def hasNext: Boolean = {
-      checkError()
-
-      if (batchCurRowIdx < batchRows.length) {
-        return true
-      }
-
-      // clear current batch
-      batchRows.clear()
-      batchCurRowIdx = 0
-
-      // load next batch
-      try {
-        if (nativeRuntimePtr != 0 && JniBridge.nextBatch(nativeRuntimePtr)) {
-          return hasNext
-        }
-      } finally {
-        // if error has been set, throw set error instead of this caught 
exception
-        checkError()
-      }
-      false
-    }
-
-    @Deprecated
-    override def next(): InternalRow = {
-      val batchRow = batchRows(batchCurRowIdx)
-      batchCurRowIdx += 1
-      batchRow
-    }
-  }
-
-  context.foreach(_.addTaskCompletionListener[Unit]((_: TaskContext) => 
close()))
-  context.foreach(_.addTaskFailureListener((_, _) => close()))
-
-  @Deprecated
-  def getRowIterator: Iterator[InternalRow] = {
-    CompletionIterator[InternalRow, Iterator[InternalRow]](rowIterator, 
close())
-  }
-
-  @Deprecated
-  protected def getMetrics: MetricNode =
-    metrics
-
-  @Deprecated
-  protected def importSchema(ffiSchemaPtr: Long): Unit = {
-    Using.resource(ArrowSchema.wrap(ffiSchemaPtr)) { ffiSchema =>
-      arrowSchema = Data.importSchema(ROOT_ALLOCATOR, ffiSchema, 
dictionaryProvider)
-      schema = ArrowUtils.fromArrowSchema(arrowSchema)
-      toUnsafe = UnsafeProjection.create(schema)
-    }
-  }
-
-  @Deprecated
-  protected def importBatch(ffiArrayPtr: Long): Unit = {
-    if (nativeRuntimePtr == 0) {
-      throw new RuntimeException("Native runtime is finalized")
-    }
-
-    Using.resources(
-      ArrowArray.wrap(ffiArrayPtr),
-      VectorSchemaRoot.create(arrowSchema, ROOT_ALLOCATOR)) { case (ffiArray, 
root) =>
-      Data.importIntoVectorSchemaRoot(ROOT_ALLOCATOR, ffiArray, root, 
dictionaryProvider)
-
-      batchRows.append(
-        ColumnarHelper
-          .rootRowsIter(root)
-          .map(row => toUnsafe(row).copy().asInstanceOf[InternalRow])
-          .toSeq: _*)
-    }
-  }
-
-  @Deprecated
-  protected def setError(error: Throwable): Unit = {
-    this.error.set(error)
-  }
-
-  @Deprecated
-  protected def checkError(): Unit = {
-    val throwable = error.getAndSet(null)
-    if (throwable != null) {
-      close()
-      throw throwable
-    }
-  }
-
-  @Deprecated
-  protected def getRawTaskDefinition: Array[Byte] = {
-    val partitionId: PartitionId = PartitionId
-      .newBuilder()
-      .setPartitionId(partition.index)
-      .setStageId(context.map(_.stageId()).getOrElse(0))
-      .setTaskId(context.map(_.taskAttemptId()).getOrElse(0))
-      .build()
-
-    val taskDefinition = TaskDefinition
-      .newBuilder()
-      .setTaskId(partitionId)
-      .setPlan(nativePlan)
-      .build()
-    taskDefinition.toByteArray
-  }
-
-  private def close(): Unit = {
-    synchronized {
-      batchRows.clear()
-      batchCurRowIdx = 0
-
-      if (nativeRuntimePtr != 0) {
-        JniBridge.finalizeNative(nativeRuntimePtr)
-        nativeRuntimePtr = 0
-        dictionaryProvider.close()
-        checkError()
-      }
-    }
-  }
-}
-
-@nowarn("cat=deprecation") // JniBridge is temporarily used (deprecated)
-object AuronCallNativeWrapper extends Logging {
-  def initNative(): Unit = {
-    lazyInitNative
-  }
-
-  private lazy val lazyInitNative: Unit = {
-    logInfo(
-      "Initializing native environment (" +
-        s"batchSize=${AuronConf.BATCH_SIZE.intConf()}, " +
-        s"nativeMemory=${NativeHelper.nativeMemory}, " +
-        s"memoryFraction=${AuronConf.MEMORY_FRACTION.doubleConf()})")
-
-    // arrow configuration
-    System.setProperty("arrow.struct.conflict.policy", "CONFLICT_APPEND")
-
-    assert(classOf[JniBridge] != null) // preload JNI bridge classes
-    AuronCallNativeWrapper.loadLibAuron()
-    ShutdownHookManager.addShutdownHook(() => JniBridge.onExit())
-  }
-
-  private def loadLibAuron(): Unit = {
-    val libName = System.mapLibraryName("auron")
-    try {
-      val classLoader = classOf[NativeSupports].getClassLoader
-      val tempFile = File.createTempFile("libauron-", ".tmp")
-      tempFile.deleteOnExit()
-
-      Utils.tryWithResource {
-        val is = classLoader.getResourceAsStream(libName)
-        assert(is != null, s"cannot load $libName")
-        is
-      }(Files.copy(_, tempFile.toPath, StandardCopyOption.REPLACE_EXISTING))
-      System.load(tempFile.getAbsolutePath)
-
-    } catch {
-      case e: IOException =>
-        throw new IllegalStateException("error loading native libraries: " + e)
-    }
-  }
-}
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index 9b8bed91..def645d5 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -80,58 +80,34 @@ import org.apache.auron.protobuf.PhysicalPlanNode
 import org.apache.auron.spark.configuration.SparkAuronConfiguration
 
 object AuronConverters extends Logging {
-  def enableScan: Boolean =
-    getBooleanConf("spark.auron.enable.scan", defaultValue = true)
-  def enableProject: Boolean =
-    getBooleanConf("spark.auron.enable.project", defaultValue = true)
-  def enableFilter: Boolean =
-    getBooleanConf("spark.auron.enable.filter", defaultValue = true)
-  def enableSort: Boolean =
-    getBooleanConf("spark.auron.enable.sort", defaultValue = true)
-  def enableUnion: Boolean =
-    getBooleanConf("spark.auron.enable.union", defaultValue = true)
-  def enableSmj: Boolean =
-    getBooleanConf("spark.auron.enable.smj", defaultValue = true)
-  def enableShj: Boolean =
-    getBooleanConf("spark.auron.enable.shj", defaultValue = true)
-  def enableBhj: Boolean =
-    getBooleanConf("spark.auron.enable.bhj", defaultValue = true)
-  def enableBnlj: Boolean =
-    getBooleanConf("spark.auron.enable.bnlj", defaultValue = true)
-  def enableLocalLimit: Boolean =
-    getBooleanConf("spark.auron.enable.local.limit", defaultValue = true)
-  def enableGlobalLimit: Boolean =
-    getBooleanConf("spark.auron.enable.global.limit", defaultValue = true)
+  def enableScan: Boolean = SparkAuronConfiguration.ENABLE_SCAN.get()
+  def enableProject: Boolean = SparkAuronConfiguration.ENABLE_PROJECT.get()
+  def enableFilter: Boolean = SparkAuronConfiguration.ENABLE_FILTER.get()
+  def enableSort: Boolean = SparkAuronConfiguration.ENABLE_SORT.get()
+  def enableUnion: Boolean = SparkAuronConfiguration.ENABLE_UNION.get()
+  def enableSmj: Boolean = SparkAuronConfiguration.ENABLE_SMJ.get()
+  def enableShj: Boolean = SparkAuronConfiguration.ENABLE_SHJ.get()
+  def enableBhj: Boolean = SparkAuronConfiguration.ENABLE_BHJ.get()
+  def enableBnlj: Boolean = SparkAuronConfiguration.ENABLE_BNLJ.get()
+  def enableLocalLimit: Boolean = 
SparkAuronConfiguration.ENABLE_LOCAL_LIMIT.get()
+  def enableGlobalLimit: Boolean = 
SparkAuronConfiguration.ENABLE_GLOBAL_LIMIT.get()
   def enableTakeOrderedAndProject: Boolean =
-    getBooleanConf("spark.auron.enable.take.ordered.and.project", defaultValue 
= true)
-  def enableCollectLimit: Boolean =
-    getBooleanConf("spark.auron.enable.collectLimit", defaultValue = true)
-  def enableAggr: Boolean =
-    getBooleanConf("spark.auron.enable.aggr", defaultValue = true)
-  def enableExpand: Boolean =
-    getBooleanConf("spark.auron.enable.expand", defaultValue = true)
-  def enableWindow: Boolean =
-    getBooleanConf("spark.auron.enable.window", defaultValue = true)
-  def enableWindowGroupLimit: Boolean =
-    getBooleanConf("spark.auron.enable.window.group.limit", defaultValue = 
true)
-  def enableGenerate: Boolean =
-    getBooleanConf("spark.auron.enable.generate", defaultValue = true)
-  def enableLocalTableScan: Boolean =
-    getBooleanConf("spark.auron.enable.local.table.scan", defaultValue = true)
-  def enableDataWriting: Boolean =
-    getBooleanConf("spark.auron.enable.data.writing", defaultValue = false)
-  def enableScanParquet: Boolean =
-    getBooleanConf("spark.auron.enable.scan.parquet", defaultValue = true)
+    SparkAuronConfiguration.ENABLE_TAKE_ORDERED_AND_PROJECT.get()
+  def enableCollectLimit: Boolean = 
SparkAuronConfiguration.ENABLE_COLLECT_LIMIT.get()
+  def enableAggr: Boolean = SparkAuronConfiguration.ENABLE_AGGR.get()
+  def enableExpand: Boolean = SparkAuronConfiguration.ENABLE_EXPAND.get()
+  def enableWindow: Boolean = SparkAuronConfiguration.ENABLE_WINDOW.get()
+  def enableWindowGroupLimit: Boolean = 
SparkAuronConfiguration.ENABLE_WINDOW_GROUP_LIMIT.get()
+  def enableGenerate: Boolean = SparkAuronConfiguration.ENABLE_GENERATE.get()
+  def enableLocalTableScan: Boolean = 
SparkAuronConfiguration.ENABLE_LOCAL_TABLE_SCAN.get()
+  def enableDataWriting: Boolean = 
SparkAuronConfiguration.ENABLE_DATA_WRITING.get()
+  def enableScanParquet: Boolean = 
SparkAuronConfiguration.ENABLE_SCAN_PARQUET.get()
   def enableScanParquetTimestamp: Boolean =
-    getBooleanConf("spark.auron.enable.scan.parquet.timestamp", defaultValue = 
true)
-  def enableScanOrc: Boolean =
-    getBooleanConf("spark.auron.enable.scan.orc", defaultValue = true)
-  def enableScanOrcTimestamp: Boolean =
-    getBooleanConf("spark.auron.enable.scan.orc.timestamp", defaultValue = 
true)
-  def enableBroadcastExchange: Boolean =
-    getBooleanConf("spark.auron.enable.broadcastExchange", defaultValue = true)
-  def enableShuffleExechange: Boolean =
-    getBooleanConf("spark.auron.enable.shuffleExchange", defaultValue = true)
+    SparkAuronConfiguration.ENABLE_SCAN_PARQUET_TIMESTAMP.get()
+  def enableScanOrc: Boolean = SparkAuronConfiguration.ENABLE_SCAN_ORC.get()
+  def enableScanOrcTimestamp: Boolean = 
SparkAuronConfiguration.ENABLE_SCAN_ORC_TIMESTAMP.get()
+  def enableBroadcastExchange: Boolean = 
SparkAuronConfiguration.ENABLE_BROADCAST_EXCHANGE.get()
+  def enableShuffleExechange: Boolean = 
SparkAuronConfiguration.ENABLE_SHUFFLE_EXCHANGE.get()
 
   private val extConvertProviders = 
ServiceLoader.load(classOf[AuronConvertProvider]).asScala
 
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala
index 1f8b6421..b68b0495 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.auron
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -27,6 +26,8 @@ import org.apache.spark.sql.execution.LocalTableScanExec
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.internal.SQLConf
 
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
+
 class AuronSparkSessionExtension extends (SparkSessionExtensions => Unit) with 
Logging {
   Shims.get.initExtension()
 
@@ -35,7 +36,6 @@ class AuronSparkSessionExtension extends 
(SparkSessionExtensions => Unit) with L
     SparkEnv.get.conf.set(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key, "true")
     logInfo(s"${classOf[AuronSparkSessionExtension].getName} enabled")
 
-    assert(AuronSparkSessionExtension.auronEnabledKey != null)
     Shims.get.onApplyingExtension()
 
     extensions.injectColumnar(sparkSession => {
@@ -45,11 +45,6 @@ class AuronSparkSessionExtension extends 
(SparkSessionExtensions => Unit) with L
 }
 
 object AuronSparkSessionExtension extends Logging {
-  lazy val auronEnabledKey: ConfigEntry[Boolean] = SQLConf
-    .buildConf("spark.auron.enable")
-    .booleanConf
-    .createWithDefault(true)
-
   def dumpSimpleSparkPlanTreeNode(exec: SparkPlan, depth: Int = 0): Unit = {
     val nodeName = exec.nodeName
     val convertible = exec
@@ -68,7 +63,7 @@ case class AuronColumnarOverrides(sparkSession: SparkSession) 
extends ColumnarRu
   override def preColumnarTransitions: Rule[SparkPlan] = {
     new Rule[SparkPlan] {
       override def apply(sparkPlan: SparkPlan): SparkPlan = {
-        if (!sparkPlan.conf.getConf(auronEnabledKey)) {
+        if (!SparkAuronConfiguration.AURON_ENABLED.get()) {
           return sparkPlan // performs no conversion if auron is not enabled
         }
 
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
index 8b48c39a..c4947c83 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
@@ -81,27 +81,15 @@ import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
 import org.apache.auron.{protobuf => pb}
-import org.apache.auron.configuration.AuronConfiguration
-import org.apache.auron.jni.AuronAdaptor
 import org.apache.auron.protobuf.PhysicalExprNode
 import org.apache.auron.spark.configuration.SparkAuronConfiguration
 
 object NativeConverters extends Logging {
-
-  private def sparkAuronConfig: AuronConfiguration =
-    AuronAdaptor.getInstance.getAuronConfiguration
-  def udfEnabled: Boolean =
-    AuronConverters.getBooleanConf("spark.auron.udf.enabled", defaultValue = 
true)
-  def udfJsonEnabled: Boolean =
-    AuronConverters.getBooleanConf("spark.auron.udf.UDFJson.enabled", 
defaultValue = true)
-  def udfBrickHouseEnabled: Boolean =
-    AuronConverters.getBooleanConf("spark.auron.udf.brickhouse.enabled", 
defaultValue = true)
-  def decimalArithOpEnabled: Boolean =
-    AuronConverters.getBooleanConf("spark.auron.decimal.arithOp.enabled", 
defaultValue = false)
-  def datetimeExtractEnabled: Boolean =
-    AuronConverters.getBooleanConf("spark.auron.datetime.extract.enabled", 
defaultValue = false)
-  def castTrimStringEnabled: Boolean =
-    AuronConverters.getBooleanConf("spark.auron.cast.trimString", defaultValue 
= true)
+  def udfJsonEnabled: Boolean = SparkAuronConfiguration.UDF_JSON_ENABLED.get()
+  def udfBrickHouseEnabled: Boolean = 
SparkAuronConfiguration.UDF_BRICKHOUSE_ENABLED.get()
+  def decimalArithOpEnabled: Boolean = 
SparkAuronConfiguration.DECIMAL_ARITH_OP_ENABLED.get()
+  def datetimeExtractEnabled: Boolean = 
SparkAuronConfiguration.DATETIME_EXTRACT_ENABLED.get()
+  def castTrimStringEnabled: Boolean = 
SparkAuronConfiguration.CAST_STRING_TRIM_ENABLE.get()
   def singleChildFallbackEnabled: Boolean =
     AuronConverters.getBooleanConf(
       "spark.auron.expression.singleChildFallback.enabled",
@@ -781,7 +769,7 @@ object NativeConverters extends Logging {
       // if rhs is complex in and/or operators, use short-circuiting 
implementation
       // or if forceShortCircuitAndOr is enabled, always use short-circuiting
       case And(lhs, rhs)
-          if 
sparkAuronConfig.getBoolean(SparkAuronConfiguration.FORCE_SHORT_CIRCUIT_AND_OR)
+          if SparkAuronConfiguration.FORCE_SHORT_CIRCUIT_AND_OR.get()
             || rhs.find(HiveUDFUtil.isHiveUDF).isDefined =>
         buildExprNode {
           _.setScAndExpr(
@@ -791,7 +779,7 @@ object NativeConverters extends Logging {
               .setRight(convertExprWithFallback(rhs, isPruningExpr, fallback)))
         }
       case Or(lhs, rhs)
-          if 
sparkAuronConfig.getBoolean(SparkAuronConfiguration.FORCE_SHORT_CIRCUIT_AND_OR)
+          if SparkAuronConfiguration.FORCE_SHORT_CIRCUIT_AND_OR.get()
             || rhs.find(HiveUDFUtil.isHiveUDF).isDefined =>
         buildExprNode {
           _.setScOrExpr(
@@ -889,11 +877,9 @@ object NativeConverters extends Logging {
       case Length(arg) if arg.dataType == StringType =>
         buildScalarFunction(pb.ScalarFunction.CharacterLength, arg :: Nil, 
IntegerType)
 
-      case e: Lower
-          if 
sparkAuronConfig.getBoolean(SparkAuronConfiguration.CASE_CONVERT_FUNCTIONS_ENABLE)
 =>
+      case e: Lower if 
SparkAuronConfiguration.CASE_CONVERT_FUNCTIONS_ENABLE.get() =>
         buildExtScalarFunction("Spark_StringLower", e.children, e.dataType)
-      case e: Upper
-          if 
sparkAuronConfig.getBoolean(SparkAuronConfiguration.CASE_CONVERT_FUNCTIONS_ENABLE)
 =>
+      case e: Upper if 
SparkAuronConfiguration.CASE_CONVERT_FUNCTIONS_ENABLE.get() =>
         buildExtScalarFunction("Spark_StringUpper", e.children, e.dataType)
 
       case e: StringTrim =>
@@ -1254,7 +1240,7 @@ object NativeConverters extends Logging {
         }
 
         // fallback to UDAF
-        if 
(sparkAuronConfig.getBoolean(SparkAuronConfiguration.UDAF_FALLBACK_ENABLE)) {
+        if (SparkAuronConfiguration.UDAF_FALLBACK_ENABLE.get()) {
           udaf match {
             case _: DeclarativeAggregate =>
             case _: TypedImperativeAggregate[_] =>
diff --git 
a/spark-extension/src/test/java/org/apache/auron/spark/configuration/SparkAuronConfigurationTest.java
 
b/spark-extension/src/test/java/org/apache/auron/spark/configuration/SparkAuronConfigurationTest.java
deleted file mode 100644
index 99345250..00000000
--- 
a/spark-extension/src/test/java/org/apache/auron/spark/configuration/SparkAuronConfigurationTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.auron.spark.configuration;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import org.apache.spark.SparkConf;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/**
- * This class is used to test the {@link SparkAuronConfiguration) class.
- */
-public class SparkAuronConfigurationTest {
-
-    private SparkAuronConfiguration sparkAuronConfiguration;
-
-    @BeforeEach
-    public void setUp() {
-        SparkConf sparkConf = new SparkConf();
-        sparkConf.set("spark.auron.ui.enabled", "false");
-        sparkConf.set("spark.auron.process.vmrss.memoryFraction", "0.66");
-        sparkConf.set("spark.auron.suggested.udaf.memUsedSize", "1024");
-        sparkConf.set("spark.io.compression.codec", "gzip");
-        sparkAuronConfiguration = new SparkAuronConfiguration(sparkConf);
-    }
-
-    @Test
-    public void testGetSparkConfig() {
-        
assertEquals(sparkAuronConfiguration.get(SparkAuronConfiguration.UI_ENABLED), 
false);
-        
assertEquals(sparkAuronConfiguration.get(SparkAuronConfiguration.PROCESS_MEMORY_FRACTION),
 0.66);
-        
assertEquals(sparkAuronConfiguration.get(SparkAuronConfiguration.SUGGESTED_UDAF_ROW_MEM_USAGE),
 1024);
-        
assertEquals(sparkAuronConfiguration.get(SparkAuronConfiguration.SPARK_IO_COMPRESSION_CODEC),
 "gzip");
-
-        assertEquals(
-                sparkAuronConfiguration
-                        .getOptional(SparkAuronConfiguration.UI_ENABLED.key())
-                        .get(),
-                false);
-        assertEquals(
-                sparkAuronConfiguration
-                        
.getOptional(SparkAuronConfiguration.PROCESS_MEMORY_FRACTION.key())
-                        .get(),
-                0.66);
-        assertEquals(
-                sparkAuronConfiguration
-                        
.getOptional(SparkAuronConfiguration.SUGGESTED_UDAF_ROW_MEM_USAGE.key())
-                        .get(),
-                1024);
-        assertEquals(
-                sparkAuronConfiguration
-                        
.getOptional(SparkAuronConfiguration.SPARK_IO_COMPRESSION_CODEC.key())
-                        .get(),
-                "gzip");
-
-        // Test default value
-        assertEquals(
-                sparkAuronConfiguration
-                        
.getOptional(SparkAuronConfiguration.PARSE_JSON_ERROR_FALLBACK)
-                        .get(),
-                true);
-    }
-}
diff --git 
a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
 
b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
index 5b320d18..f9d75c43 100644
--- 
a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
+++ 
b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
@@ -24,11 +24,11 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.hive.execution.auron.plan.NativePaimonTableScanExec
 
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
+
 class PaimonConvertProvider extends AuronConvertProvider with Logging {
 
-  override def isEnabled: Boolean = {
-    AuronConverters.getBooleanConf("spark.auron.enable.paimon.scan", 
defaultValue = false)
-  }
+  override def isEnabled: Boolean = 
SparkAuronConfiguration.ENABLE_PAIMON_SCAN.get()
 
   override def isSupported(exec: SparkPlan): Boolean = {
     exec match {


Reply via email to