This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 923db4d9 [AURON #1531] Enable SparkAuronConfiguration and deprecated 
AuronConf (#1532)
923db4d9 is described below

commit 923db4d9d758aa4961f2138c5d8d5b9c45b07ca5
Author: zhangmang <[email protected]>
AuthorDate: Fri Oct 31 11:36:04 2025 +0800

    [AURON #1531] Enable SparkAuronConfiguration and deprecated AuronConf 
(#1532)
    
    * [AURON #1531] Enable SparkAuronConfiguration and deprecated AuronConf
    
    * fix ci fail
    
    * fix checkstyle
    
    * add @Deprecated for AuronConf
    
    * fix comment
---
 .../configuration/SparkAuronConfiguration.java     | 33 ++++++++++++----------
 .../java/org/apache/spark/sql/auron/AuronConf.java |  5 ++++
 .../apache/spark/sql/auron/AuronConverters.scala   | 11 ++++++--
 .../apache/spark/sql/auron/NativeConverters.scala  | 23 ++++++++++-----
 .../org/apache/spark/sql/auron/NativeHelper.scala  |  5 +++-
 .../spark/sql/auron/SparkUDAFWrapperContext.scala  |  6 +++-
 .../spark/sql/auron/SparkUDTFWrapperContext.scala  | 11 ++++++--
 .../sql/auron/memory/SparkOnHeapSpillManager.scala |  6 ++--
 .../execution/auron/arrowio/ArrowFFIExporter.scala | 12 ++++++--
 .../sql/execution/auron/plan/NativeAggBase.scala   |  6 ++--
 10 files changed, 83 insertions(+), 35 deletions(-)

diff --git 
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
 
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
index c4f35b29..0a533342 100644
--- 
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
+++ 
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
@@ -262,22 +262,25 @@ public class SparkAuronConfiguration extends 
AuronConfiguration {
         }
     }
 
-    private <T> T getSparkConf(String key, T defaultValue) {
-        ConfigEntry<T> entry = (ConfigEntry<T>) ConfigEntry.findEntry(key);
-        if (entry == null) {
-            entry = new ConfigEntryWithDefault<>(
-                    key,
-                    Option.<String>empty(),
-                    "",
-                    List$.MODULE$.empty(),
-                    defaultValue,
-                    (val) -> valueConverter(val, defaultValue, defaultValue == 
null),
-                    String::valueOf,
-                    null,
-                    true,
-                    null);
+    private synchronized <T> T getSparkConf(String key, T defaultValue) {
+        // Use synchronized to avoid issues with multiple threads.
+        synchronized (ConfigEntry.class) {
+            ConfigEntry<T> entry = (ConfigEntry<T>) ConfigEntry.findEntry(key);
+            if (entry == null) {
+                entry = new ConfigEntryWithDefault<>(
+                        key,
+                        Option.<String>empty(),
+                        "",
+                        List$.MODULE$.empty(),
+                        defaultValue,
+                        (val) -> valueConverter(val, defaultValue, 
defaultValue == null),
+                        String::valueOf,
+                        null,
+                        true,
+                        null);
+            }
+            return sparkConf.get(entry);
         }
-        return sparkConf.get(entry);
     }
 
     private <T> T valueConverter(String value, T defaultValue, boolean 
defaultValueIsNull) {
diff --git 
a/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java 
b/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
index c16fce31..aec3a6c6 100644
--- a/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
+++ b/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
@@ -19,6 +19,11 @@ package org.apache.spark.sql.auron;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkEnv$;
 
+/**
+ * This class has been deprecated and migrated to {@link 
org.apache.auron.spark.configuration.SparkAuronConfiguration}.
+ * Will be removed in the future.
+ */
+@Deprecated
 @SuppressWarnings("unused")
 public enum AuronConf {
     // support spark.auron.ui.enabled
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index dcb4b672..c911c465 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -86,9 +86,12 @@ import 
org.apache.spark.sql.hive.execution.auron.plan.NativeHiveTableScanBase
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.LongType
 
+import org.apache.auron.configuration.AuronConfiguration
+import org.apache.auron.jni.AuronAdaptor
 import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.EmptyPartitionsExecNode
 import org.apache.auron.protobuf.PhysicalPlanNode
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
 import org.apache.auron.sparkver
 
 object AuronConverters extends Logging {
@@ -140,6 +143,10 @@ object AuronConverters extends Logging {
     getBooleanConf("spark.auron.enable.shuffleExchange", defaultValue = true)
 
   private val extConvertProviders = 
ServiceLoader.load(classOf[AuronConvertProvider]).asScala
+
+  private val sparkAuronConfig: AuronConfiguration =
+    AuronAdaptor.getInstance.getAuronConfiguration
+
   def extConvertSupported(exec: SparkPlan): Boolean = {
     extConvertProviders.exists(_.isSupported(exec))
   }
@@ -507,7 +514,7 @@ object AuronConverters extends Logging {
 
     // force shuffled-hash join
     if (!requireOrdering
-      && AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf()
+      && 
sparkAuronConfig.getBoolean(SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN)
       && exec.children.forall(_.isInstanceOf[NativeSortBase])) {
       val (leftKeys, rightKeys, joinType, condition, left, right, isSkewJoin) =
         (
@@ -605,7 +612,7 @@ object AuronConverters extends Logging {
         getIsSkewJoinFromSHJ(exec))
 
     } catch {
-      case _ if AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf() =>
+      case _ if 
sparkAuronConfig.getBoolean(SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN) =>
         logWarning(
           "in forceShuffledHashJoin mode, hash joins are likely too run OOM 
because of " +
             "small on-heap memory configuration. to avoid this, we will fall 
back this " +
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
index 3fd82a98..4594e4ad 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
@@ -80,9 +80,17 @@ import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
 import org.apache.auron.{protobuf => pb}
+import org.apache.auron.configuration.AuronConfiguration
+import org.apache.auron.jni.AuronAdaptor
 import org.apache.auron.protobuf.PhysicalExprNode
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
 
 object NativeConverters extends Logging {
+
+  private val sparkAuronConfig: AuronConfiguration =
+    AuronAdaptor.getInstance.getAuronConfiguration
+  def udfEnabled: Boolean =
+    AuronConverters.getBooleanConf("spark.auron.udf.enabled", defaultValue = 
true)
   def udfJsonEnabled: Boolean =
     AuronConverters.getBooleanConf("spark.auron.udf.UDFJson.enabled", 
defaultValue = true)
   def udfBrickHouseEnabled: Boolean =
@@ -452,7 +460,7 @@ object NativeConverters extends Logging {
           if (cast.child.dataType == StringType &&
             (cast.dataType.isInstanceOf[NumericType] || cast.dataType
               .isInstanceOf[BooleanType]) &&
-            AuronConf.CAST_STRING_TRIM_ENABLE.booleanConf()) {
+            
sparkAuronConfig.getBoolean(SparkAuronConfiguration.CAST_STRING_TRIM_ENABLE)) {
             // converting Cast(str as num) to StringTrim(Cast(str as num)) if 
enabled
             StringTrim(cast.child)
           } else {
@@ -831,9 +839,11 @@ object NativeConverters extends Logging {
       case Length(arg) if arg.dataType == StringType =>
         buildScalarFunction(pb.ScalarFunction.CharacterLength, arg :: Nil, 
IntegerType)
 
-      case e: Lower if AuronConf.CASE_CONVERT_FUNCTIONS_ENABLE.booleanConf() =>
+      case e: Lower
+          if 
sparkAuronConfig.getBoolean(SparkAuronConfiguration.CASE_CONVERT_FUNCTIONS_ENABLE)
 =>
         buildExtScalarFunction("StringLower", e.children, e.dataType)
-      case e: Upper if AuronConf.CASE_CONVERT_FUNCTIONS_ENABLE.booleanConf() =>
+      case e: Upper
+          if 
sparkAuronConfig.getBoolean(SparkAuronConfiguration.CASE_CONVERT_FUNCTIONS_ENABLE)
 =>
         buildExtScalarFunction("StringUpper", e.children, e.dataType)
 
       case e: StringTrim =>
@@ -1162,7 +1172,7 @@ object NativeConverters extends Logging {
         }
 
         // fallback to UDAF
-        if (AuronConf.UDAF_FALLBACK_ENABLE.booleanConf()) {
+        if 
(sparkAuronConfig.getBoolean(SparkAuronConfiguration.UDAF_FALLBACK_ENABLE)) {
           udaf match {
             case _: DeclarativeAggregate =>
             case _: TypedImperativeAggregate[_] =>
@@ -1200,9 +1210,8 @@ object NativeConverters extends Logging {
               .setInputSchema(NativeConverters.convertSchema(paramsSchema)))
           aggBuilder.addAllChildren(convertedChildren.keys.asJava)
         } else {
-          throw new NotImplementedError(
-            s"unsupported aggregate expression: (${e.getClass})," +
-              s" set ${AuronConf.UDAF_FALLBACK_ENABLE.key} true to enable UDAF 
fallbacking")
+          throw new NotImplementedError(s"Unsupported aggregate expression: 
(${e.getClass})," +
+            s" set ${SparkAuronConfiguration.UDAF_FALLBACK_ENABLE.key} to true 
to enable UDAF fallbacking.")
         }
 
     }
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala
index 7e6bfdfc..8c6288cf 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala
@@ -39,8 +39,10 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.CompletionIterator
 
+import org.apache.auron.jni.AuronAdaptor
 import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.PhysicalPlanNode
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
 
 object NativeHelper extends Logging {
   val currentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
@@ -188,7 +190,8 @@ object NativeHelper extends Logging {
       "shuffle_write_total_time" -> 
nanoTimingMetric("Native.shuffle_write_total_time"),
       "shuffle_read_total_time" -> 
nanoTimingMetric("Native.shuffle_read_total_time"))
 
-    if (AuronConf.INPUT_BATCH_STATISTICS_ENABLE.booleanConf()) {
+    if (AuronAdaptor.getInstance.getAuronConfiguration.getBoolean(
+        SparkAuronConfiguration.INPUT_BATCH_STATISTICS_ENABLE)) {
       metrics ++= TreeMap(
         "input_batch_count" -> metric("Native.input_batches"),
         "input_row_count" -> metric("Native.input_rows"),
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala
index 83b4bc8d..7adb250b 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala
@@ -55,6 +55,9 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.ByteBufferInputStream
 import org.apache.spark.util.Utils
 
+import org.apache.auron.jni.AuronAdaptor
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
+
 case class SparkUDAFWrapperContext[B](serialized: ByteBuffer) extends Logging {
   private val (expr, javaParamsSchema) =
     NativeConverters.deserializeExpression[AggregateFunction, StructType]({
@@ -455,7 +458,8 @@ case class TypedImperativeAggRowsColumn[B](
             evaluator.estimatedRowSize = Some(estimRowSize)
             estimRowSize
           } else {
-            AuronConf.UDAF_FALLBACK_ESTIM_ROW_SIZE.intConf()
+            AuronAdaptor.getInstance.getAuronConfiguration.getInteger(
+              SparkAuronConfiguration.UDAF_FALLBACK_ESTIM_ROW_SIZE)
           }
         rows.length * estimRowSize
     }
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDTFWrapperContext.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDTFWrapperContext.scala
index a7865106..a832cefb 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDTFWrapperContext.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDTFWrapperContext.scala
@@ -39,6 +39,10 @@ import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.types.StructType
 
+import org.apache.auron.configuration.AuronConfiguration
+import org.apache.auron.jni.AuronAdaptor
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
+
 case class SparkUDTFWrapperContext(serialized: ByteBuffer) extends Logging {
   private val (expr, javaParamsSchema) =
     NativeConverters.deserializeExpression[Generator, StructType]({
@@ -58,8 +62,11 @@ case class SparkUDTFWrapperContext(serialized: ByteBuffer) 
extends Logging {
   }
 
   private val tc = TaskContext.get()
-  private val batchSize = AuronConf.BATCH_SIZE.intConf()
-  private val maxBatchMemorySize = AuronConf.SUGGESTED_BATCH_MEM_SIZE.intConf()
+  private val sparkAuronConfig: AuronConfiguration =
+    AuronAdaptor.getInstance.getAuronConfiguration
+  private val batchSize = 
sparkAuronConfig.getInteger(AuronConfiguration.BATCH_SIZE)
+  private val maxBatchMemorySize =
+    
sparkAuronConfig.getInteger(SparkAuronConfiguration.SUGGESTED_BATCH_MEM_SIZE)
 
   private val dictionaryProvider: DictionaryProvider = new 
MapDictionaryProvider()
   private val javaOutputSchema = StructType(
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
index 01f03712..e77aaaf3 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
@@ -28,11 +28,12 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.memory.MemoryConsumer
 import org.apache.spark.memory.MemoryMode
 import org.apache.spark.memory.auron.OnHeapSpillManagerHelper
-import org.apache.spark.sql.auron.AuronConf
 import org.apache.spark.storage.BlockManager
 import org.apache.spark.util.Utils
 
+import org.apache.auron.jni.AuronAdaptor
 import org.apache.auron.memory.OnHeapSpillManager
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
 
 class SparkOnHeapSpillManager(taskContext: TaskContext)
     extends MemoryConsumer(
@@ -90,7 +91,8 @@ class SparkOnHeapSpillManager(taskContext: TaskContext)
         s" ratio=$jvmMemoryUsedRatio")
 
     // we should have at least 10% free memory
-    val maxRatio = AuronConf.ON_HEAP_SPILL_MEM_FRACTION.doubleConf()
+    val maxRatio = AuronAdaptor.getInstance.getAuronConfiguration.getDouble(
+      SparkAuronConfiguration.ON_HEAP_SPILL_MEM_FRACTION)
     memoryUsedRatio < maxRatio && jvmMemoryUsedRatio < maxRatio
   }
 
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
index b0f2ee32..591ae266 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
@@ -27,7 +27,7 @@ import org.apache.arrow.c.Data
 import org.apache.arrow.vector.VectorSchemaRoot
 import 
org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.auron.{AuronConf, NativeHelper}
+import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.util.Using
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.auron.arrowio.util.ArrowUtils
@@ -37,11 +37,17 @@ import 
org.apache.spark.sql.execution.auron.arrowio.util.ArrowWriter
 import org.apache.spark.sql.types.StructType
 
 import org.apache.auron.arrowio.AuronArrowFFIExporter
+import org.apache.auron.configuration.AuronConfiguration
+import org.apache.auron.jni.AuronAdaptor
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
 
 class ArrowFFIExporter(rowIter: Iterator[InternalRow], schema: StructType)
     extends AuronArrowFFIExporter {
-  private val maxBatchNumRows = AuronConf.BATCH_SIZE.intConf()
-  private val maxBatchMemorySize = AuronConf.SUGGESTED_BATCH_MEM_SIZE.intConf()
+  private val sparkAuronConfig: AuronConfiguration =
+    AuronAdaptor.getInstance.getAuronConfiguration
+  private val maxBatchNumRows = 
sparkAuronConfig.getInteger(AuronConfiguration.BATCH_SIZE)
+  private val maxBatchMemorySize =
+    
sparkAuronConfig.getInteger(SparkAuronConfiguration.SUGGESTED_BATCH_MEM_SIZE)
 
   private val arrowSchema = ArrowUtils.toArrowSchema(schema)
   private val emptyDictionaryProvider = new MapDictionaryProvider()
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
index e470a0f4..7f75e4d6 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
@@ -22,7 +22,6 @@ import scala.collection.immutable.SortedMap
 
 import org.apache.spark.OneToOneDependency
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.auron.AuronConf
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -57,7 +56,9 @@ import org.apache.spark.sql.types.BinaryType
 import org.apache.spark.sql.types.DataType
 
 import org.apache.auron.{protobuf => pb}
+import org.apache.auron.jni.AuronAdaptor
 import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
 
 abstract class NativeAggBase(
     execMode: AggExecMode,
@@ -154,7 +155,8 @@ abstract class NativeAggBase(
     child.outputPartitioning
 
   private def supportsPartialSkipping = (
-    AuronConf.PARTIAL_AGG_SKIPPING_ENABLE.booleanConf()
+    AuronAdaptor.getInstance.getAuronConfiguration.getBoolean(
+      SparkAuronConfiguration.PARTIAL_AGG_SKIPPING_ENABLE)
       && initialInputBufferOffset == 0
       && aggregateExpressions.forall(_.mode == Partial)
       && requiredChildDistribution.forall(_ == UnspecifiedDistribution)

Reply via email to