This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d139619 [AURON #1456] Introduce SparkAuronConfiguration (#1457)
0d139619 is described below

commit 0d139619000ba6db5df6dacbf792a2156204ffc2
Author: zhangmang <[email protected]>
AuthorDate: Fri Oct 17 10:50:27 2025 +0800

    [AURON #1456] Introduce SparkAuronConfiguration (#1457)
    
    * [AURON #1456] Introduce SparkAuronConfiguration
    
    * fix
---
 spark-extension/pom.xml                            |   7 +
 .../configuration/SparkAuronConfiguration.java     | 305 +++++++++++++++++++++
 .../configuration/SparkAuronConfigurationTest.java |  77 ++++++
 3 files changed, 389 insertions(+)

diff --git a/spark-extension/pom.xml b/spark-extension/pom.xml
index 59b8fc34..d634e941 100644
--- a/spark-extension/pom.xml
+++ b/spark-extension/pom.xml
@@ -88,6 +88,13 @@
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-vector</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <version>${junit.jupiter.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git 
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
 
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
new file mode 100644
index 00000000..c4f35b29
--- /dev/null
+++ 
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.spark.configuration;
+
+import static org.apache.auron.util.Preconditions.checkNotNull;
+
+import java.util.Optional;
+import org.apache.auron.configuration.AuronConfiguration;
+import org.apache.auron.configuration.ConfigOption;
+import org.apache.auron.configuration.ConfigOptions;
+import org.apache.auron.jni.AuronAdaptor;
+import org.apache.spark.SparkConf;
+import org.apache.spark.internal.config.ConfigEntry;
+import org.apache.spark.internal.config.ConfigEntryWithDefault;
+import scala.Option;
+import scala.collection.immutable.List$;
+
+/**
+ * Spark configuration proxy for Auron.
+ * All configuration prefixes start with spark.
+ */
+public class SparkAuronConfiguration extends AuronConfiguration {
+
+    public static final String SPARK_PREFIX = "spark.";
+
+    public static final ConfigOption<Boolean> UI_ENABLED = 
ConfigOptions.key("auron.ui.enabled")
+            .description("support spark.auron.ui.enabled.")
+            .booleanType()
+            .defaultValue(true);
+
+    public static final ConfigOption<Double> PROCESS_MEMORY_FRACTION = 
ConfigOptions.key(
+                    "auron.process.vmrss.memoryFraction")
+            .description("suggested fraction of process total memory (on-heap 
and off-heap). "
+                    + "this limit is for process's resident memory usage.")
+            .doubleType()
+            .defaultValue(0.9);
+
+    public static final ConfigOption<Boolean> CASE_CONVERT_FUNCTIONS_ENABLE = 
ConfigOptions.key(
+                    "auron.enable.caseconvert.functions")
+            .description("enable converting upper/lower functions to native, 
special cases may provide different, "
+                    + "outputs from spark due to different unicode versions. ")
+            .booleanType()
+            .defaultValue(true);
+
+    public static final ConfigOption<Boolean> INPUT_BATCH_STATISTICS_ENABLE = 
ConfigOptions.key(
+                    "auron.enableInputBatchStatistics")
+            .description("enable extra metrics of input batch statistics. ")
+            .booleanType()
+            .defaultValue(true);
+
+    public static final ConfigOption<Boolean> UDAF_FALLBACK_ENABLE = 
ConfigOptions.key("auron.udafFallback.enable")
+            .description("supports UDAF and other aggregate functions not 
implemented. ")
+            .booleanType()
+            .defaultValue(true);
+
+    public static final ConfigOption<Integer> SUGGESTED_UDAF_ROW_MEM_USAGE = 
ConfigOptions.key(
+                    "auron.suggested.udaf.memUsedSize")
+            .description("TypedImperativeAggregate one row mem use size. ")
+            .intType()
+            .defaultValue(64);
+
+    public static final ConfigOption<Integer> 
UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG = ConfigOptions.key(
+                    "auron.udafFallback.num.udafs.trigger.sortAgg")
+            .description(
+                    "number of udafs to trigger sort-based aggregation, by 
default, all aggs containing udafs are converted to sort-based.")
+            .intType()
+            .defaultValue(1);
+
+    public static final ConfigOption<Integer> UDAF_FALLBACK_ESTIM_ROW_SIZE = 
ConfigOptions.key(
+                    "auron.udafFallback.typedImperativeEstimatedRowSize")
+            .description("TypedImperativeAggregate one row mem use size.")
+            .intType()
+            .defaultValue(256);
+
+    public static final ConfigOption<Boolean> CAST_STRING_TRIM_ENABLE = 
ConfigOptions.key("auron.cast.trimString")
+            .description("enable trimming string inputs before casting to 
numeric/boolean types. ")
+            .booleanType()
+            .defaultValue(true);
+
+    public static final ConfigOption<Boolean> IGNORE_CORRUPTED_FILES = 
ConfigOptions.key("files.ignoreCorruptFiles")
+            .description("ignore corrupted input files. ")
+            .booleanType()
+            .defaultValue(false);
+
+    public static final ConfigOption<Boolean> PARTIAL_AGG_SKIPPING_ENABLE = 
ConfigOptions.key(
+                    "auron.partialAggSkipping.enable")
+            .description("enable partial aggregate skipping (see 
https://github.com/apache/auron/issues/327). ")
+            .booleanType()
+            .defaultValue(true);
+
+    public static final ConfigOption<Double> PARTIAL_AGG_SKIPPING_RATIO = 
ConfigOptions.key(
+                    "auron.partialAggSkipping.ratio")
+            .description("partial aggregate skipping ratio. ")
+            .doubleType()
+            .defaultValue(0.9);
+
+    public static final ConfigOption<Integer> PARTIAL_AGG_SKIPPING_MIN_ROWS = 
ConfigOptions.key(
+                    "auron.partialAggSkipping.minRows")
+            .description("minimum number of rows to trigger partial aggregate 
skipping.")
+            .intType()
+            .defaultValue(
+                    AuronAdaptor.getInstance() != null
+                            ? AuronAdaptor.getInstance()
+                                            .getAuronConfiguration()
+                                            
.getOptional(AuronConfiguration.BATCH_SIZE)
+                                            .get()
+                                    * 5
+                            : AuronConfiguration.BATCH_SIZE.defaultValue() * 
5);
+
+    public static final ConfigOption<Boolean> PARTIAL_AGG_SKIPPING_SKIP_SPILL 
= ConfigOptions.key(
+                    "auron.partialAggSkipping.skipSpill")
+            .description("always skip partial aggregate when triggered 
spilling. ")
+            .booleanType()
+            .defaultValue(false);
+
+    public static final ConfigOption<Boolean> PARQUET_ENABLE_PAGE_FILTERING = 
ConfigOptions.key(
+                    "auron.parquet.enable.pageFiltering")
+            .description("parquet enable page filtering. ")
+            .booleanType()
+            .defaultValue(false);
+
+    public static final ConfigOption<Boolean> PARQUET_ENABLE_BLOOM_FILTER = 
ConfigOptions.key(
+                    "auron.parquet.enable.bloomFilter")
+            .description("parquet enable bloom filter. ")
+            .booleanType()
+            .defaultValue(false);
+
+    public static final ConfigOption<Integer> PARQUET_MAX_OVER_READ_SIZE = 
ConfigOptions.key(
+                    "auron.parquet.maxOverReadSize")
+            .description("parquet max over read size.")
+            .intType()
+            .defaultValue(16384);
+
+    public static final ConfigOption<Integer> PARQUET_METADATA_CACHE_SIZE = 
ConfigOptions.key(
+                    "auron.parquet.metadataCacheSize")
+            .description("parquet metadata cache size.")
+            .intType()
+            .defaultValue(5);
+
+    public static final ConfigOption<String> SPARK_IO_COMPRESSION_CODEC = 
ConfigOptions.key("io.compression.codec")
+            .description("spark io compression codec.")
+            .stringType()
+            .defaultValue("lz4");
+
+    public static final ConfigOption<Integer> SPARK_IO_COMPRESSION_ZSTD_LEVEL 
= ConfigOptions.key(
+                    "io.compression.zstd.level")
+            .description("spark io compression zstd level.")
+            .intType()
+            .defaultValue(1);
+
+    public static final ConfigOption<Integer> TOKIO_WORKER_THREADS_PER_CPU = 
ConfigOptions.key(
+                    "auron.tokio.worker.threads.per.cpu")
+            .description("tokio worker threads per cpu (spark.task.cpus), 0 
for auto detection.")
+            .intType()
+            .defaultValue(0);
+
+    public static final ConfigOption<Integer> SPARK_TASK_CPUS = 
ConfigOptions.key("task.cpus")
+            .description("number of cpus per task.")
+            .intType()
+            .defaultValue(1);
+
+    public static final ConfigOption<Boolean> FORCE_SHUFFLED_HASH_JOIN = 
ConfigOptions.key(
+                    "auron.forceShuffledHashJoin")
+            .description("replace all sort-merge join to shuffled-hash join, 
only used for benchmarking. ")
+            .booleanType()
+            .defaultValue(false);
+
+    public static final ConfigOption<Integer> 
SHUFFLE_COMPRESSION_TARGET_BUF_SIZE = ConfigOptions.key(
+                    "auron.shuffle.compression.targetBufSize")
+            .description("shuffle compression target buffer size, default is 
4MB.")
+            .intType()
+            .defaultValue(4194304);
+
+    public static final ConfigOption<String> SPILL_COMPRESSION_CODEC = 
ConfigOptions.key(
+                    "auron.spill.compression.codec")
+            .description("spark spill compression codec.")
+            .stringType()
+            .defaultValue("lz4");
+
+    public static final ConfigOption<Boolean> SMJ_FALLBACK_ENABLE = 
ConfigOptions.key("auron.smjfallback.enable")
+            .description("enable hash join falling back to sort merge join 
when hash table is too big. ")
+            .booleanType()
+            .defaultValue(false);
+
+    public static final ConfigOption<Integer> SMJ_FALLBACK_ROWS_THRESHOLD = 
ConfigOptions.key(
+                    "auron.smjfallback.rows.threshold")
+            .description("smj fallback threshold.")
+            .intType()
+            .defaultValue(10000000);
+
+    public static final ConfigOption<Integer> SMJ_FALLBACK_MEM_SIZE_THRESHOLD 
= ConfigOptions.key(
+                    "auron.smjfallback.mem.threshold")
+            .description("smj fallback mem threshold.")
+            .intType()
+            .defaultValue(134217728);
+
+    public static final ConfigOption<Double> ON_HEAP_SPILL_MEM_FRACTION = 
ConfigOptions.key(
+                    "auron.onHeapSpill.memoryFraction")
+            .description("max memory fraction of on-heap spills. ")
+            .doubleType()
+            .defaultValue(0.9);
+
+    public static final ConfigOption<Integer> SUGGESTED_BATCH_MEM_SIZE = 
ConfigOptions.key(
+                    "auron.suggested.batch.memSize")
+            .description("suggested memory size for record batch.")
+            .intType()
+            .defaultValue(8388608);
+
+    public static final ConfigOption<Boolean> PARSE_JSON_ERROR_FALLBACK = 
ConfigOptions.key(
+                    "auron.parseJsonError.fallback")
+            .description("fallback to UDFJson when error parsing json in 
native implementation. ")
+            .booleanType()
+            .defaultValue(true);
+
+    public static final ConfigOption<Integer> 
SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE = ConfigOptions.key(
+                    "auron.suggested.batch.memSize.multiwayMerging")
+            .description("suggested memory size for k-way merging use smaller 
batch memory size for "
+                    + "k-way merging since there will be multiple batches in 
memory at the same time.")
+            .intType()
+            .defaultValue(1048576);
+    public static final ConfigOption<Boolean> ORC_FORCE_POSITIONAL_EVOLUTION = 
ConfigOptions.key(
+                    "auron.orc.force.positional.evolution")
+            .description("orc force positional evolution. ")
+            .booleanType()
+            .defaultValue(false);
+
+    private final SparkConf sparkConf;
+
+    public SparkAuronConfiguration(SparkConf conf) {
+        this.sparkConf = checkNotNull(conf, "spark conf cannot be null");
+    }
+
+    @Override
+    public <T> Optional<T> getOptional(ConfigOption<T> option) {
+        if (option.key().startsWith(SPARK_PREFIX)) {
+            return Optional.ofNullable(getSparkConf(option.key(), 
option.defaultValue()));
+        } else {
+            return Optional.ofNullable(getSparkConf(SPARK_PREFIX + 
option.key(), option.defaultValue()));
+        }
+    }
+
+    @Override
+    public <T> Optional<T> getOptional(String key) {
+        if (key.startsWith(SPARK_PREFIX)) {
+            return Optional.ofNullable(getSparkConf(key, null));
+        } else {
+            return Optional.ofNullable(getSparkConf(SPARK_PREFIX + key, null));
+        }
+    }
+
+    private <T> T getSparkConf(String key, T defaultValue) {
+        ConfigEntry<T> entry = (ConfigEntry<T>) ConfigEntry.findEntry(key);
+        if (entry == null) {
+            entry = new ConfigEntryWithDefault<>(
+                    key,
+                    Option.<String>empty(),
+                    "",
+                    List$.MODULE$.empty(),
+                    defaultValue,
+                    (val) -> valueConverter(val, defaultValue, defaultValue == 
null),
+                    String::valueOf,
+                    null,
+                    true,
+                    null);
+        }
+        return sparkConf.get(entry);
+    }
+
+    private <T> T valueConverter(String value, T defaultValue, boolean 
defaultValueIsNull) {
+        if (defaultValueIsNull) {
+            return (T) value;
+        } else {
+            if (defaultValue instanceof Integer) {
+                return (T) Integer.valueOf(value);
+            } else if (defaultValue instanceof Long) {
+                return (T) Long.valueOf(value);
+            } else if (defaultValue instanceof Boolean) {
+                return (T) Boolean.valueOf(value);
+            } else if (defaultValue instanceof Float) {
+                return (T) Float.valueOf(value);
+            } else if (defaultValue instanceof Double) {
+                return (T) Double.valueOf(value);
+            } else if (defaultValue instanceof String) {
+                return (T) String.valueOf(value);
+            } else {
+                throw new IllegalArgumentException("Unsupported default value 
type: "
+                        + defaultValue.getClass().getName());
+            }
+        }
+    }
+}
diff --git 
a/spark-extension/src/test/java/org/apache/auron/spark/configuration/SparkAuronConfigurationTest.java
 
b/spark-extension/src/test/java/org/apache/auron/spark/configuration/SparkAuronConfigurationTest.java
new file mode 100644
index 00000000..99345250
--- /dev/null
+++ 
b/spark-extension/src/test/java/org/apache/auron/spark/configuration/SparkAuronConfigurationTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.spark.configuration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.apache.spark.SparkConf;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * This class is used to test the {@link SparkAuronConfiguration) class.
+ */
+public class SparkAuronConfigurationTest {
+
+    private SparkAuronConfiguration sparkAuronConfiguration;
+
+    @BeforeEach
+    public void setUp() {
+        SparkConf sparkConf = new SparkConf();
+        sparkConf.set("spark.auron.ui.enabled", "false");
+        sparkConf.set("spark.auron.process.vmrss.memoryFraction", "0.66");
+        sparkConf.set("spark.auron.suggested.udaf.memUsedSize", "1024");
+        sparkConf.set("spark.io.compression.codec", "gzip");
+        sparkAuronConfiguration = new SparkAuronConfiguration(sparkConf);
+    }
+
+    @Test
+    public void testGetSparkConfig() {
+        
assertEquals(sparkAuronConfiguration.get(SparkAuronConfiguration.UI_ENABLED), 
false);
+        
assertEquals(sparkAuronConfiguration.get(SparkAuronConfiguration.PROCESS_MEMORY_FRACTION),
 0.66);
+        
assertEquals(sparkAuronConfiguration.get(SparkAuronConfiguration.SUGGESTED_UDAF_ROW_MEM_USAGE),
 1024);
+        
assertEquals(sparkAuronConfiguration.get(SparkAuronConfiguration.SPARK_IO_COMPRESSION_CODEC),
 "gzip");
+
+        assertEquals(
+                sparkAuronConfiguration
+                        .getOptional(SparkAuronConfiguration.UI_ENABLED.key())
+                        .get(),
+                false);
+        assertEquals(
+                sparkAuronConfiguration
+                        
.getOptional(SparkAuronConfiguration.PROCESS_MEMORY_FRACTION.key())
+                        .get(),
+                0.66);
+        assertEquals(
+                sparkAuronConfiguration
+                        
.getOptional(SparkAuronConfiguration.SUGGESTED_UDAF_ROW_MEM_USAGE.key())
+                        .get(),
+                1024);
+        assertEquals(
+                sparkAuronConfiguration
+                        
.getOptional(SparkAuronConfiguration.SPARK_IO_COMPRESSION_CODEC.key())
+                        .get(),
+                "gzip");
+
+        // Test default value
+        assertEquals(
+                sparkAuronConfiguration
+                        
.getOptional(SparkAuronConfiguration.PARSE_JSON_ERROR_FALLBACK)
+                        .get(),
+                true);
+    }
+}

Reply via email to