This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 923db4d9 [AURON #1531] Enable SparkAuronConfiguration and deprecate
AuronConf (#1532)
923db4d9 is described below
commit 923db4d9d758aa4961f2138c5d8d5b9c45b07ca5
Author: zhangmang <[email protected]>
AuthorDate: Fri Oct 31 11:36:04 2025 +0800
[AURON #1531] Enable SparkAuronConfiguration and deprecate AuronConf
(#1532)
* [AURON #1531] Enable SparkAuronConfiguration and deprecate AuronConf
* fix ci fail
* fix checkstyle
* add @Deprecated for AuronConf
* fix comment
---
.../configuration/SparkAuronConfiguration.java | 33 ++++++++++++----------
.../java/org/apache/spark/sql/auron/AuronConf.java | 5 ++++
.../apache/spark/sql/auron/AuronConverters.scala | 11 ++++++--
.../apache/spark/sql/auron/NativeConverters.scala | 23 ++++++++++-----
.../org/apache/spark/sql/auron/NativeHelper.scala | 5 +++-
.../spark/sql/auron/SparkUDAFWrapperContext.scala | 6 +++-
.../spark/sql/auron/SparkUDTFWrapperContext.scala | 11 ++++++--
.../sql/auron/memory/SparkOnHeapSpillManager.scala | 6 ++--
.../execution/auron/arrowio/ArrowFFIExporter.scala | 12 ++++++--
.../sql/execution/auron/plan/NativeAggBase.scala | 6 ++--
10 files changed, 83 insertions(+), 35 deletions(-)
diff --git
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
index c4f35b29..0a533342 100644
---
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
+++
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
@@ -262,22 +262,25 @@ public class SparkAuronConfiguration extends
AuronConfiguration {
}
}
- private <T> T getSparkConf(String key, T defaultValue) {
- ConfigEntry<T> entry = (ConfigEntry<T>) ConfigEntry.findEntry(key);
- if (entry == null) {
- entry = new ConfigEntryWithDefault<>(
- key,
- Option.<String>empty(),
- "",
- List$.MODULE$.empty(),
- defaultValue,
- (val) -> valueConverter(val, defaultValue, defaultValue ==
null),
- String::valueOf,
- null,
- true,
- null);
+ private synchronized <T> T getSparkConf(String key, T defaultValue) {
+ // Use synchronized to avoid issues with multiple threads.
+ synchronized (ConfigEntry.class) {
+ ConfigEntry<T> entry = (ConfigEntry<T>) ConfigEntry.findEntry(key);
+ if (entry == null) {
+ entry = new ConfigEntryWithDefault<>(
+ key,
+ Option.<String>empty(),
+ "",
+ List$.MODULE$.empty(),
+ defaultValue,
+ (val) -> valueConverter(val, defaultValue,
defaultValue == null),
+ String::valueOf,
+ null,
+ true,
+ null);
+ }
+ return sparkConf.get(entry);
}
- return sparkConf.get(entry);
}
private <T> T valueConverter(String value, T defaultValue, boolean
defaultValueIsNull) {
diff --git
a/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
b/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
index c16fce31..aec3a6c6 100644
--- a/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
+++ b/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
@@ -19,6 +19,11 @@ package org.apache.spark.sql.auron;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv$;
+/**
+ * This class has been deprecated and migrated to {@link
org.apache.auron.spark.configuration.SparkAuronConfiguration}.
+ * Will be removed in the future.
+ */
+@Deprecated
@SuppressWarnings("unused")
public enum AuronConf {
// support spark.auron.ui.enabled
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index dcb4b672..c911c465 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -86,9 +86,12 @@ import
org.apache.spark.sql.hive.execution.auron.plan.NativeHiveTableScanBase
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.LongType
+import org.apache.auron.configuration.AuronConfiguration
+import org.apache.auron.jni.AuronAdaptor
import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.EmptyPartitionsExecNode
import org.apache.auron.protobuf.PhysicalPlanNode
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
import org.apache.auron.sparkver
object AuronConverters extends Logging {
@@ -140,6 +143,10 @@ object AuronConverters extends Logging {
getBooleanConf("spark.auron.enable.shuffleExchange", defaultValue = true)
private val extConvertProviders =
ServiceLoader.load(classOf[AuronConvertProvider]).asScala
+
+ private val sparkAuronConfig: AuronConfiguration =
+ AuronAdaptor.getInstance.getAuronConfiguration
+
def extConvertSupported(exec: SparkPlan): Boolean = {
extConvertProviders.exists(_.isSupported(exec))
}
@@ -507,7 +514,7 @@ object AuronConverters extends Logging {
// force shuffled-hash join
if (!requireOrdering
- && AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf()
+ &&
sparkAuronConfig.getBoolean(SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN)
&& exec.children.forall(_.isInstanceOf[NativeSortBase])) {
val (leftKeys, rightKeys, joinType, condition, left, right, isSkewJoin) =
(
@@ -605,7 +612,7 @@ object AuronConverters extends Logging {
getIsSkewJoinFromSHJ(exec))
} catch {
- case _ if AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf() =>
+ case _ if
sparkAuronConfig.getBoolean(SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN) =>
logWarning(
"in forceShuffledHashJoin mode, hash joins are likely too run OOM
because of " +
"small on-heap memory configuration. to avoid this, we will fall
back this " +
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
index 3fd82a98..4594e4ad 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
@@ -80,9 +80,17 @@ import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
import org.apache.auron.{protobuf => pb}
+import org.apache.auron.configuration.AuronConfiguration
+import org.apache.auron.jni.AuronAdaptor
import org.apache.auron.protobuf.PhysicalExprNode
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
object NativeConverters extends Logging {
+
+ private val sparkAuronConfig: AuronConfiguration =
+ AuronAdaptor.getInstance.getAuronConfiguration
+ def udfEnabled: Boolean =
+ AuronConverters.getBooleanConf("spark.auron.udf.enabled", defaultValue =
true)
def udfJsonEnabled: Boolean =
AuronConverters.getBooleanConf("spark.auron.udf.UDFJson.enabled",
defaultValue = true)
def udfBrickHouseEnabled: Boolean =
@@ -452,7 +460,7 @@ object NativeConverters extends Logging {
if (cast.child.dataType == StringType &&
(cast.dataType.isInstanceOf[NumericType] || cast.dataType
.isInstanceOf[BooleanType]) &&
- AuronConf.CAST_STRING_TRIM_ENABLE.booleanConf()) {
+
sparkAuronConfig.getBoolean(SparkAuronConfiguration.CAST_STRING_TRIM_ENABLE)) {
// converting Cast(str as num) to StringTrim(Cast(str as num)) if
enabled
StringTrim(cast.child)
} else {
@@ -831,9 +839,11 @@ object NativeConverters extends Logging {
case Length(arg) if arg.dataType == StringType =>
buildScalarFunction(pb.ScalarFunction.CharacterLength, arg :: Nil,
IntegerType)
- case e: Lower if AuronConf.CASE_CONVERT_FUNCTIONS_ENABLE.booleanConf() =>
+ case e: Lower
+ if
sparkAuronConfig.getBoolean(SparkAuronConfiguration.CASE_CONVERT_FUNCTIONS_ENABLE)
=>
buildExtScalarFunction("StringLower", e.children, e.dataType)
- case e: Upper if AuronConf.CASE_CONVERT_FUNCTIONS_ENABLE.booleanConf() =>
+ case e: Upper
+ if
sparkAuronConfig.getBoolean(SparkAuronConfiguration.CASE_CONVERT_FUNCTIONS_ENABLE)
=>
buildExtScalarFunction("StringUpper", e.children, e.dataType)
case e: StringTrim =>
@@ -1162,7 +1172,7 @@ object NativeConverters extends Logging {
}
// fallback to UDAF
- if (AuronConf.UDAF_FALLBACK_ENABLE.booleanConf()) {
+ if
(sparkAuronConfig.getBoolean(SparkAuronConfiguration.UDAF_FALLBACK_ENABLE)) {
udaf match {
case _: DeclarativeAggregate =>
case _: TypedImperativeAggregate[_] =>
@@ -1200,9 +1210,8 @@ object NativeConverters extends Logging {
.setInputSchema(NativeConverters.convertSchema(paramsSchema)))
aggBuilder.addAllChildren(convertedChildren.keys.asJava)
} else {
- throw new NotImplementedError(
- s"unsupported aggregate expression: (${e.getClass})," +
- s" set ${AuronConf.UDAF_FALLBACK_ENABLE.key} true to enable UDAF
fallbacking")
+ throw new NotImplementedError(s"Unsupported aggregate expression:
(${e.getClass})," +
+ s" set ${SparkAuronConfiguration.UDAF_FALLBACK_ENABLE.key} to true
to enable UDAF fallbacking.")
}
}
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala
index 7e6bfdfc..8c6288cf 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala
@@ -39,8 +39,10 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.CompletionIterator
+import org.apache.auron.jni.AuronAdaptor
import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.PhysicalPlanNode
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
object NativeHelper extends Logging {
val currentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
@@ -188,7 +190,8 @@ object NativeHelper extends Logging {
"shuffle_write_total_time" ->
nanoTimingMetric("Native.shuffle_write_total_time"),
"shuffle_read_total_time" ->
nanoTimingMetric("Native.shuffle_read_total_time"))
- if (AuronConf.INPUT_BATCH_STATISTICS_ENABLE.booleanConf()) {
+ if (AuronAdaptor.getInstance.getAuronConfiguration.getBoolean(
+ SparkAuronConfiguration.INPUT_BATCH_STATISTICS_ENABLE)) {
metrics ++= TreeMap(
"input_batch_count" -> metric("Native.input_batches"),
"input_row_count" -> metric("Native.input_rows"),
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala
index 83b4bc8d..7adb250b 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala
@@ -55,6 +55,9 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ByteBufferInputStream
import org.apache.spark.util.Utils
+import org.apache.auron.jni.AuronAdaptor
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
+
case class SparkUDAFWrapperContext[B](serialized: ByteBuffer) extends Logging {
private val (expr, javaParamsSchema) =
NativeConverters.deserializeExpression[AggregateFunction, StructType]({
@@ -455,7 +458,8 @@ case class TypedImperativeAggRowsColumn[B](
evaluator.estimatedRowSize = Some(estimRowSize)
estimRowSize
} else {
- AuronConf.UDAF_FALLBACK_ESTIM_ROW_SIZE.intConf()
+ AuronAdaptor.getInstance.getAuronConfiguration.getInteger(
+ SparkAuronConfiguration.UDAF_FALLBACK_ESTIM_ROW_SIZE)
}
rows.length * estimRowSize
}
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDTFWrapperContext.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDTFWrapperContext.scala
index a7865106..a832cefb 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDTFWrapperContext.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDTFWrapperContext.scala
@@ -39,6 +39,10 @@ import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
+import org.apache.auron.configuration.AuronConfiguration
+import org.apache.auron.jni.AuronAdaptor
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
+
case class SparkUDTFWrapperContext(serialized: ByteBuffer) extends Logging {
private val (expr, javaParamsSchema) =
NativeConverters.deserializeExpression[Generator, StructType]({
@@ -58,8 +62,11 @@ case class SparkUDTFWrapperContext(serialized: ByteBuffer)
extends Logging {
}
private val tc = TaskContext.get()
- private val batchSize = AuronConf.BATCH_SIZE.intConf()
- private val maxBatchMemorySize = AuronConf.SUGGESTED_BATCH_MEM_SIZE.intConf()
+ private val sparkAuronConfig: AuronConfiguration =
+ AuronAdaptor.getInstance.getAuronConfiguration
+ private val batchSize =
sparkAuronConfig.getInteger(AuronConfiguration.BATCH_SIZE)
+ private val maxBatchMemorySize =
+
sparkAuronConfig.getInteger(SparkAuronConfiguration.SUGGESTED_BATCH_MEM_SIZE)
private val dictionaryProvider: DictionaryProvider = new
MapDictionaryProvider()
private val javaOutputSchema = StructType(
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
index 01f03712..e77aaaf3 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
@@ -28,11 +28,12 @@ import org.apache.spark.internal.Logging
import org.apache.spark.memory.MemoryConsumer
import org.apache.spark.memory.MemoryMode
import org.apache.spark.memory.auron.OnHeapSpillManagerHelper
-import org.apache.spark.sql.auron.AuronConf
import org.apache.spark.storage.BlockManager
import org.apache.spark.util.Utils
+import org.apache.auron.jni.AuronAdaptor
import org.apache.auron.memory.OnHeapSpillManager
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
class SparkOnHeapSpillManager(taskContext: TaskContext)
extends MemoryConsumer(
@@ -90,7 +91,8 @@ class SparkOnHeapSpillManager(taskContext: TaskContext)
s" ratio=$jvmMemoryUsedRatio")
// we should have at least 10% free memory
- val maxRatio = AuronConf.ON_HEAP_SPILL_MEM_FRACTION.doubleConf()
+ val maxRatio = AuronAdaptor.getInstance.getAuronConfiguration.getDouble(
+ SparkAuronConfiguration.ON_HEAP_SPILL_MEM_FRACTION)
memoryUsedRatio < maxRatio && jvmMemoryUsedRatio < maxRatio
}
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
index b0f2ee32..591ae266 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
@@ -27,7 +27,7 @@ import org.apache.arrow.c.Data
import org.apache.arrow.vector.VectorSchemaRoot
import
org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider
import org.apache.spark.TaskContext
-import org.apache.spark.sql.auron.{AuronConf, NativeHelper}
+import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.util.Using
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.auron.arrowio.util.ArrowUtils
@@ -37,11 +37,17 @@ import
org.apache.spark.sql.execution.auron.arrowio.util.ArrowWriter
import org.apache.spark.sql.types.StructType
import org.apache.auron.arrowio.AuronArrowFFIExporter
+import org.apache.auron.configuration.AuronConfiguration
+import org.apache.auron.jni.AuronAdaptor
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
class ArrowFFIExporter(rowIter: Iterator[InternalRow], schema: StructType)
extends AuronArrowFFIExporter {
- private val maxBatchNumRows = AuronConf.BATCH_SIZE.intConf()
- private val maxBatchMemorySize = AuronConf.SUGGESTED_BATCH_MEM_SIZE.intConf()
+ private val sparkAuronConfig: AuronConfiguration =
+ AuronAdaptor.getInstance.getAuronConfiguration
+ private val maxBatchNumRows =
sparkAuronConfig.getInteger(AuronConfiguration.BATCH_SIZE)
+ private val maxBatchMemorySize =
+
sparkAuronConfig.getInteger(SparkAuronConfiguration.SUGGESTED_BATCH_MEM_SIZE)
private val arrowSchema = ArrowUtils.toArrowSchema(schema)
private val emptyDictionaryProvider = new MapDictionaryProvider()
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
index e470a0f4..7f75e4d6 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
@@ -22,7 +22,6 @@ import scala.collection.immutable.SortedMap
import org.apache.spark.OneToOneDependency
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.auron.AuronConf
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
@@ -57,7 +56,9 @@ import org.apache.spark.sql.types.BinaryType
import org.apache.spark.sql.types.DataType
import org.apache.auron.{protobuf => pb}
+import org.apache.auron.jni.AuronAdaptor
import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
abstract class NativeAggBase(
execMode: AggExecMode,
@@ -154,7 +155,8 @@ abstract class NativeAggBase(
child.outputPartitioning
private def supportsPartialSkipping = (
- AuronConf.PARTIAL_AGG_SKIPPING_ENABLE.booleanConf()
+ AuronAdaptor.getInstance.getAuronConfiguration.getBoolean(
+ SparkAuronConfiguration.PARTIAL_AGG_SKIPPING_ENABLE)
&& initialInputBufferOffset == 0
&& aggregateExpressions.forall(_.mode == Partial)
&& requiredChildDistribution.forall(_ == UnspecifiedDistribution)