This is an automated email from the ASF dual-hosted git repository. richox pushed a commit to branch dev-spark-auron-conf in repository https://gitbox.apache.org/repos/asf/auron.git
commit 778d8f7ff57ac03883e15807ec4687c27439b60f Author: zhangli20 <[email protected]> AuthorDate: Tue Jan 27 17:43:25 2026 +0800 deprecate AuronConf --- .../org/apache/auron/configuration/ConfigOption.java | 10 ++++++++++ .../auron/ForceApplyShuffledHashJoinInterceptor.java | 3 ++- .../scala/org/apache/spark/sql/auron/ShimsImpl.scala | 14 +++++++------- .../test/scala/org/apache/auron/AuronQuerySuite.scala | 6 +++--- .../scala/org/apache/auron/NativeConvertersSuite.scala | 11 ++++++----- .../auron/{AuronConf.java => DeprecatedAuronConf.java} | 14 +++++++------- .../auron/{JniBridge.java => DeprecatedJniBridge.java} | 2 +- .../spark/sql/auron/AuronCallNativeWrapper.scala | 18 +++++++++++------- .../org/apache/spark/sql/auron/AuronConverters.scala | 6 +++--- .../spark/sql/auron/AuronSparkSessionExtension.scala | 2 +- .../org/apache/spark/sql/auron/NativeHelper.scala | 2 +- .../spark/sql/auron/util/TaskContextHelper.scala | 2 +- .../plan/NativeParquetInsertIntoHiveTableBase.scala | 7 ++++--- 13 files changed, 57 insertions(+), 40 deletions(-) diff --git a/auron-core/src/main/java/org/apache/auron/configuration/ConfigOption.java b/auron-core/src/main/java/org/apache/auron/configuration/ConfigOption.java index 925f3ddd..32941ed2 100644 --- a/auron-core/src/main/java/org/apache/auron/configuration/ConfigOption.java +++ b/auron-core/src/main/java/org/apache/auron/configuration/ConfigOption.java @@ -19,6 +19,7 @@ package org.apache.auron.configuration; import static org.apache.auron.util.Preconditions.checkNotNull; import java.util.function.Function; +import org.apache.auron.jni.AuronAdaptor; /** * A {@code ConfigOption} describes a configuration parameter. It encapsulates the configuration @@ -131,4 +132,13 @@ public class ConfigOption<T> { public Function<AuronConfiguration, T> dynamicDefaultValueFunction() { return dynamicDefaultValueFunction; } + + /** + * Retrieves the current value of this configuration option. + * + * @return the current value associated with this configuration option. + */ + public T get() { + return AuronAdaptor.getInstance().getAuronConfiguration().get(this); + } } diff --git a/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java b/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java index 8d7d0dd5..3cc6cbd1 100644 --- a/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java +++ b/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java @@ -18,6 +18,7 @@ package org.apache.spark.sql.auron; import net.bytebuddy.implementation.bind.annotation.Argument; import net.bytebuddy.implementation.bind.annotation.RuntimeType; +import org.apache.auron.spark.configuration.SparkAuronConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,6 +28,6 @@ public class ForceApplyShuffledHashJoinInterceptor { @RuntimeType public static Object intercept(@Argument(0) Object conf) { logger.debug("calling JoinSelectionHelper.forceApplyShuffledHashJoin() intercepted by auron"); - return AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf(); + return SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN.get(); } } diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala index 1427e01d..ad7c6c1d 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala @@ -113,6 +113,7 @@ import org.apache.spark.storage.FileSegment import org.apache.auron.{protobuf => pb, sparkver} import org.apache.auron.common.AuronBuildInfo import org.apache.auron.metric.SparkMetricNode +import org.apache.auron.spark.configuration.SparkAuronConfiguration import org.apache.auron.spark.ui.AuronBuildInfoEvent class ShimsImpl extends Shims with Logging { @@ -134,7 +135,7 @@ class ShimsImpl extends Shims with Logging { override def initExtension(): Unit = { ValidateSparkPlanInjector.inject() - if (AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf()) { + if (SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN.get()) { ForceApplyShuffledHashJoinInjector.inject() } @@ -147,19 +148,18 @@ class ShimsImpl extends Shims with Logging { @sparkver("3.0 / 3.1") override def initExtension(): Unit = { - if (AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf()) { - logWarning(s"${AuronConf.FORCE_SHUFFLED_HASH_JOIN.key} is not supported in $shimVersion") + if (SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN.get()) { + logWarning( + s"${SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN.key} is not supported in $shimVersion") } } // set Auron spark ui if spark.auron.ui.enabled is true override def onApplyingExtension(): Unit = { - logInfo( - " onApplyingExtension get ui_enabled : " + SparkEnv.get.conf - .get(AuronConf.UI_ENABLED.key, "true")) + logInfo(s"onApplyingExtension get ui_enabled: ${SparkAuronConfiguration.UI_ENABLED.get()}") - if (SparkEnv.get.conf.get(AuronConf.UI_ENABLED.key, "true").equals("true")) { + if (SparkAuronConfiguration.UI_ENABLED.get()) { val sparkContext = SparkContext.getActive.getOrElse { throw new IllegalStateException("No active spark context found that should not happen") } diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala index 349b489a..b0cce7f8 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala @@ -17,9 +17,9 @@ package org.apache.auron import org.apache.spark.sql.{AuronQueryTest, Row} -import org.apache.spark.sql.auron.AuronConf import org.apache.spark.sql.execution.joins.auron.plan.NativeBroadcastJoinExec +import org.apache.auron.spark.configuration.SparkAuronConfiguration import org.apache.auron.util.AuronTestUtils class AuronQuerySuite extends AuronQueryTest with BaseAuronSQLSuite with AuronSQLTestHelper { @@ -197,7 +197,7 @@ class AuronQuerySuite extends AuronQueryTest with BaseAuronSQLSuite with AuronSQ if (AuronTestUtils.isSparkV32OrGreater) { Seq(true, false).foreach { forcePositionalEvolution => withEnvConf( - AuronConf.ORC_FORCE_POSITIONAL_EVOLUTION.key -> forcePositionalEvolution.toString) { + SparkAuronConfiguration.ORC_FORCE_POSITIONAL_EVOLUTION.key -> forcePositionalEvolution.toString) { withTempPath { f => val path = f.getCanonicalPath Seq[(Integer, Integer)]((1, 2), (3, 4), (5, 6), (null, null)) @@ -228,7 +228,7 @@ class AuronQuerySuite extends AuronQueryTest with BaseAuronSQLSuite with AuronSQ if (AuronTestUtils.isSparkV32OrGreater) { Seq(true, false).foreach { forcePositionalEvolution => withEnvConf( - AuronConf.ORC_FORCE_POSITIONAL_EVOLUTION.key -> forcePositionalEvolution.toString) { + SparkAuronConfiguration.ORC_FORCE_POSITIONAL_EVOLUTION.key -> forcePositionalEvolution.toString) { withTempPath { f => val path = f.getCanonicalPath Seq[(Integer, Integer, Integer)]((1, 2, 1), (3, 4, 2), (5, 6, 3), (null, null, 4)) diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/auron/NativeConvertersSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/auron/NativeConvertersSuite.scala index dc96731c..4d6af7f2 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/auron/NativeConvertersSuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/auron/NativeConvertersSuite.scala @@ -17,11 +17,12 @@ package org.apache.auron import org.apache.spark.sql.AuronQueryTest -import org.apache.spark.sql.auron.{AuronConf, NativeConverters} +import org.apache.spark.sql.auron.NativeConverters import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, StringType} import org.apache.auron.protobuf.ScalarFunction +import org.apache.auron.spark.configuration.SparkAuronConfiguration class NativeConvertersSuite extends AuronQueryTest @@ -51,25 +52,25 @@ class NativeConvertersSuite } test("cast from string to numeric adds trim wrapper before native cast when enabled") { - withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "true") { + withSQLConf(SparkAuronConfiguration.CAST_STRING_TRIM_ENABLE.key -> "true") { assertTrimmedCast(" 42 ", IntegerType) } } test("cast from string to boolean adds trim wrapper before native cast when enabled") { - withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "true") { + withSQLConf(SparkAuronConfiguration.CAST_STRING_TRIM_ENABLE.key -> "true") { assertTrimmedCast(" true ", BooleanType) } } test("cast trim disabled via auron conf") { - withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "false") { + withSQLConf(SparkAuronConfiguration.CAST_STRING_TRIM_ENABLE.key -> "false") { assertNonTrimmedCast(" 42 ", IntegerType) } } test("cast trim disabled via auron conf for boolean cast") { - withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "false") { + withSQLConf(SparkAuronConfiguration.CAST_STRING_TRIM_ENABLE.key -> "false") { assertNonTrimmedCast(" true ", BooleanType) } } diff --git a/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java b/spark-extension/src/main/java/org/apache/spark/sql/auron/DeprecatedAuronConf.java similarity index 94% rename from spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java rename to spark-extension/src/main/java/org/apache/spark/sql/auron/DeprecatedAuronConf.java index 2943d4b2..04c34e73 100644 --- a/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java +++ b/spark-extension/src/main/java/org/apache/spark/sql/auron/DeprecatedAuronConf.java @@ -25,7 +25,7 @@ import org.apache.spark.SparkEnv$; */ @Deprecated @SuppressWarnings("unused") -public enum AuronConf { +public enum DeprecatedAuronConf { // support spark.auron.ui.enabled UI_ENABLED("spark.auron.ui.enabled", true), @@ -146,7 +146,7 @@ public enum AuronConf { public final String key; private final Object defaultValue; - AuronConf(String key, Object defaultValue) { + DeprecatedAuronConf(String key, Object defaultValue) { this.key = key; this.defaultValue = defaultValue; } @@ -172,23 +172,23 @@ public enum AuronConf { } public static boolean booleanConf(String confName) { - return AuronConf.valueOf(confName).booleanConf(); + return DeprecatedAuronConf.valueOf(confName).booleanConf(); } public static int intConf(String confName) { - return AuronConf.valueOf(confName).intConf(); + return DeprecatedAuronConf.valueOf(confName).intConf(); } public static long longConf(String confName) { - return AuronConf.valueOf(confName).longConf(); + return DeprecatedAuronConf.valueOf(confName).longConf(); } public static double doubleConf(String confName) { - return AuronConf.valueOf(confName).doubleConf(); + return DeprecatedAuronConf.valueOf(confName).doubleConf(); } public static String stringConf(String confName) { - return AuronConf.valueOf(confName).stringConf(); + return DeprecatedAuronConf.valueOf(confName).stringConf(); } private static SparkConf conf() { diff --git a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java b/spark-extension/src/main/java/org/apache/spark/sql/auron/DeprecatedJniBridge.java similarity index 99% rename from spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java rename to spark-extension/src/main/java/org/apache/spark/sql/auron/DeprecatedJniBridge.java index 3bea6e17..9c5f452a 100644 --- a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java +++ b/spark-extension/src/main/java/org/apache/spark/sql/auron/DeprecatedJniBridge.java @@ -42,7 +42,7 @@ import org.apache.spark.sql.auron.util.TaskContextHelper$; */ @Deprecated @SuppressWarnings("unused") -public class JniBridge { +public class DeprecatedJniBridge { @Deprecated public static final ConcurrentHashMap<String, Object> resourcesMap = new ConcurrentHashMap<>(); diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala index bf0918c4..4100d11b 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala @@ -44,6 +44,7 @@ import org.apache.spark.util.CompletionIterator import org.apache.spark.util.ShutdownHookManager import org.apache.spark.util.Utils +import org.apache.auron.configuration.AuronConfiguration import org.apache.auron.metric.{MetricNode, SparkMetricNode} import org.apache.auron.protobuf.PartitionId import org.apache.auron.protobuf.PhysicalPlanNode @@ -73,7 +74,10 @@ case class AuronCallNativeWrapper( logInfo(s"Start executing native plan ${nativePlan.getPhysicalPlanTypeCase}") private var nativeRuntimePtr = - JniBridge.callNative(NativeHelper.nativeMemory, AuronConf.NATIVE_LOG_LEVEL.stringConf(), this) + DeprecatedJniBridge.callNative( + NativeHelper.nativeMemory, + AuronConfiguration.NATIVE_LOG_LEVEL.get(), + this) private lazy val rowIterator = new Iterator[InternalRow] { override def hasNext: Boolean = { @@ -89,7 +93,7 @@ case class AuronCallNativeWrapper( // load next batch try { - if (nativeRuntimePtr != 0 && JniBridge.nextBatch(nativeRuntimePtr)) { + if (nativeRuntimePtr != 0 && DeprecatedJniBridge.nextBatch(nativeRuntimePtr)) { return hasNext } } finally { @@ -184,7 +188,7 @@ case class AuronCallNativeWrapper( batchCurRowIdx = 0 if (nativeRuntimePtr != 0) { - JniBridge.finalizeNative(nativeRuntimePtr) + DeprecatedJniBridge.finalizeNative(nativeRuntimePtr) nativeRuntimePtr = 0 dictionaryProvider.close() checkError() @@ -201,16 +205,16 @@ object AuronCallNativeWrapper extends Logging { private lazy val lazyInitNative: Unit = { logInfo( "Initializing native environment (" + - s"batchSize=${AuronConf.BATCH_SIZE.intConf()}, " + + s"batchSize=${AuronConfiguration.BATCH_SIZE.get()}, " + s"nativeMemory=${NativeHelper.nativeMemory}, " + - s"memoryFraction=${AuronConf.MEMORY_FRACTION.doubleConf()})") + s"memoryFraction=${AuronConfiguration.MEMORY_FRACTION.get()})") // arrow configuration System.setProperty("arrow.struct.conflict.policy", "CONFLICT_APPEND") - assert(classOf[JniBridge] != null) // preload JNI bridge classes + assert(classOf[DeprecatedJniBridge] != null) // preload JNI bridge classes AuronCallNativeWrapper.loadLibAuron() - ShutdownHookManager.addShutdownHook(() => JniBridge.onExit()) + ShutdownHookManager.addShutdownHook(() => DeprecatedJniBridge.onExit()) } private def loadLibAuron(): Unit = { diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala index 73f5dfb8..de0b6105 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala @@ -461,7 +461,7 @@ object AuronConverters extends Logging { assert( !exec.requiredSchema.exists(e => existTimestampType(e.dataType)), s"Parquet scan with timestamp type is not supported for table: ${tableIdentifier - .getOrElse("unknown")}. " + + .getOrElse("unknown")}. " + "Set spark.auron.enable.scan.parquet.timestamp=true to enable timestamp support " + "or remove timestamp columns from the query.") } @@ -472,7 +472,7 @@ object AuronConverters extends Logging { assert( !exec.requiredSchema.exists(e => existTimestampType(e.dataType)), s"ORC scan with timestamp type is not supported for tableIdentifier: ${tableIdentifier - .getOrElse("unknown")}. " + + .getOrElse("unknown")}. " + "Set spark.auron.enable.scan.orc.timestamp=true to enable timestamp support " + "or remove timestamp columns from the query.") } @@ -480,7 +480,7 @@ object AuronConverters extends Logging { case p => throw new NotImplementedError( s"Cannot convert FileSourceScanExec tableIdentifier: ${tableIdentifier.getOrElse( - "unknown")}, class: ${p.getClass.getName}") + "unknown")}, class: ${p.getClass.getName}") } } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala index 1f8b6421..9944ea5b 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala @@ -96,7 +96,7 @@ case class AuronColumnarOverrides(sparkSession: SparkSession) extends ColumnarRu dumpSimpleSparkPlanTreeNode(sparkPlanTransformed) logInfo(s"Transformed spark plan after preColumnarTransitions:\n${sparkPlanTransformed - .treeString(verbose = true, addSuffix = true)}") + .treeString(verbose = true, addSuffix = true)}") // post-transform Shims.get.postTransform(sparkPlanTransformed, sparkSession.sparkContext) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala index 3ae7669e..403008e6 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala @@ -74,7 +74,7 @@ object NativeHelper extends Logging { val heapMemory = Runtime.getRuntime.maxMemory() val offheapMemory = totalMemory - heapMemory logWarning(s"memory total: ${Utils.bytesToString(totalMemory)}, onheap: ${Utils.bytesToString( - heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}") + heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}") offheapMemory } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala index 8e5d7353..eecec5ef 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala @@ -47,7 +47,7 @@ object TaskContextHelper extends Logging { val thread = Thread.currentThread() val threadName = if (context != null) { s"auron native task ${context.partitionId()}.${context.attemptNumber()} in stage ${context - .stageId()}.${context.stageAttemptNumber()} (TID ${context.taskAttemptId()})" + .stageId()}.${context.stageAttemptNumber()} (TID ${context.taskAttemptId()})" } else { "auron native task " + thread.getName } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala index d43f7d17..b9fb7f99 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala @@ -69,9 +69,10 @@ abstract class NativeParquetInsertIntoHiveTableBase( .filterKeys(Set("stage_id", "output_rows", "elapsed_compute")) .toSeq :+ ("io_time", SQLMetrics.createNanoTimingMetric(sparkContext, "Native.io_time")) - :+ ("bytes_written", - SQLMetrics - .createSizeMetric(sparkContext, "Native.bytes_written")): _*) + :+ ( + "bytes_written", + SQLMetrics + .createSizeMetric(sparkContext, "Native.bytes_written")): _*) def check(): Unit = { val hadoopConf = sparkContext.hadoopConfiguration
