This is an automated email from the ASF dual-hosted git repository.

zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 7da2ba5d [AURON #2104] Migrate the Native dependency configuration 
from SparkAuronConfiguration to AuronConfiguration (#2105)
7da2ba5d is described below

commit 7da2ba5d10113ede0706db4cc228e1d563e7faf3
Author: zhangmang <[email protected]>
AuthorDate: Thu Mar 19 17:02:29 2026 +0800

    [AURON #2104] Migrate the Native dependency configuration from 
SparkAuronConfiguration to AuronConfiguration (#2105)
    
    # Which issue does this PR close?
    
    Closes #2104
    
    # Rationale for this change
    To decouple the Native Engine from Spark, migrate the configuration items
    currently in the Spark module to the auron-core module
    
    # What changes are included in this PR?
    Migrate these configurations from SparkAuronConfiguration to
    AuronConfiguration.
    * TOKIO_WORKER_THREADS_PER_CPU
    * SPARK_TASK_CPUS
    * SUGGESTED_BATCH_MEM_SIZE
    
    # Are there any user-facing changes?
    * No
    
    # How was this patch tested?
    * No test needed
---
 .../auron/configuration/AuronConfiguration.java    | 26 ++++++++++++++++++++++
 native-engine/auron-jni-bridge/src/conf.rs         |  2 +-
 native-engine/auron/src/rt.rs                      |  6 ++---
 .../configuration/SparkAuronConfiguration.java     | 25 ---------------------
 .../spark/sql/auron/SparkUDTFWrapperContext.scala  |  3 +--
 .../execution/auron/arrowio/ArrowFFIExporter.scala |  3 +--
 6 files changed, 32 insertions(+), 33 deletions(-)

diff --git 
a/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java
 
b/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java
index 31017d8b..a2bd08d4 100644
--- 
a/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java
+++ 
b/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java
@@ -39,6 +39,32 @@ public abstract class AuronConfiguration {
             .withDescription("Log level for native execution.")
             .withDefaultValue("info");
 
+    public static final ConfigOption<Integer> TOKIO_WORKER_THREADS_PER_CPU = 
new ConfigOption<>(Integer.class)
+            .withKey("auron.tokio.worker.threads.per.cpu")
+            .withCategory("Runtime Configuration")
+            .withDescription(
+                    "Number of Tokio worker threads to create per CPU core 
(spark.task.cpus). Set to 0 for automatic detection "
+                            + "based on available CPU cores. This setting 
controls the thread pool size for Tokio-based asynchronous operations.")
+            .withDefaultValue(0);
+
+    public static final ConfigOption<Integer> SUGGESTED_BATCH_MEM_SIZE = new 
ConfigOption<>(Integer.class)
+            .withKey("auron.suggested.batch.memSize")
+            .withCategory("Runtime Configuration")
+            .withDescription(
+                    "Suggested memory size in bytes for record batches. This 
setting controls the target memory allocation "
+                            + "for individual data batches to optimize memory 
usage and processing efficiency. Default is 8MB (8,388,608 bytes).")
+            .withDefaultValue(8388608);
+
+    public static final ConfigOption<Integer> TASK_CPUS = new 
ConfigOption<>(Integer.class)
+            .withKey("task.cpus")
+            .withCategory("Runtime Configuration")
+            .withDescription(
+                    "Number of CPU cores allocated per Spark task. This 
setting determines the parallelism level "
+                            + "for individual tasks and affects resource 
allocation and task scheduling. "
+                            + "In Spark, the value is retrieved from SparkEnv 
via the 'spark.task.cpus' option; "
+                            + "if not configured, the default value of 1 is 
used.")
+            .withDefaultValue(1);
+
     public abstract <T> Optional<T> getOptional(ConfigOption<T> option);
 
     public <T> T get(ConfigOption<T> option) {
diff --git a/native-engine/auron-jni-bridge/src/conf.rs 
b/native-engine/auron-jni-bridge/src/conf.rs
index 351432f9..fa6c1d57 100644
--- a/native-engine/auron-jni-bridge/src/conf.rs
+++ b/native-engine/auron-jni-bridge/src/conf.rs
@@ -47,7 +47,7 @@ define_conf!(IntConf, PARQUET_METADATA_CACHE_SIZE);
 define_conf!(StringConf, SPARK_IO_COMPRESSION_CODEC);
 define_conf!(IntConf, SPARK_IO_COMPRESSION_ZSTD_LEVEL);
 define_conf!(IntConf, TOKIO_WORKER_THREADS_PER_CPU);
-define_conf!(IntConf, SPARK_TASK_CPUS);
+define_conf!(IntConf, TASK_CPUS);
 define_conf!(IntConf, SHUFFLE_COMPRESSION_TARGET_BUF_SIZE);
 define_conf!(StringConf, SPILL_COMPRESSION_CODEC);
 define_conf!(BooleanConf, SMJ_FALLBACK_ENABLE);
diff --git a/native-engine/auron/src/rt.rs b/native-engine/auron/src/rt.rs
index 29a0f180..b3d4addd 100644
--- a/native-engine/auron/src/rt.rs
+++ b/native-engine/auron/src/rt.rs
@@ -29,7 +29,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use auron_jni_bridge::{
-    conf::{IntConf, SPARK_TASK_CPUS, TOKIO_WORKER_THREADS_PER_CPU},
+    conf::{IntConf, TASK_CPUS, TOKIO_WORKER_THREADS_PER_CPU},
     is_task_running, jni_call, jni_call_static, jni_convert_byte_array, 
jni_exception_check,
     jni_exception_occurred, jni_new_global_ref, jni_new_object, jni_new_string,
 };
@@ -105,8 +105,8 @@ impl NativeExecutionRuntime {
 
         let num_worker_threads = {
             let worker_threads_per_cpu = 
TOKIO_WORKER_THREADS_PER_CPU.value().unwrap_or(0);
-            let spark_task_cpus = SPARK_TASK_CPUS.value().unwrap_or(0);
-            worker_threads_per_cpu * spark_task_cpus
+            let task_cpus = TASK_CPUS.value().unwrap_or(0);
+            worker_threads_per_cpu * task_cpus
         };
 
         // create tokio runtime
diff --git 
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
 
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
index 3cc1e534..a1bf8e3c 100644
--- 
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
+++ 
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
@@ -202,23 +202,6 @@ public class SparkAuronConfiguration extends 
AuronConfiguration {
                     + "but require more CPU time and memory.")
             .withDynamicDefaultValue(_conf -> 
SparkEnv.get().conf().getInt("spark.io.compression.zstd.level", 1));
 
-    public static final ConfigOption<Integer> TOKIO_WORKER_THREADS_PER_CPU = 
new ConfigOption<>(Integer.class)
-            .withKey("auron.tokio.worker.threads.per.cpu")
-            .withCategory("Runtime Configuration")
-            .withDescription(
-                    "Number of Tokio worker threads to create per CPU core 
(spark.task.cpus). Set to 0 for automatic detection "
-                            + "based on available CPU cores. This setting 
controls the thread pool size for Tokio-based asynchronous operations.")
-            .withDefaultValue(0);
-
-    public static final ConfigOption<Integer> SPARK_TASK_CPUS = new 
ConfigOption<>(Integer.class)
-            .withKey("task.cpus")
-            .withCategory("Runtime Configuration")
-            .withDescription(
-                    "Number of CPU cores allocated per Spark task. This 
setting determines the parallelism level "
-                            + "for individual tasks and affects resource 
allocation and task scheduling. "
-                            + "Defaults to spark.task.cpus.")
-            .withDynamicDefaultValue(_conf -> 
SparkEnv.get().conf().getInt("spark.task.cpus", 1));
-
     public static final ConfigOption<Boolean> FORCE_SHUFFLED_HASH_JOIN = new 
ConfigOption<>(Boolean.class)
             .withKey("auron.forceShuffledHashJoin")
             .withCategory("Operator Supports")
@@ -276,14 +259,6 @@ public class SparkAuronConfiguration extends 
AuronConfiguration {
                             + "of the available on-heap memory can be used for 
spilling data to disk when memory pressure occurs.")
             .withDefaultValue(0.9);
 
-    public static final ConfigOption<Integer> SUGGESTED_BATCH_MEM_SIZE = new 
ConfigOption<>(Integer.class)
-            .withKey("auron.suggested.batch.memSize")
-            .withCategory("Runtime Configuration")
-            .withDescription(
-                    "Suggested memory size in bytes for record batches. This 
setting controls the target memory allocation "
-                            + "for individual data batches to optimize memory 
usage and processing efficiency. Default is 8MB (8,388,608 bytes).")
-            .withDefaultValue(8388608);
-
     public static final ConfigOption<Boolean> PARSE_JSON_ERROR_FALLBACK = new 
ConfigOption<>(Boolean.class)
             .withKey("auron.parseJsonError.fallback")
             .withCategory("Expression/Function Supports")
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDTFWrapperContext.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDTFWrapperContext.scala
index e830d838..52199321 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDTFWrapperContext.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDTFWrapperContext.scala
@@ -41,7 +41,6 @@ import org.apache.spark.sql.types.StructType
 
 import org.apache.auron.configuration.AuronConfiguration
 import org.apache.auron.jni.AuronAdaptor
-import org.apache.auron.spark.configuration.SparkAuronConfiguration
 
 case class SparkUDTFWrapperContext(serialized: ByteBuffer) extends Logging {
   private val (expr, javaParamsSchema) =
@@ -66,7 +65,7 @@ case class SparkUDTFWrapperContext(serialized: ByteBuffer) 
extends Logging {
     AuronAdaptor.getInstance.getAuronConfiguration
   private val batchSize = 
sparkAuronConfig.getInteger(AuronConfiguration.BATCH_SIZE)
   private val maxBatchMemorySize =
-    
sparkAuronConfig.getInteger(SparkAuronConfiguration.SUGGESTED_BATCH_MEM_SIZE)
+    sparkAuronConfig.getInteger(AuronConfiguration.SUGGESTED_BATCH_MEM_SIZE)
 
   private val dictionaryProvider: DictionaryProvider = new 
MapDictionaryProvider()
   private val javaOutputSchema = StructType(
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
index a23acebf..0192aa35 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
@@ -40,7 +40,6 @@ import org.apache.spark.sql.types.StructType
 import org.apache.auron.arrowio.AuronArrowFFIExporter
 import org.apache.auron.configuration.AuronConfiguration
 import org.apache.auron.jni.AuronAdaptor
-import org.apache.auron.spark.configuration.SparkAuronConfiguration
 
 class ArrowFFIExporter(rowIter: Iterator[InternalRow], schema: StructType)
     extends AuronArrowFFIExporter
@@ -49,7 +48,7 @@ class ArrowFFIExporter(rowIter: Iterator[InternalRow], 
schema: StructType)
     AuronAdaptor.getInstance.getAuronConfiguration
   private val maxBatchNumRows = 
sparkAuronConfig.getInteger(AuronConfiguration.BATCH_SIZE)
   private val maxBatchMemorySize =
-    
sparkAuronConfig.getInteger(SparkAuronConfiguration.SUGGESTED_BATCH_MEM_SIZE)
+    sparkAuronConfig.getInteger(AuronConfiguration.SUGGESTED_BATCH_MEM_SIZE)
 
   private val arrowSchema = ArrowUtils.toArrowSchema(schema)
   private val emptyDictionaryProvider = new MapDictionaryProvider()

Reply via email to