This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b956a72bda [GLUTEN-8326][CORE] Rename GlutenConfig.getConf to GlutenConfig.get (#8395)
b956a72bda is described below
commit b956a72bda67527fa605e801cfe4ecbb65dda047
Author: Kaifei Yi <[email protected]>
AuthorDate: Fri Jan 3 14:34:14 2025 +0800
[GLUTEN-8326][CORE] Rename GlutenConfig.getConf to GlutenConfig.get (#8395)
---
.../shuffle/CHCelebornColumnarBatchSerializer.scala | 4 ++--
.../shuffle/CHCelebornColumnarShuffleWriter.scala | 4 ++--
.../sql/delta/ClickhouseOptimisticTransaction.scala | 2 +-
.../gluten/backendsapi/clickhouse/CHBackend.scala | 6 +++---
.../backendsapi/clickhouse/CHSparkPlanExecApi.scala | 4 ++--
.../backendsapi/clickhouse/CHValidatorApi.scala | 2 +-
.../extension/CommonSubexpressionEliminateRule.scala | 4 ++--
.../extension/CountDistinctWithoutExpand.scala | 4 +---
.../gluten/extension/ExtendedColumnPruning.scala | 2 +-
.../extension/FallbackBroadcastHashJoinRules.scala | 14 +++++++-------
.../RewriteDateTimestampComparisonRule.scala | 4 ++--
.../extension/RewriteToDateExpresstionRule.scala | 4 ++--
.../gluten/extension/WriteFilesWithBucketValue.scala | 4 ++--
.../vectorized/CHColumnarBatchSerializer.scala | 4 ++--
.../spark/shuffle/CHColumnarShuffleWriter.scala | 10 +++++-----
.../catalyst/CHAggregateFunctionRewriteRule.scala | 4 ++--
.../datasources/v1/CHParquetWriterInjects.scala | 4 ++--
.../spark/sql/execution/utils/CHExecUtil.scala | 6 +++---
.../VeloxCelebornColumnarBatchSerializer.scala | 8 ++++----
.../shuffle/VeloxCelebornColumnarShuffleWriter.scala | 4 ++--
.../writer/VeloxUniffleColumnarShuffleWriter.java | 10 +++++-----
.../gluten/backendsapi/velox/VeloxBackend.scala | 20 ++++++++++----------
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 18 +++++++++---------
.../execution/ColumnarPartialProjectExec.scala | 6 ++----
.../gluten/execution/RowToVeloxColumnarExec.scala | 4 ++--
.../BloomFilterMightContainJointRewriteRule.scala | 2 +-
.../extension/FlushableHashAggregateRule.scala | 2 +-
.../org/apache/gluten/extension/HLLRewriteRule.scala | 4 ++--
.../gluten/vectorized/ColumnarBatchSerializer.scala | 6 +++---
.../apache/spark/shuffle/ColumnarShuffleWriter.scala | 16 ++++++++--------
.../apache/spark/sql/execution/BroadcastUtils.scala | 2 +-
.../velox/VeloxParquetWriterInjects.scala | 4 ++--
.../arrow/alloc/ManagedAllocationListener.java | 2 +-
.../gluten/memory/listener/ReservationListeners.java | 4 ++--
.../scala/org/apache/gluten/utils/DebugUtil.scala | 12 ++++++------
.../gluten/celeborn/CelebornShuffleManager.java | 2 +-
.../shuffle/CelebornColumnarShuffleWriter.scala | 8 ++++----
.../memtarget/DynamicOffHeapSizingMemoryTarget.java | 2 +-
.../gluten/memory/memtarget/MemoryTargets.java | 4 ++--
.../memory/memtarget/spark/TreeMemoryConsumers.java | 2 +-
.../extension/columnar/ColumnarRuleExecutor.scala | 2 +-
.../gluten/softaffinity/SoftAffinityManager.scala | 8 ++++----
.../org/apache/spark/softaffinity/SoftAffinity.scala | 2 +-
.../gluten/substrait/rel/IcebergLocalFilesNode.java | 4 ++--
.../apache/gluten/substrait/rel/LocalFilesNode.java | 8 ++++----
.../main/java/org/apache/gluten/test/TestStats.java | 2 +-
.../gluten/backendsapi/BackendSettingsApi.scala | 2 +-
.../gluten/execution/BasicScanExecTransformer.scala | 2 +-
.../BroadcastNestedLoopJoinExecTransformer.scala | 2 +-
.../execution/GlutenWholeStageColumnarRDD.scala | 2 +-
.../gluten/execution/WholeStageTransformer.scala | 8 ++++----
.../gluten/execution/WindowExecTransformer.scala | 2 +-
.../gluten/expression/ExpressionMappings.scala | 2 +-
.../columnar/CollapseProjectExecTransformer.scala | 2 +-
.../gluten/extension/columnar/FallbackRules.scala | 6 +++---
.../columnar/MergeTwoPhasesHashBaseAggregate.scala | 6 +++---
.../extension/columnar/UnionTransformerRule.scala | 2 +-
.../enumerated/planner/cost/RoughCoster2.scala | 8 ++++----
.../columnar/heuristic/ExpandFallbackPolicy.scala | 8 ++++----
.../columnar/offload/OffloadSingleNodeRules.scala | 6 +++---
.../extension/columnar/rewrite/RewriteJoin.scala | 2 +-
.../extension/columnar/validator/Validators.scala | 2 +-
.../apache/spark/shuffle/GlutenShuffleUtils.scala | 2 +-
.../sql/hive/HiveTableScanNestedColumnPruning.scala | 2 +-
.../sql/execution/FallbackStrategiesSuite.scala | 2 +-
.../sql/GlutenBloomFilterAggregateQuerySuite.scala | 6 +++---
.../sql/execution/FallbackStrategiesSuite.scala | 2 +-
.../sql/GlutenBloomFilterAggregateQuerySuite.scala | 6 +++---
.../sql/execution/FallbackStrategiesSuite.scala | 2 +-
.../sql/GlutenBloomFilterAggregateQuerySuite.scala | 6 +++---
.../sql/execution/FallbackStrategiesSuite.scala | 2 +-
.../main/scala/org/apache/gluten/GlutenConfig.scala | 2 +-
.../sql/execution/adaptive/GlutenCostEvaluator.scala | 2 +-
.../sql/execution/adaptive/GlutenCostEvaluator.scala | 2 +-
.../sql/execution/adaptive/GlutenCostEvaluator.scala | 2 +-
.../sql/execution/adaptive/GlutenCostEvaluator.scala | 2 +-
76 files changed, 176 insertions(+), 180 deletions(-)
diff --git
a/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
b/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
index 72145f1b5f..23ee2543ec 100644
---
a/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
+++
b/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
@@ -59,14 +59,14 @@ private class CHCelebornColumnarBatchSerializerInstance(
with Logging {
private lazy val conf = SparkEnv.get.conf
- private lazy val gluten_conf = GlutenConfig.getConf
+ private lazy val gluten_conf = GlutenConfig.get
private lazy val compressionCodec =
GlutenShuffleUtils.getCompressionCodec(conf)
private lazy val capitalizedCompressionCodec =
compressionCodec.toUpperCase(Locale.ROOT)
private lazy val compressionLevel =
GlutenShuffleUtils.getCompressionLevel(
conf,
compressionCodec,
- GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
+ GlutenConfig.get.columnarShuffleCodecBackend.orNull)
override def deserializeStream(in: InputStream): DeserializationStream = {
new DeserializationStream {
diff --git
a/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
b/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
index 2263cda7ea..c7d7286c0c 100644
---
a/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
+++
b/backends-clickhouse/src-celeborn/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
@@ -80,10 +80,10 @@ class CHCelebornColumnarShuffleWriter[K, V](
nativeBufferSize,
capitalizedCompressionCodec,
compressionLevel,
- GlutenConfig.getConf.chColumnarShuffleSpillThreshold,
+ GlutenConfig.get.chColumnarShuffleSpillThreshold,
CHBackendSettings.shuffleHashAlgorithm,
celebornPartitionPusher,
- GlutenConfig.getConf.chColumnarForceMemorySortShuffle
+ GlutenConfig.get.chColumnarForceMemorySortShuffle
|| ShuffleMode.SORT.name.equalsIgnoreCase(shuffleWriterType)
)
diff --git
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index a4411e83f2..f35fc021c7 100644
---
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -68,7 +68,7 @@ class ClickhouseOptimisticTransaction(
writeOptions: Option[DeltaOptions],
isOptimize: Boolean,
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
- val nativeWrite = GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
+ val nativeWrite = GlutenConfig.get.enableNativeWriter.getOrElse(false)
if (writingMergeTree) {
// TODO: update FallbackByBackendSettings for mergetree always return
true
val onePipeline = nativeWrite &&
CHConf.get.enableOnePipelineMergeTreeWrite
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 97d862e13c..bc710f15f2 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -282,11 +282,11 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
}
override def supportSortExec(): Boolean = {
- GlutenConfig.getConf.enableColumnarSort
+ GlutenConfig.get.enableColumnarSort
}
override def supportSortMergeJoinExec(): Boolean = {
- GlutenConfig.getConf.enableColumnarSortMergeJoin
+ GlutenConfig.get.enableColumnarSortMergeJoin
}
override def supportWindowExec(windowFunctions: Seq[NamedExpression]):
Boolean = {
@@ -391,7 +391,7 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
}
override def enableNativeWriteFiles(): Boolean = {
- GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
+ GlutenConfig.get.enableNativeWriter.getOrElse(false)
}
override def supportCartesianProductExec(): Boolean = true
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 6352288815..fc04040f14 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -452,12 +452,12 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with
Logging {
val readBatchNumRows = metrics("avgReadBatchNumRows")
val numOutputRows = metrics("numOutputRows")
val dataSize = metrics("dataSize")
- if (GlutenConfig.getConf.isUseCelebornShuffleManager) {
+ if (GlutenConfig.get.isUseCelebornShuffleManager) {
val clazz =
ClassUtils.getClass("org.apache.spark.shuffle.CHCelebornColumnarBatchSerializer")
val constructor =
clazz.getConstructor(classOf[SQLMetric], classOf[SQLMetric],
classOf[SQLMetric])
constructor.newInstance(readBatchNumRows, numOutputRows,
dataSize).asInstanceOf[Serializer]
- } else if (GlutenConfig.getConf.isUseUniffleShuffleManager) {
+ } else if (GlutenConfig.get.isUseUniffleShuffleManager) {
throw new UnsupportedOperationException("temporarily uniffle not support
ch ")
} else {
new CHColumnarBatchSerializer(readBatchNumRows, numOutputRows, dataSize)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
index 29d26410b9..0d31859c07 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
@@ -95,7 +95,7 @@ class CHValidatorApi extends ValidatorApi with
AdaptiveSparkPlanHelper with Logg
}
case rangePartitoning: RangePartitioning =>
if (
- GlutenConfig.getConf.enableColumnarSort &&
+ GlutenConfig.get.enableColumnarSort &&
RangePartitionerBoundsGenerator.supportedOrderings(rangePartitoning,
child)
) {
None
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
index 28d19fa29a..25042e81e1 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
@@ -39,8 +39,8 @@ class CommonSubexpressionEliminateRule(spark: SparkSession)
extends Rule[Logical
override def apply(plan: LogicalPlan): LogicalPlan = {
val newPlan =
if (
- plan.resolved && GlutenConfig.getConf.enableGluten
- && GlutenConfig.getConf.enableCommonSubexpressionEliminate &&
!plan.fastEquals(lastPlan)
+ plan.resolved && GlutenConfig.get.enableGluten
+ && GlutenConfig.get.enableCommonSubexpressionEliminate &&
!plan.fastEquals(lastPlan)
) {
lastPlan = plan
visitPlan(plan)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
index 82051baeeb..a79f474b29 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CountDistinctWithoutExpand.scala
@@ -32,9 +32,7 @@ import
org.apache.spark.sql.catalyst.trees.TreePattern.AGGREGATE_EXPRESSION
*/
object CountDistinctWithoutExpand extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
- if (
- GlutenConfig.getConf.enableGluten &&
GlutenConfig.getConf.enableCountDistinctWithoutExpand
- ) {
+ if (GlutenConfig.get.enableGluten &&
GlutenConfig.get.enableCountDistinctWithoutExpand) {
plan.transformAllExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION))
{
case ae: AggregateExpression
if ae.isDistinct && ae.aggregateFunction.isInstanceOf[Count] &&
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ExtendedColumnPruning.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ExtendedColumnPruning.scala
index f2a0a549bc..821c1b8b26 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ExtendedColumnPruning.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ExtendedColumnPruning.scala
@@ -36,7 +36,7 @@ object ExtendedGeneratorNestedColumnAliasing {
def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
case pj @ Project(projectList, f @ Filter(condition, g: Generate))
if canPruneGenerator(g.generator) &&
- GlutenConfig.getConf.enableExtendedColumnPruning &&
+ GlutenConfig.get.enableExtendedColumnPruning &&
(SQLConf.get.nestedPruningOnExpressions ||
SQLConf.get.nestedSchemaPruningEnabled) =>
val attrToExtractValues =
getAttributeToExtractValues(projectList ++ g.generator.children :+
condition, Seq.empty)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
index 6a788617a6..25346c0a7f 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
@@ -37,7 +37,7 @@ import scala.util.control.Breaks.{break, breakable}
// queryStagePrepRules.
case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession)
extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
- val glutenConf: GlutenConfig = GlutenConfig.getConf
+ val glutenConf: GlutenConfig = GlutenConfig.get
plan.foreach {
case bhj: BroadcastHashJoinExec =>
val buildSidePlan = bhj.buildSide match {
@@ -106,8 +106,8 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session:
SparkSession) extend
case Some(exchange @ BroadcastExchangeExec(mode, child)) =>
val isTransformable =
if (
- !GlutenConfig.getConf.enableColumnarBroadcastExchange ||
- !GlutenConfig.getConf.enableColumnarBroadcastJoin
+ !GlutenConfig.get.enableColumnarBroadcastExchange ||
+ !GlutenConfig.get.enableColumnarBroadcastJoin
) {
ValidationResult.failed(
"columnar broadcast exchange is disabled or " +
@@ -146,12 +146,12 @@ case class
FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend
case class FallbackBroadcastHashJoin(session: SparkSession) extends
Rule[SparkPlan] {
private val enableColumnarBroadcastJoin: Boolean =
- GlutenConfig.getConf.enableColumnarBroadcastJoin &&
- GlutenConfig.getConf.enableColumnarBroadcastExchange
+ GlutenConfig.get.enableColumnarBroadcastJoin &&
+ GlutenConfig.get.enableColumnarBroadcastExchange
private val enableColumnarBroadcastNestedLoopJoin: Boolean =
- GlutenConfig.getConf.broadcastNestedLoopJoinTransformerTransformerEnabled
&&
- GlutenConfig.getConf.enableColumnarBroadcastExchange
+ GlutenConfig.get.broadcastNestedLoopJoinTransformerTransformerEnabled &&
+ GlutenConfig.get.enableColumnarBroadcastExchange
override def apply(plan: SparkPlan): SparkPlan = {
plan.foreachUp {
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
index fa8a37ffa2..d495919dde 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
@@ -56,8 +56,8 @@ class RewriteDateTimestampComparisonRule(spark: SparkSession)
override def apply(plan: LogicalPlan): LogicalPlan = {
if (
plan.resolved &&
- GlutenConfig.getConf.enableGluten &&
- GlutenConfig.getConf.enableRewriteDateTimestampComparison
+ GlutenConfig.get.enableGluten &&
+ GlutenConfig.get.enableRewriteDateTimestampComparison
) {
visitPlan(plan)
} else {
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
index 6e84863304..43c0206b58 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
@@ -42,8 +42,8 @@ class RewriteToDateExpresstionRule(spark: SparkSession)
extends Rule[LogicalPlan
override def apply(plan: LogicalPlan): LogicalPlan = {
if (
plan.resolved &&
- GlutenConfig.getConf.enableGluten &&
- GlutenConfig.getConf.enableCHRewriteDateConversion
+ GlutenConfig.get.enableGluten &&
+ GlutenConfig.get.enableCHRewriteDateConversion
) {
visitPlan(plan)
} else {
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/WriteFilesWithBucketValue.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/WriteFilesWithBucketValue.scala
index 8ab78dcff9..bc5e77f51d 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/WriteFilesWithBucketValue.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/WriteFilesWithBucketValue.scala
@@ -34,8 +34,8 @@ object WriteFilesWithBucketValue extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
if (
- GlutenConfig.getConf.enableGluten
- && GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
+ GlutenConfig.get.enableGluten
+ && GlutenConfig.get.enableNativeWriter.getOrElse(false)
) {
plan.transformDown {
case writeFiles: WriteFilesExec if writeFiles.bucketSpec.isDefined =>
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
index 370d93d7e7..6795818f6c 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
@@ -61,9 +61,9 @@ private class CHColumnarBatchSerializerInstance(
GlutenShuffleUtils.getCompressionLevel(
conf,
compressionCodec,
- GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
+ GlutenConfig.get.columnarShuffleCodecBackend.orNull)
- private val useColumnarShuffle: Boolean =
GlutenConfig.getConf.isUseColumnarShuffleManager
+ private val useColumnarShuffle: Boolean =
GlutenConfig.get.isUseColumnarShuffleManager
override def deserializeStream(in: InputStream): DeserializationStream = {
// Don't use GlutenConfig in this method. It will execute in non task
Thread.
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
index b0595fd05d..acbc41ceed 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
@@ -51,17 +51,17 @@ class CHColumnarShuffleWriter[K, V](
.map(_.getAbsolutePath)
.mkString(",")
private val subDirsPerLocalDir =
blockManager.diskBlockManager.subDirsPerLocalDir
- private val splitSize = GlutenConfig.getConf.maxBatchSize
+ private val splitSize = GlutenConfig.get.maxBatchSize
private val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
private val capitalizedCompressionCodec =
compressionCodec.toUpperCase(Locale.ROOT)
private val compressionLevel =
GlutenShuffleUtils.getCompressionLevel(
conf,
compressionCodec,
- GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
- private val maxSortBufferSize =
GlutenConfig.getConf.chColumnarMaxSortBufferSize
- private val forceMemorySortShuffle =
GlutenConfig.getConf.chColumnarForceMemorySortShuffle
- private val spillThreshold =
GlutenConfig.getConf.chColumnarShuffleSpillThreshold
+ GlutenConfig.get.columnarShuffleCodecBackend.orNull)
+ private val maxSortBufferSize = GlutenConfig.get.chColumnarMaxSortBufferSize
+ private val forceMemorySortShuffle =
GlutenConfig.get.chColumnarForceMemorySortShuffle
+ private val spillThreshold = GlutenConfig.get.chColumnarShuffleSpillThreshold
private val jniWrapper = new CHShuffleSplitterJniWrapper
// Are we in the process of stopping? Because map tasks can call stop() with
success = true
// and then call stop() with success = false if they get an exception, we
want to make sure
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/catalyst/CHAggregateFunctionRewriteRule.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/catalyst/CHAggregateFunctionRewriteRule.scala
index b11e1e2bb3..a18aad0393 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/catalyst/CHAggregateFunctionRewriteRule.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/catalyst/CHAggregateFunctionRewriteRule.scala
@@ -35,8 +35,8 @@ case class CHAggregateFunctionRewriteRule(spark:
SparkSession) extends Rule[Logi
case a: Aggregate =>
a.transformExpressions {
case avgExpr @ AggregateExpression(avg: Average, _, _, _, _)
- if GlutenConfig.getConf.enableCastAvgAggregateFunction &&
- GlutenConfig.getConf.enableColumnarHashAgg &&
+ if GlutenConfig.get.enableCastAvgAggregateFunction &&
+ GlutenConfig.get.enableColumnarHashAgg &&
!avgExpr.isDistinct && isDataTypeNeedConvert(avg.child.dataType)
=>
AggregateExpression(
avg.copy(child = Cast(avg.child, DoubleType)),
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHParquetWriterInjects.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHParquetWriterInjects.scala
index 1e4d0b8841..164eaaf8ca 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHParquetWriterInjects.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHParquetWriterInjects.scala
@@ -34,11 +34,11 @@ class CHParquetWriterInjects extends CHFormatWriterInjects {
sparkOptions.put(SQLConf.PARQUET_COMPRESSION.key, compressionCodec)
val blockSize = options.getOrElse(
GlutenConfig.PARQUET_BLOCK_SIZE,
- GlutenConfig.getConf.columnarParquetWriteBlockSize.toString)
+ GlutenConfig.get.columnarParquetWriteBlockSize.toString)
sparkOptions.put(GlutenConfig.PARQUET_BLOCK_SIZE, blockSize)
val blockRows = options.getOrElse(
GlutenConfig.PARQUET_BLOCK_ROWS,
- GlutenConfig.getConf.columnarParquetWriteBlockRows.toString)
+ GlutenConfig.get.columnarParquetWriteBlockRows.toString)
sparkOptions.put(GlutenConfig.PARQUET_BLOCK_ROWS, blockRows)
sparkOptions
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
index 21d1132488..6a0e5d0c6d 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
@@ -239,7 +239,7 @@ object CHExecUtil extends Logging {
private def buildPartitioningOptions(nativePartitioning:
NativePartitioning): IteratorOptions = {
val options = new IteratorOptions
- options.setBufferSize(GlutenConfig.getConf.maxBatchSize)
+ options.setBufferSize(GlutenConfig.get.maxBatchSize)
options.setName(nativePartitioning.getShortName)
options.setPartitionNum(nativePartitioning.getNumPartitions)
options.setExpr(new String(nativePartitioning.getExprList))
@@ -345,8 +345,8 @@ object CHExecUtil extends Logging {
val rddWithPartitionKey: RDD[Product2[Int, ColumnarBatch]] =
if (
- GlutenConfig.getConf.isUseColumnarShuffleManager
- || GlutenConfig.getConf.isUseCelebornShuffleManager
+ GlutenConfig.get.isUseColumnarShuffleManager
+ || GlutenConfig.get.isUseCelebornShuffleManager
) {
newPartitioning match {
case _ =>
diff --git
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
index a4a97d43de..a519fdab5f 100644
---
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
+++
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
@@ -85,12 +85,12 @@ private class CelebornColumnarBatchSerializerInstance(
null // uncompressed
}
val compressionCodecBackend =
- GlutenConfig.getConf.columnarShuffleCodecBackend.orNull
- val shuffleWriterType = GlutenConfig.getConf.celebornShuffleWriterType
+ GlutenConfig.get.columnarShuffleCodecBackend.orNull
+ val shuffleWriterType = GlutenConfig.get.celebornShuffleWriterType
.replace(GLUTEN_SORT_SHUFFLE_WRITER, GLUTEN_RSS_SORT_SHUFFLE_WRITER)
val jniWrapper = ShuffleReaderJniWrapper.create(runtime)
- val batchSize = GlutenConfig.getConf.maxBatchSize
- val bufferSize = GlutenConfig.getConf.columnarShuffleReaderBufferSize
+ val batchSize = GlutenConfig.get.maxBatchSize
+ val bufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize
val handle = jniWrapper
.make(
cSchema.memoryAddress(),
diff --git
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
index 165d68785d..389d325730 100644
---
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
+++
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
@@ -127,7 +127,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
compressionLevel,
compressionBufferSize,
bufferCompressThreshold,
- GlutenConfig.getConf.columnarShuffleCompressionMode,
+ GlutenConfig.get.columnarShuffleCompressionMode,
conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt,
conf.get(SHUFFLE_SORT_USE_RADIXSORT),
clientPushBufferMaxSize,
@@ -138,7 +138,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning,
context.partitionId),
"celeborn",
shuffleWriterType,
- GlutenConfig.getConf.columnarShuffleReallocThreshold
+ GlutenConfig.get.columnarShuffleReallocThreshold
)
runtime
.memoryManager()
diff --git
a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index 7cbf452f76..663fd3792e 100644
---
a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++
b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -64,8 +64,8 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends
RssShuffleWriter<K,
private boolean stopping = false;
private final int compressThreshold =
- GlutenConfig.getConf().columnarShuffleCompressionThreshold();
- private final double reallocThreshold =
GlutenConfig.getConf().columnarShuffleReallocThreshold();
+ GlutenConfig.get().columnarShuffleCompressionThreshold();
+ private final double reallocThreshold =
GlutenConfig.get().columnarShuffleReallocThreshold();
private String compressionCodec;
private int compressionLevel;
private int compressionBufferSize;
@@ -74,7 +74,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends
RssShuffleWriter<K,
private final Runtime runtime =
Runtimes.contextInstance(BackendsApiManager.getBackendName(),
"UniffleShuffleWriter");
private final ShuffleWriterJniWrapper jniWrapper =
ShuffleWriterJniWrapper.create(runtime);
- private final int nativeBufferSize = GlutenConfig.getConf().maxBatchSize();
+ private final int nativeBufferSize = GlutenConfig.get().maxBatchSize();
private final int bufferSize;
private final Boolean isSort;
@@ -127,7 +127,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V>
extends RssShuffleWriter<K,
GlutenShuffleUtils.getCompressionLevel(
sparkConf,
compressionCodec,
-
GlutenConfig.getConf().columnarShuffleCodecBackend().getOrElse(() -> null));
+ GlutenConfig.get().columnarShuffleCodecBackend().getOrElse(() ->
null));
compressionBufferSize =
GlutenShuffleUtils.getSortEvictBufferSize(sparkConf,
compressionCodec);
}
@@ -158,7 +158,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V>
extends RssShuffleWriter<K,
compressionLevel,
compressionBufferSize,
compressThreshold,
- GlutenConfig.getConf().columnarShuffleCompressionMode(),
+ GlutenConfig.get().columnarShuffleCompressionMode(),
(int) (long)
sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
(boolean)
sparkConf.get(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT()),
bufferSize,
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index dba41b1f8d..a752e5a0f5 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -109,7 +109,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
val filteredRootPaths = distinctRootPaths(rootPaths)
if (filteredRootPaths.nonEmpty) {
val resolvedPaths =
- if (GlutenConfig.getConf.enableHdfsViewfs) {
+ if (GlutenConfig.get.enableHdfsViewfs) {
ViewFileSystemUtils.convertViewfsToHdfs(
filteredRootPaths,
mutable.Map.empty[String, String],
@@ -145,7 +145,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
def isCharType(stringType: StringType, metadata: Metadata): Boolean = {
val charTypePattern = "char\\((\\d+)\\)".r
- GlutenConfig.getConf.forceOrcCharTypeScanFallbackEnabled &&
charTypePattern
+ GlutenConfig.get.forceOrcCharTypeScanFallbackEnabled && charTypePattern
.findFirstIn(
CharVarcharUtils
.getRawTypeString(metadata)
@@ -158,7 +158,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
val typeValidator: PartialFunction[StructField, String] = {
// Parquet timestamp is not fully supported yet
case StructField(_, TimestampType, _, _)
- if
GlutenConfig.getConf.forceParquetTimestampTypeScanFallbackEnabled =>
+ if
GlutenConfig.get.forceParquetTimestampTypeScanFallbackEnabled =>
"TimestampType(force fallback)"
}
val parquetOptions = new
ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get)
@@ -170,7 +170,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
case DwrfReadFormat => None
case OrcReadFormat =>
- if (!GlutenConfig.getConf.veloxOrcScanEnabled) {
+ if (!GlutenConfig.get.veloxOrcScanEnabled) {
Some(s"Velox ORC scan is turned off,
${GlutenConfig.VELOX_ORC_SCAN_ENABLED.key}")
} else {
val typeValidator: PartialFunction[StructField, String] = {
@@ -315,7 +315,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
def validateFileFormat(): Option[String] = {
format match {
case _: ParquetFileFormat => None // Parquet is directly supported
- case h: HiveFileFormat if
GlutenConfig.getConf.enableHiveFileFormatWriter =>
+ case h: HiveFileFormat if GlutenConfig.get.enableHiveFileFormatWriter
=>
validateHiveFileFormat(h) // Parquet via Hive SerDe
case _ =>
Some(
@@ -375,7 +375,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def supportSortExec(): Boolean = true
override def supportSortMergeJoinExec(): Boolean = {
- GlutenConfig.getConf.enableColumnarSortMergeJoin
+ GlutenConfig.get.enableColumnarSortMergeJoin
}
override def supportWindowGroupLimitExec(rankLikeFunction: Expression):
Boolean = {
@@ -456,7 +456,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
override def supportColumnarShuffleExec(): Boolean = {
- val conf = GlutenConfig.getConf
+ val conf = GlutenConfig.get
conf.enableColumnarShuffle && (conf.isUseGlutenShuffleManager
|| conf.isUseColumnarShuffleManager
|| conf.isUseCelebornShuffleManager
@@ -513,7 +513,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def alwaysFailOnMapExpression(): Boolean = true
override def requiredChildOrderingForWindow(): Boolean = {
- GlutenConfig.getConf.veloxColumnarWindowType.equals("streaming")
+ GlutenConfig.get.veloxColumnarWindowType.equals("streaming")
}
override def requiredChildOrderingForWindowGroupLimit(): Boolean = false
@@ -523,13 +523,13 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def allowDecimalArithmetic: Boolean = true
override def enableNativeWriteFiles(): Boolean = {
- GlutenConfig.getConf.enableNativeWriter.getOrElse(
+ GlutenConfig.get.enableNativeWriter.getOrElse(
SparkShimLoader.getSparkShims.enableNativeWriteFilesByDefault()
)
}
override def enableNativeArrowReadFiles(): Boolean = {
- GlutenConfig.getConf.enableNativeArrowReader
+ GlutenConfig.get.enableNativeArrowReader
}
override def shouldRewriteCount(): Boolean = {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index e971b980a8..04a6b3492c 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -347,8 +347,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
plan match {
case shuffle: ColumnarShuffleExchangeExec
if !shuffle.useSortBasedShuffle &&
- GlutenConfig.getConf.veloxResizeBatchesShuffleInput =>
- val range = GlutenConfig.getConf.veloxResizeBatchesShuffleInputRange
+ GlutenConfig.get.veloxResizeBatchesShuffleInput =>
+ val range = GlutenConfig.get.veloxResizeBatchesShuffleInputRange
val appendBatches =
VeloxResizeBatchesExec(shuffle.child, range.min, range.max)
shuffle.withNewChildren(Seq(appendBatches))
@@ -543,12 +543,12 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
/** Determine whether to use sort-based shuffle based on shuffle
partitioning and output. */
override def useSortBasedShuffle(partitioning: Partitioning, output:
Seq[Attribute]): Boolean = {
- val conf = GlutenConfig.getConf
+ val conf = GlutenConfig.get
lazy val isCelebornSortBasedShuffle = conf.isUseCelebornShuffleManager &&
conf.celebornShuffleWriterType == GlutenConfig.GLUTEN_SORT_SHUFFLE_WRITER
partitioning != SinglePartition &&
- (partitioning.numPartitions >=
GlutenConfig.getConf.columnarShuffleSortPartitionsThreshold ||
- output.size >= GlutenConfig.getConf.columnarShuffleSortColumnsThreshold)
||
+ (partitioning.numPartitions >=
GlutenConfig.get.columnarShuffleSortPartitionsThreshold ||
+ output.size >= GlutenConfig.get.columnarShuffleSortColumnsThreshold) ||
isCelebornSortBasedShuffle
}
@@ -601,7 +601,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
val deserializeTime = metrics("deserializeTime")
val readBatchNumRows = metrics("avgReadBatchNumRows")
val decompressTime = metrics("decompressTime")
- if (GlutenConfig.getConf.isUseCelebornShuffleManager) {
+ if (GlutenConfig.get.isUseCelebornShuffleManager) {
val clazz =
ClassUtils.getClass("org.apache.spark.shuffle.CelebornColumnarBatchSerializer")
val constructor =
clazz.getConstructor(classOf[StructType], classOf[SQLMetric],
classOf[SQLMetric])
@@ -624,7 +624,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
numOutputRows: SQLMetric,
dataSize: SQLMetric): BuildSideRelation = {
val useOffheapBroadcastBuildRelation =
- GlutenConfig.getConf.enableBroadcastBuildRelationInOffheap
+ GlutenConfig.get.enableBroadcastBuildRelationInOffheap
val serialized: Array[ColumnarBatchSerializeResult] = child
.executeColumnar()
.mapPartitions(itr => Iterator(BroadcastUtils.serializeStream(itr)))
@@ -738,7 +738,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
// ISOControl characters, refer java.lang.Character.isISOControl(int)
val isoControlStr = (('\u0000' to '\u001F') ++ ('\u007F' to
'\u009F')).toList.mkString
// scalastyle:on nonascii
- if (GlutenConfig.getConf.castFromVarcharAddTrimNode && c.child.dataType ==
StringType) {
+ if (GlutenConfig.get.castFromVarcharAddTrimNode && c.child.dataType ==
StringType) {
val trimStr = c.dataType match {
case BinaryType | _: ArrayType | _: MapType | _: StructType | _:
UserDefinedType[_] =>
None
@@ -780,7 +780,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
}
override def rewriteSpillPath(path: String): String = {
- val fs = GlutenConfig.getConf.veloxSpillFileSystem
+ val fs = GlutenConfig.get.veloxSpillFileSystem
fs match {
case "local" =>
path
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
index 576f3a2cb2..256983d70c 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
@@ -138,7 +138,7 @@ case class ColumnarPartialProjectExec(original:
ProjectExec, child: SparkPlan)(
}
override protected def doValidateInternal(): ValidationResult = {
- if (!GlutenConfig.getConf.enableColumnarPartialProject) {
+ if (!GlutenConfig.get.enableColumnarPartialProject) {
return ValidationResult.failed("Config disable this feature")
}
if (UDFAttrNotExists) {
@@ -164,9 +164,7 @@ case class ColumnarPartialProjectExec(original:
ProjectExec, child: SparkPlan)(
return ValidationResult.failed("Contains expression not supported")
}
if (
- ExpressionUtils.hasComplexExpressions(
- original,
- GlutenConfig.getConf.fallbackExpressionsThreshold)
+ ExpressionUtils.hasComplexExpressions(original,
GlutenConfig.get.fallbackExpressionsThreshold)
) {
return ValidationResult.failed("Fallback by complex expression")
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
index 9cdcf854db..5030de65bf 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
@@ -48,7 +48,7 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends
RowToColumnarExecBas
val numInputRows = longMetric("numInputRows")
val numOutputBatches = longMetric("numOutputBatches")
val convertTime = longMetric("convertTime")
- val numRows = GlutenConfig.getConf.maxBatchSize
+ val numRows = GlutenConfig.get.maxBatchSize
// This avoids calling `schema` in the RDD closure, so that we don't need
to include the entire
// plan (this) in the closure.
val localSchema = schema
@@ -68,7 +68,7 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends
RowToColumnarExecBas
val numInputRows = longMetric("numInputRows")
val numOutputBatches = longMetric("numOutputBatches")
val convertTime = longMetric("convertTime")
- val numRows = GlutenConfig.getConf.maxBatchSize
+ val numRows = GlutenConfig.get.maxBatchSize
val mode = BroadcastUtils.getBroadcastMode(outputPartitioning)
val relation = child.executeBroadcast()
BroadcastUtils.sparkToVeloxUnsafe(
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
index 56a3d86a90..360b2b9c5e 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.SparkPlan
case class BloomFilterMightContainJointRewriteRule(spark: SparkSession)
extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
- if (!GlutenConfig.getConf.enableNativeBloomFilter) {
+ if (!GlutenConfig.get.enableNativeBloomFilter) {
return plan
}
val out = plan.transformWithSubqueries {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/FlushableHashAggregateRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/FlushableHashAggregateRule.scala
index 2e53906977..f051aa878d 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/FlushableHashAggregateRule.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/FlushableHashAggregateRule.scala
@@ -34,7 +34,7 @@ import
org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
case class FlushableHashAggregateRule(session: SparkSession) extends
Rule[SparkPlan] {
import FlushableHashAggregateRule._
override def apply(plan: SparkPlan): SparkPlan = {
- if (!GlutenConfig.getConf.enableVeloxFlushablePartialAggregation) {
+ if (!GlutenConfig.get.enableVeloxFlushablePartialAggregation) {
return plan
}
plan.transformUpWithPruning(_.containsPattern(EXCHANGE)) {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/HLLRewriteRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/HLLRewriteRule.scala
index 8ceee3d573..59f633d4a5 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/HLLRewriteRule.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/HLLRewriteRule.scala
@@ -33,8 +33,8 @@ case class HLLRewriteRule(spark: SparkSession) extends
Rule[LogicalPlan] {
case a: Aggregate =>
a.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) {
case aggExpr @ AggregateExpression(hll: HyperLogLogPlusPlus, _, _,
_, _)
- if GlutenConfig.getConf.enableNativeHyperLogLogAggregateFunction
&&
- GlutenConfig.getConf.enableColumnarHashAgg &&
+ if GlutenConfig.get.enableNativeHyperLogLogAggregateFunction &&
+ GlutenConfig.get.enableColumnarHashAgg &&
isSupportedDataType(hll.child.dataType) =>
val hllAdapter = HLLAdapter(
hll.child,
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
index cd035e3202..3d2b568e7b 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
@@ -97,9 +97,9 @@ private class ColumnarBatchSerializerInstance(
null // uncompressed
}
val compressionCodecBackend =
- GlutenConfig.getConf.columnarShuffleCodecBackend.orNull
- val batchSize = GlutenConfig.getConf.maxBatchSize
- val bufferSize = GlutenConfig.getConf.columnarShuffleReaderBufferSize
+ GlutenConfig.get.columnarShuffleCodecBackend.orNull
+ val batchSize = GlutenConfig.get.maxBatchSize
+ val bufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize
val runtime = Runtimes.contextInstance(BackendsApiManager.getBackendName,
"ShuffleReader")
val jniWrapper = ShuffleReaderJniWrapper.create(runtime)
val shuffleReaderHandle = jniWrapper.make(
diff --git
a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index bb84e3066b..56967d7d12 100644
---
a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -63,8 +63,8 @@ class ColumnarShuffleWriter[K, V](
.mkString(",")
private lazy val nativeBufferSize = {
- val bufferSize = GlutenConfig.getConf.shuffleWriterBufferSize
- val maxBatchSize = GlutenConfig.getConf.maxBatchSize
+ val bufferSize = GlutenConfig.get.shuffleWriterBufferSize
+ val maxBatchSize = GlutenConfig.get.maxBatchSize
if (bufferSize > maxBatchSize) {
logInfo(
s"${GlutenConfig.SHUFFLE_WRITER_BUFFER_SIZE.key} ($bufferSize) exceeds
max " +
@@ -75,9 +75,9 @@ class ColumnarShuffleWriter[K, V](
}
}
- private val nativeMergeBufferSize = GlutenConfig.getConf.maxBatchSize
+ private val nativeMergeBufferSize = GlutenConfig.get.maxBatchSize
- private val nativeMergeThreshold =
GlutenConfig.getConf.columnarShuffleMergeThreshold
+ private val nativeMergeThreshold =
GlutenConfig.get.columnarShuffleMergeThreshold
private val compressionCodec =
if (conf.getBoolean(SHUFFLE_COMPRESS.key,
SHUFFLE_COMPRESS.defaultValue.get)) {
@@ -87,7 +87,7 @@ class ColumnarShuffleWriter[K, V](
}
private val compressionCodecBackend =
- GlutenConfig.getConf.columnarShuffleCodecBackend.orNull
+ GlutenConfig.get.columnarShuffleCodecBackend.orNull
private val compressionLevel =
GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec,
compressionCodecBackend)
@@ -96,9 +96,9 @@ class ColumnarShuffleWriter[K, V](
GlutenShuffleUtils.getSortEvictBufferSize(conf, compressionCodec)
private val bufferCompressThreshold =
- GlutenConfig.getConf.columnarShuffleCompressionThreshold
+ GlutenConfig.get.columnarShuffleCompressionThreshold
- private val reallocThreshold =
GlutenConfig.getConf.columnarShuffleReallocThreshold
+ private val reallocThreshold =
GlutenConfig.get.columnarShuffleReallocThreshold
private val runtime =
Runtimes.contextInstance(BackendsApiManager.getBackendName, "ShuffleWriter")
@@ -149,7 +149,7 @@ class ColumnarShuffleWriter[K, V](
compressionLevel,
sortEvictBufferSize,
bufferCompressThreshold,
- GlutenConfig.getConf.columnarShuffleCompressionMode,
+ GlutenConfig.get.columnarShuffleCompressionMode,
conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt,
conf.get(SHUFFLE_SORT_USE_RADIXSORT),
dataTmp.getAbsolutePath,
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala
index 491de4b220..96b2eedab6 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala
@@ -93,7 +93,7 @@ object BroadcastUtils {
schema: StructType,
from: Broadcast[F],
fn: Iterator[InternalRow] => Iterator[ColumnarBatch]): Broadcast[T] = {
- val useOffheapBuildRelation =
GlutenConfig.getConf.enableBroadcastBuildRelationInOffheap
+ val useOffheapBuildRelation =
GlutenConfig.get.enableBroadcastBuildRelationInOffheap
mode match {
case HashedRelationBroadcastMode(_, _) =>
// HashedRelation to ColumnarBuildSideRelation.
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala
index b6d4f6557b..f00469f86d 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala
@@ -33,11 +33,11 @@ class VeloxParquetWriterInjects extends
VeloxFormatWriterInjects {
sparkOptions.put(SQLConf.PARQUET_COMPRESSION.key, compressionCodec)
val blockSize = options.getOrElse(
GlutenConfig.PARQUET_BLOCK_SIZE,
- GlutenConfig.getConf.columnarParquetWriteBlockSize.toString)
+ GlutenConfig.get.columnarParquetWriteBlockSize.toString)
sparkOptions.put(GlutenConfig.PARQUET_BLOCK_SIZE, blockSize)
val blockRows = options.getOrElse(
GlutenConfig.PARQUET_BLOCK_ROWS,
- GlutenConfig.getConf.columnarParquetWriteBlockRows.toString)
+ GlutenConfig.get.columnarParquetWriteBlockRows.toString)
sparkOptions.put(GlutenConfig.PARQUET_BLOCK_ROWS, blockRows)
options
.get(GlutenConfig.PARQUET_GZIP_WINDOW_SIZE)
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/alloc/ManagedAllocationListener.java
b/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/alloc/ManagedAllocationListener.java
index a76c0aabee..8176b2817f 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/alloc/ManagedAllocationListener.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/alloc/ManagedAllocationListener.java
@@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class ManagedAllocationListener implements AllocationListener,
AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(ManagedAllocationListener.class);
- public static long BLOCK_SIZE =
GlutenConfig.getConf().memoryReservationBlockSize();
+ public static long BLOCK_SIZE =
GlutenConfig.get().memoryReservationBlockSize();
private final MemoryTarget target;
private final SimpleMemoryUsageRecorder sharedUsage;
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java
b/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java
index 9d63a8601b..4e0be4e692 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java
@@ -44,8 +44,8 @@ public final class ReservationListeners {
private static ReservationListener create0(
String name, Spiller spiller, Map<String, MemoryUsageStatsBuilder>
mutableStats) {
// Memory target.
- final double overAcquiredRatio =
GlutenConfig.getConf().memoryOverAcquiredRatio();
- final long reservationBlockSize =
GlutenConfig.getConf().memoryReservationBlockSize();
+ final double overAcquiredRatio =
GlutenConfig.get().memoryOverAcquiredRatio();
+ final long reservationBlockSize =
GlutenConfig.get().memoryReservationBlockSize();
final TaskMemoryManager tmm =
TaskResources.getLocalTaskContext().taskMemoryManager();
final TreeMemoryTarget consumer =
MemoryTargets.newConsumer(
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala
index f9bb7478e7..e21159ae49 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala
@@ -27,23 +27,23 @@ object DebugUtil {
// if specify stageId and partitionId, then only do that partition for that
stage
def saveInputToFile(): Boolean = {
def taskIdMatches =
- GlutenConfig.getConf.benchmarkTaskId.nonEmpty &&
- GlutenConfig.getConf.benchmarkTaskId
+ GlutenConfig.get.benchmarkTaskId.nonEmpty &&
+ GlutenConfig.get.benchmarkTaskId
.split(",")
.map(_.toLong)
.contains(TaskContext.get().taskAttemptId())
def partitionIdMatches =
- TaskContext.get().stageId() == GlutenConfig.getConf.benchmarkStageId &&
- (GlutenConfig.getConf.benchmarkPartitionId.isEmpty ||
- GlutenConfig.getConf.benchmarkPartitionId
+ TaskContext.get().stageId() == GlutenConfig.get.benchmarkStageId &&
+ (GlutenConfig.get.benchmarkPartitionId.isEmpty ||
+ GlutenConfig.get.benchmarkPartitionId
.split(",")
.map(_.toInt)
.contains(TaskContext.get().partitionId()))
val saveInput = taskIdMatches || partitionIdMatches
if (saveInput) {
- if (GlutenConfig.getConf.benchmarkSaveDir.isEmpty) {
+ if (GlutenConfig.get.benchmarkSaveDir.isEmpty) {
throw new IllegalArgumentException(GlutenConfig.BENCHMARK_SAVE_DIR.key
+ " is not set.")
}
}
diff --git
a/gluten-celeborn/src-celeborn/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
b/gluten-celeborn/src-celeborn/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
index 5e14531d6f..a4d4c4f5c5 100644
---
a/gluten-celeborn/src-celeborn/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
+++
b/gluten-celeborn/src-celeborn/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
@@ -216,7 +216,7 @@ public class CelebornShuffleManager implements
ShuffleManager {
if (dependency instanceof ColumnarShuffleDependency) {
if (fallbackPolicyRunner.applyAllFallbackPolicy(
lifecycleManager, dependency.partitioner().numPartitions())) {
- if (GlutenConfig.getConf().enableCelebornFallback()) {
+ if (GlutenConfig.get().enableCelebornFallback()) {
logger.warn("Fallback to ColumnarShuffleManager!");
columnarShuffleIds.add(shuffleId);
return columnarShuffleManager().registerShuffle(shuffleId,
dependency);
diff --git
a/gluten-celeborn/src-celeborn/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
b/gluten-celeborn/src-celeborn/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
index 42e939e442..1da1554178 100644
---
a/gluten-celeborn/src-celeborn/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
+++
b/gluten-celeborn/src-celeborn/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
@@ -52,8 +52,8 @@ abstract class CelebornColumnarShuffleWriter[K, V](
protected val mapId: Int = context.partitionId()
protected lazy val nativeBufferSize: Int = {
- val bufferSize = GlutenConfig.getConf.shuffleWriterBufferSize
- val maxBatchSize = GlutenConfig.getConf.maxBatchSize
+ val bufferSize = GlutenConfig.get.shuffleWriterBufferSize
+ val maxBatchSize = GlutenConfig.get.maxBatchSize
if (bufferSize > maxBatchSize) {
logInfo(
s"${GlutenConfig.SHUFFLE_WRITER_BUFFER_SIZE.key} ($bufferSize) exceeds
max " +
@@ -95,13 +95,13 @@ abstract class CelebornColumnarShuffleWriter[K, V](
GlutenShuffleUtils.getCompressionLevel(
conf,
customizedCompressionCodec,
- GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
+ GlutenConfig.get.columnarShuffleCodecBackend.orNull)
protected val compressionBufferSize: Int =
GlutenShuffleUtils.getSortEvictBufferSize(conf, customizedCompressionCodec)
protected val bufferCompressThreshold: Int =
- GlutenConfig.getConf.columnarShuffleCompressionThreshold
+ GlutenConfig.get.columnarShuffleCompressionThreshold
// Are we in the process of stopping? Because map tasks can call stop() with
success = true
// and then call stop() with success = false if they get an exception, we
want to make sure
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
index ea18a05d0c..cdcdb282f4 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
@@ -30,7 +30,7 @@ public class DynamicOffHeapSizingMemoryTarget implements
MemoryTarget {
private final MemoryTarget delegated;
// When dynamic off-heap sizing is enabled, the off-heap should be sized for
the total usable
// memory, so we can use it as the max memory we will use.
- private static final long MAX_MEMORY_IN_BYTES =
GlutenConfig.getConf().offHeapMemorySize();
+ private static final long MAX_MEMORY_IN_BYTES =
GlutenConfig.get().offHeapMemorySize();
private static final AtomicLong USED_OFFHEAP_BYTES = new AtomicLong();
public DynamicOffHeapSizingMemoryTarget(MemoryTarget delegated) {
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
index c6f5b59de8..d3df35bd40 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
@@ -50,7 +50,7 @@ public final class MemoryTargets {
@Experimental
public static MemoryTarget dynamicOffHeapSizingIfEnabled(MemoryTarget
memoryTarget) {
- if (GlutenConfig.getConf().dynamicOffHeapSizingEnabled()) {
+ if (GlutenConfig.get().dynamicOffHeapSizingEnabled()) {
return new DynamicOffHeapSizingMemoryTarget(memoryTarget);
}
@@ -63,7 +63,7 @@ public final class MemoryTargets {
Spiller spiller,
Map<String, MemoryUsageStatsBuilder> virtualChildren) {
final TreeMemoryConsumers.Factory factory =
TreeMemoryConsumers.factory(tmm);
- if (GlutenConfig.getConf().memoryIsolation()) {
+ if (GlutenConfig.get().memoryIsolation()) {
return TreeMemoryTargets.newChild(factory.isolatedRoot(), name, spiller,
virtualChildren);
}
final TreeMemoryTarget root = factory.legacyRoot();
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
index a11a4a3e4a..9dc1422c52 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
@@ -77,7 +77,7 @@ public final class TreeMemoryConsumers {
* <p>See <a
href="https://github.com/oap-project/gluten/issues/3030">GLUTEN-3030</a>
*/
public TreeMemoryTarget isolatedRoot() {
- return
ofCapacity(GlutenConfig.getConf().conservativeTaskOffHeapMemorySize());
+ return
ofCapacity(GlutenConfig.get().conservativeTaskOffHeapMemorySize());
}
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
index eea698e078..d56dbbe5d9 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
@@ -57,7 +57,7 @@ object ColumnarRuleExecutor {
override def apply(plan: SparkPlan): SparkPlan = {
val (out, millisTime) =
GlutenTimeMetric.recordMillisTime(delegate.apply(plan))
- logOnLevel(GlutenConfig.getConf.transformPlanLogLevel, message(plan,
out, millisTime))
+ logOnLevel(GlutenConfig.get.transformPlanLogLevel, message(plan, out,
millisTime))
out
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
index 754e42b81c..66a38d5ac7 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
@@ -116,7 +116,7 @@ abstract class AffinityManager extends LogLevelUtil with
Logging {
totalRegisteredExecutors.addAndGet(1)
}
logOnLevel(
- GlutenConfig.getConf.softAffinityLogLevel,
+ GlutenConfig.get.softAffinityLogLevel,
s"After adding executor ${execHostId._1} on host ${execHostId._2}, " +
s"idForExecutors is ${idForExecutors.mkString(",")}, " +
s"sortedIdForExecutors is ${sortedIdForExecutors.mkString(",")}, " +
@@ -149,7 +149,7 @@ abstract class AffinityManager extends LogLevelUtil with
Logging {
totalRegisteredExecutors.addAndGet(-1)
}
logOnLevel(
- GlutenConfig.getConf.softAffinityLogLevel,
+ GlutenConfig.get.softAffinityLogLevel,
s"After removing executor $execId, " +
s"idForExecutors is ${idForExecutors.mkString(",")}, " +
s"sortedIdForExecutors is ${sortedIdForExecutors.mkString(",")}, " +
@@ -196,7 +196,7 @@ abstract class AffinityManager extends LogLevelUtil with
Logging {
(originalValues ++ value)
}
logOnLevel(
- GlutenConfig.getConf.softAffinityLogLevel,
+ GlutenConfig.get.softAffinityLogLevel,
s"update host for $key: ${values.mkString(",")}")
duplicateReadingInfos.put(key, values)
}
@@ -279,7 +279,7 @@ abstract class AffinityManager extends LogLevelUtil with
Logging {
if (!hosts.isEmpty) {
rand.shuffle(hosts)
logOnLevel(
- GlutenConfig.getConf.softAffinityLogLevel,
+ GlutenConfig.get.softAffinityLogLevel,
s"get host for $f: ${hosts.distinct.mkString(",")}")
}
hosts.distinct.toSeq
diff --git
a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
b/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
index 66e4456d46..558aec9621 100644
---
a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
@@ -66,7 +66,7 @@ abstract class Affinity(val manager: AffinityManager) extends
LogLevelUtil with
val locations = manager.askExecutors(filePath)
if (locations.nonEmpty) {
logOnLevel(
- GlutenConfig.getConf.softAffinityLogLevel,
+ GlutenConfig.get.softAffinityLogLevel,
s"SAMetrics=File $filePath - the expected executors are
${locations.mkString("_")} ")
locations.map { case (executor, host) => toTaskLocation(host, executor) }
} else {
diff --git
a/gluten-iceberg/src-iceberg/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
b/gluten-iceberg/src-iceberg/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
index 398bdbdb5f..bb2fc6fc68 100644
---
a/gluten-iceberg/src-iceberg/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
+++
b/gluten-iceberg/src-iceberg/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java
@@ -64,7 +64,7 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions parquetReadOptions =
ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions.newBuilder()
.setEnableRowGroupMaxminIndex(
- GlutenConfig.getConf().enableParquetRowGroupMaxMinIndex())
+ GlutenConfig.get().enableParquetRowGroupMaxMinIndex())
.build();
icebergBuilder.setParquet(parquetReadOptions);
break;
@@ -104,7 +104,7 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions parquetReadOptions
=
ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions.newBuilder()
.setEnableRowGroupMaxminIndex(
-
GlutenConfig.getConf().enableParquetRowGroupMaxMinIndex())
+ GlutenConfig.get().enableParquetRowGroupMaxMinIndex())
.build();
deleteFileBuilder.setParquet(parquetReadOptions);
break;
diff --git
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
index 9513f49760..69e81126c8 100644
---
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
+++
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
@@ -198,7 +198,7 @@ public class LocalFilesNode implements SplitInfo {
ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions parquetReadOptions
=
ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions.newBuilder()
.setEnableRowGroupMaxminIndex(
-
GlutenConfig.getConf().enableParquetRowGroupMaxMinIndex())
+ GlutenConfig.get().enableParquetRowGroupMaxMinIndex())
.build();
fileBuilder.setParquet(parquetReadOptions);
break;
@@ -225,15 +225,15 @@ public class LocalFilesNode implements SplitInfo {
.setHeader(Long.parseLong(header))
.setEscape(escape)
.setNullValue(nullValue)
-
.setMaxBlockSize(GlutenConfig.getConf().textInputMaxBlockSize())
-
.setEmptyAsDefault(GlutenConfig.getConf().textIputEmptyAsDefault())
+ .setMaxBlockSize(GlutenConfig.get().textInputMaxBlockSize())
+
.setEmptyAsDefault(GlutenConfig.get().textIputEmptyAsDefault())
.build();
fileBuilder.setText(textReadOptions);
break;
case JsonReadFormat:
ReadRel.LocalFiles.FileOrFiles.JsonReadOptions jsonReadOptions =
ReadRel.LocalFiles.FileOrFiles.JsonReadOptions.newBuilder()
-
.setMaxBlockSize(GlutenConfig.getConf().textInputMaxBlockSize())
+ .setMaxBlockSize(GlutenConfig.get().textInputMaxBlockSize())
.build();
fileBuilder.setJson(jsonReadOptions);
break;
diff --git
a/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java
b/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java
index 37d9e09bb5..03f8c7a1b2 100644
--- a/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java
+++ b/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java
@@ -37,7 +37,7 @@ public class TestStats {
public static int offloadGlutenTestNumber = 0;
private static boolean enabled() {
- return GlutenConfig.getConf().collectUtStats();
+ return GlutenConfig.get().collectUtStats();
}
public static void reset() {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 052348aee4..887c761fc5 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -68,7 +68,7 @@ trait BackendSettingsApi {
false
}
def supportColumnarShuffleExec(): Boolean = {
- GlutenConfig.getConf.enableColumnarShuffle
+ GlutenConfig.get.enableColumnarShuffle
}
def enableJoinKeysRewrite(): Boolean = true
def supportHashBuildJoinTypeOnLeft: JoinType => Boolean = {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 0d1296999a..8626b40de6 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -52,7 +52,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport
with BaseDataSource
val fileFormat: ReadFileFormat
def getRootFilePaths: Seq[String] = {
- if (GlutenConfig.getConf.scanFileSchemeValidationEnabled) {
+ if (GlutenConfig.get.scanFileSchemeValidationEnabled) {
getRootPathsInternal
} else {
Seq.empty
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
index 1e0c3d5a7e..8da7bdd503 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
@@ -185,7 +185,7 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
}
override protected def doValidateInternal(): ValidationResult = {
- if
(!GlutenConfig.getConf.broadcastNestedLoopJoinTransformerTransformerEnabled) {
+ if
(!GlutenConfig.get.broadcastNestedLoopJoinTransformerTransformerEnabled) {
return ValidationResult.failed(
s"Config
${GlutenConfig.BROADCAST_NESTED_LOOP_JOIN_TRANSFORMER_ENABLED.key} not enabled")
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
index de5fc85f35..cdd1fe652b 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
@@ -59,7 +59,7 @@ class GlutenWholeStageColumnarRDD(
updateInputMetrics: InputMetricsWrapper => Unit,
updateNativeMetrics: IMetrics => Unit)
extends RDD[ColumnarBatch](sc, rdds.getDependencies) {
- private val numaBindingInfo = GlutenConfig.getConf.numaBindingInfo
+ private val numaBindingInfo = GlutenConfig.get.numaBindingInfo
override def compute(split: Partition, context: TaskContext):
Iterator[ColumnarBatch] = {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 003efb5f9a..6ab0e22ce2 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -62,7 +62,7 @@ case class WholeStageTransformContext(root: PlanNode,
substraitContext: Substrai
* Since https://github.com/apache/incubator-gluten/pull/2185.
*/
trait ValidatablePlan extends GlutenPlan with LogLevelUtil {
- protected def glutenConf: GlutenConfig = GlutenConfig.getConf
+ protected def glutenConf: GlutenConfig = GlutenConfig.get
protected lazy val enableNativeValidation = glutenConf.enableNativeValidation
@@ -214,7 +214,7 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
val serializableHadoopConf: SerializableConfiguration = new
SerializableConfiguration(
sparkContext.hadoopConfiguration)
- val numaBindingInfo: GlutenNumaBindingInfo =
GlutenConfig.getConf.numaBindingInfo
+ val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.get.numaBindingInfo
@transient
private var wholeStageTransformerContext: Option[WholeStageTransformContext]
= None
@@ -360,7 +360,7 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
}(
t =>
logOnLevel(
- GlutenConfig.getConf.substraitPlanLogLevel,
+ GlutenConfig.get.substraitPlanLogLevel,
s"$nodeName generating the substrait plan took: $t ms."))
val inputRDDs = new ColumnarInputRDDsWrapper(columnarInputRDDs)
// Check if BatchScan exists.
@@ -376,7 +376,7 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
val allScanPartitions =
basicScanExecTransformers.map(_.getPartitions.toIndexedSeq)
val allScanSplitInfos =
getSplitInfosFromPartitions(basicScanExecTransformers,
allScanPartitions)
- if (GlutenConfig.getConf.enableHdfsViewfs) {
+ if (GlutenConfig.get.enableHdfsViewfs) {
val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty
allScanSplitInfos.foreach {
splitInfos =>
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
index 28d7809924..48d71fc3ef 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
@@ -88,7 +88,7 @@ case class WindowExecTransformer(
val windowParametersStr = new StringBuffer("WindowParameters:")
// isStreaming: 1 for streaming, 0 for sort
val isStreaming: Int =
- if (GlutenConfig.getConf.veloxColumnarWindowType.equals("streaming")) 1
else 0
+ if (GlutenConfig.get.veloxColumnarWindowType.equals("streaming")) 1 else 0
windowParametersStr
.append("isStreaming=")
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
index f6cc01f8ac..a3a6135cdb 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
@@ -339,7 +339,7 @@ object ExpressionMappings {
)
def expressionsMap: Map[Class[_], String] = {
- val blacklist = GlutenConfig.getConf.expressionBlacklist
+ val blacklist = GlutenConfig.get.expressionBlacklist
val filtered = (defaultExpressionsMap ++ toMap(
BackendsApiManager.getSparkPlanExecApiInstance.extraExpressionMappings)).filterNot(
kv => blacklist.contains(kv._2))
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala
index bfb926706c..bbb6fa1b8f 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.SparkPlan
object CollapseProjectExecTransformer extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
- if (!GlutenConfig.getConf.enableColumnarProjectCollapse) {
+ if (!GlutenConfig.get.enableColumnarProjectCollapse) {
return plan
}
plan.transformUp {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
index 432ecd1584..76f6b2e680 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.joins._
case class FallbackOnANSIMode(session: SparkSession) extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
- if (GlutenConfig.getConf.enableAnsiMode) {
+ if (GlutenConfig.get.enableAnsiMode) {
plan.foreach(FallbackTags.add(_, "does not support ansi mode"))
}
plan
@@ -35,7 +35,7 @@ case class FallbackOnANSIMode(session: SparkSession) extends
Rule[SparkPlan] {
}
case class FallbackMultiCodegens(session: SparkSession) extends
Rule[SparkPlan] {
- lazy val glutenConf: GlutenConfig = GlutenConfig.getConf
+ lazy val glutenConf: GlutenConfig = GlutenConfig.get
lazy val physicalJoinOptimize = glutenConf.enablePhysicalJoinOptimize
lazy val optimizeLevel: Integer = glutenConf.physicalJoinOptimizationThrottle
@@ -47,7 +47,7 @@ case class FallbackMultiCodegens(session: SparkSession)
extends Rule[SparkPlan]
case plan: ShuffledHashJoinExec =>
if ((count + 1) >= optimizeLevel) return true
plan.children.exists(existsMultiCodegens(_, count + 1))
- case plan: SortMergeJoinExec if
GlutenConfig.getConf.forceShuffledHashJoin =>
+ case plan: SortMergeJoinExec if GlutenConfig.get.forceShuffledHashJoin =>
if ((count + 1) >= optimizeLevel) return true
plan.children.exists(existsMultiCodegens(_, count + 1))
case _ => false
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashBaseAggregate.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashBaseAggregate.scala
index a034a3229a..b5ef2b8471 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashBaseAggregate.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashBaseAggregate.scala
@@ -38,11 +38,11 @@ case class MergeTwoPhasesHashBaseAggregate(session:
SparkSession)
extends Rule[SparkPlan]
with Logging {
- val glutenConf: GlutenConfig = GlutenConfig.getConf
+ val glutenConf: GlutenConfig = GlutenConfig.get
val scanOnly: Boolean = glutenConf.enableScanOnly
val enableColumnarHashAgg: Boolean = !scanOnly &&
glutenConf.enableColumnarHashAgg
- val replaceSortAggWithHashAgg: Boolean =
GlutenConfig.getConf.forceToUseHashAgg
- val mergeTwoPhasesAggEnabled: Boolean =
GlutenConfig.getConf.mergeTwoPhasesAggEnabled
+ val replaceSortAggWithHashAgg: Boolean = GlutenConfig.get.forceToUseHashAgg
+ val mergeTwoPhasesAggEnabled: Boolean =
GlutenConfig.get.mergeTwoPhasesAggEnabled
private def isPartialAgg(partialAgg: BaseAggregateExec, finalAgg:
BaseAggregateExec): Boolean = {
// TODO: now it can not support to merge agg which there are the filters
in the aggregate exprs.
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/UnionTransformerRule.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/UnionTransformerRule.scala
index f0eea08018..40fea93eaa 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/UnionTransformerRule.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/UnionTransformerRule.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.SparkPlan
*/
case class UnionTransformerRule() extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
- if (!GlutenConfig.getConf.enableNativeUnion) {
+ if (!GlutenConfig.get.enableNativeUnion) {
return plan
}
plan.transformUp {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/RoughCoster2.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/RoughCoster2.scala
index 832c524d7b..cc1c0f584e 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/RoughCoster2.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/RoughCoster2.scala
@@ -45,11 +45,11 @@ object RoughCoster2 extends LongCoster {
case ColumnarToRowLike(_) => 1L
case RowToColumnarLike(_) =>
// If sizeBytes is less than the threshold, the cost of
RowToColumnarLike is ignored.
- if (sizeFactor == 0) 1L else GlutenConfig.getConf.rasRough2R2cCost
+ if (sizeFactor == 0) 1L else GlutenConfig.get.rasRough2R2cCost
case p if PlanUtil.isGlutenColumnarOp(p) => 1L
- case p if PlanUtil.isVanillaColumnarOp(p) =>
GlutenConfig.getConf.rasRough2VanillaCost
+ case p if PlanUtil.isVanillaColumnarOp(p) =>
GlutenConfig.get.rasRough2VanillaCost
// Other row ops. Usually a vanilla row op.
- case _ => GlutenConfig.getConf.rasRough2VanillaCost
+ case _ => GlutenConfig.get.rasRough2VanillaCost
}
opCost * Math.max(1, sizeFactor)
}
@@ -61,7 +61,7 @@ object RoughCoster2 extends LongCoster {
case _: LeafExecNode => 0L
case p => p.children.map(getStatSizeBytes).sum
}
- sizeBytes / GlutenConfig.getConf.rasRough2SizeBytesThreshold
+ sizeBytes / GlutenConfig.get.rasRough2SizeBytesThreshold
}
private def getStatSizeBytes(plan: SparkPlan): Long = {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
index 3418d3dddc..2d5ac68520 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
@@ -74,7 +74,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean,
originalPlan: SparkP
import ExpandFallbackPolicy._
private def countTransitionCost(plan: SparkPlan): Int = {
- val ignoreRowToColumnar = GlutenConfig.getConf.fallbackIgnoreRowToColumnar
+ val ignoreRowToColumnar = GlutenConfig.get.fallbackIgnoreRowToColumnar
var transitionCost = 0
def countFallbackInternal(plan: SparkPlan): Unit = {
plan match {
@@ -183,7 +183,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean,
originalPlan: SparkP
private def fallback(plan: SparkPlan): FallbackInfo = {
val fallbackThreshold = if (isAdaptiveContext) {
- GlutenConfig.getConf.wholeStageFallbackThreshold
+ GlutenConfig.get.wholeStageFallbackThreshold
} else if (plan.find(_.isInstanceOf[AdaptiveSparkPlanExec]).isDefined) {
// if we are here, that means we are now at
`QueryExecution.preparations` and
// AQE is actually not applied. We do nothing for this case, and later in
@@ -191,7 +191,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean,
originalPlan: SparkP
return FallbackInfo.DO_NOT_FALLBACK()
} else {
// AQE is not applied, so we use the whole query threshold to check if
should fallback
- GlutenConfig.getConf.queryFallbackThreshold
+ GlutenConfig.get.queryFallbackThreshold
}
if (fallbackThreshold < 0) {
return FallbackInfo.DO_NOT_FALLBACK()
@@ -255,7 +255,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean,
originalPlan: SparkP
val vanillaSparkPlan = fallbackToRowBasedPlan(plan, outputsColumnar)
val vanillaSparkTransitionCost =
countTransitionCostForVanillaSparkPlan(vanillaSparkPlan)
if (
- GlutenConfig.getConf.fallbackPreferColumnar &&
+ GlutenConfig.get.fallbackPreferColumnar &&
fallbackInfo.netTransitionCost <= vanillaSparkTransitionCost
) {
plan
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index 7f258d10b2..5fcc5675f8 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -43,7 +43,7 @@ case class OffloadExchange() extends OffloadSingleNode with
LogLevelUtil {
case p if FallbackTags.nonEmpty(p) =>
p
case s: ShuffleExchangeExec
- if (s.child.supportsColumnar ||
GlutenConfig.getConf.enablePreferColumnar) &&
+ if (s.child.supportsColumnar || GlutenConfig.get.enablePreferColumnar)
&&
BackendsApiManager.getSettings.supportColumnarShuffleExec() =>
logDebug(s"Columnar Processing for ${s.getClass} is currently
supported.")
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarShuffleExchange(s)
@@ -140,7 +140,7 @@ object OffloadJoin {
}
// Both left and right are buildable. Find out the better one.
- if (!GlutenConfig.getConf.shuffledHashJoinOptimizeBuildSide) {
+ if (!GlutenConfig.get.shuffledHashJoinOptimizeBuildSide) {
// User disabled build side re-optimization. Return original build side
from vanilla Spark.
return shj.buildSide
}
@@ -314,7 +314,7 @@ object OffloadOthers {
// Velox backend uses ColumnarArrowEvalPythonExec.
if (
!BackendsApiManager.getSettings.supportColumnarArrowUdf() ||
- !GlutenConfig.getConf.enableColumnarArrowUDF
+ !GlutenConfig.get.enableColumnarArrowUDF
) {
EvalPythonExecTransformer(plan.udfs, plan.resultAttrs, child)
} else {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala
index 2d844a616c..fb6a8cae3c 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala
@@ -56,7 +56,7 @@ object RewriteJoin extends RewriteSingleNode with
JoinSelectionHelper {
}
override def rewrite(plan: SparkPlan): SparkPlan = plan match {
- case smj: SortMergeJoinExec if GlutenConfig.getConf.forceShuffledHashJoin
=>
+ case smj: SortMergeJoinExec if GlutenConfig.get.forceShuffledHashJoin =>
getSmjBuildSide(smj) match {
case Some(buildSide) =>
ShuffledHashJoinExec(
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 93c444fbd0..171235a7c3 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.hive.HiveTableScanExecTransformer
object Validators {
implicit class ValidatorBuilderImplicits(builder: Validator.Builder) {
- private val conf = GlutenConfig.getConf
+ private val conf = GlutenConfig.get
private val settings = BackendsApiManager.getSettings
/** Fails validation if a plan node was already tagged with
TRANSFORM_UNSUPPORTED. */
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
index e3e374256f..74cae15ee1 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
@@ -51,7 +51,7 @@ object GlutenShuffleUtils {
s"${validValues.mkString(", ")}, but was $codec")
}
}
- val glutenConfig = GlutenConfig.getConf
+ val glutenConfig = GlutenConfig.get
glutenConfig.columnarShuffleCodec match {
case Some(codec) =>
val glutenCodecKey = GlutenConfig.COLUMNAR_SHUFFLE_CODEC.key
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanNestedColumnPruning.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanNestedColumnPruning.scala
index 70b08a2ed8..a3480bbfca 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanNestedColumnPruning.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanNestedColumnPruning.scala
@@ -32,7 +32,7 @@ object HiveTableScanNestedColumnPruning extends Logging {
import org.apache.spark.sql.catalyst.expressions.SchemaPruning._
def supportNestedColumnPruning(project: ProjectExecTransformer): Boolean = {
- if (GlutenConfig.getConf.enableColumnarHiveTableScanNestedColumnPruning) {
+ if (GlutenConfig.get.enableColumnarHiveTableScanNestedColumnPruning) {
project.child match {
case HiveTableScanExecTransformer(_, relation, _, _) =>
relation.tableMeta.storage.inputFormat match {
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 754c292418..f22eb4b096 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -172,7 +172,7 @@ private object FallbackStrategiesSuite {
List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(),
c.ac.originalPlan())),
List(
c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()),
- _ => ColumnarCollapseTransformStages(GlutenConfig.getConf)
+ _ => ColumnarCollapseTransformStages(GlutenConfig.get)
),
List(_ => RemoveFallbackTagRule())
)
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
index ddd4cf1d4c..e57caf8a37 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
@@ -37,7 +37,7 @@ class GlutenBloomFilterAggregateQuerySuite
GlutenConfig.COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key -> "4194304"
) {
val numEstimatedItems = 5000000L
- val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+ val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
val sqlString = s"""
|SELECT every(might_contain(
| (SELECT bloom_filter_agg(col,
@@ -72,7 +72,7 @@ class GlutenBloomFilterAggregateQuerySuite
testGluten("Test bloom_filter_agg filter fallback") {
val table = "bloom_filter_test"
val numEstimatedItems = 5000000L
- val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+ val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
val sqlString = s"""
|SELECT col positive_membership_test
|FROM $table
@@ -118,7 +118,7 @@ class GlutenBloomFilterAggregateQuerySuite
testGluten("Test bloom_filter_agg agg fallback") {
val table = "bloom_filter_test"
val numEstimatedItems = 5000000L
- val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+ val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
val sqlString = s"""
|SELECT col positive_membership_test
|FROM $table
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 88e0ecf65a..c021b21035 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -182,7 +182,7 @@ private object FallbackStrategiesSuite {
List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(),
c.ac.originalPlan())),
List(
c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()),
- _ => ColumnarCollapseTransformStages(GlutenConfig.getConf)
+ _ => ColumnarCollapseTransformStages(GlutenConfig.get)
),
List(_ => RemoveFallbackTagRule())
)
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
index 0c75db8308..5972194c1a 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
@@ -36,7 +36,7 @@ class GlutenBloomFilterAggregateQuerySuite
GlutenConfig.COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key -> "4194304"
) {
val numEstimatedItems = 5000000L
- val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+ val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
val sqlString = s"""
|SELECT every(might_contain(
| (SELECT bloom_filter_agg(col,
@@ -71,7 +71,7 @@ class GlutenBloomFilterAggregateQuerySuite
testGluten("Test bloom_filter_agg filter fallback") {
val table = "bloom_filter_test"
val numEstimatedItems = 5000000L
- val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+ val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
val sqlString = s"""
|SELECT col positive_membership_test
|FROM $table
@@ -117,7 +117,7 @@ class GlutenBloomFilterAggregateQuerySuite
testGluten("Test bloom_filter_agg agg fallback") {
val table = "bloom_filter_test"
val numEstimatedItems = 5000000L
- val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+ val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
val sqlString = s"""
|SELECT col positive_membership_test
|FROM $table
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 88e0ecf65a..c021b21035 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -182,7 +182,7 @@ private object FallbackStrategiesSuite {
List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(),
c.ac.originalPlan())),
List(
c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()),
- _ => ColumnarCollapseTransformStages(GlutenConfig.getConf)
+ _ => ColumnarCollapseTransformStages(GlutenConfig.get)
),
List(_ => RemoveFallbackTagRule())
)
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
index ddd4cf1d4c..e57caf8a37 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala
@@ -37,7 +37,7 @@ class GlutenBloomFilterAggregateQuerySuite
GlutenConfig.COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS.key -> "4194304"
) {
val numEstimatedItems = 5000000L
- val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+ val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
val sqlString = s"""
|SELECT every(might_contain(
| (SELECT bloom_filter_agg(col,
@@ -72,7 +72,7 @@ class GlutenBloomFilterAggregateQuerySuite
testGluten("Test bloom_filter_agg filter fallback") {
val table = "bloom_filter_test"
val numEstimatedItems = 5000000L
- val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+ val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
val sqlString = s"""
|SELECT col positive_membership_test
|FROM $table
@@ -118,7 +118,7 @@ class GlutenBloomFilterAggregateQuerySuite
testGluten("Test bloom_filter_agg agg fallback") {
val table = "bloom_filter_test"
val numEstimatedItems = 5000000L
- val numBits = GlutenConfig.getConf.veloxBloomFilterMaxNumBits
+ val numBits = GlutenConfig.get.veloxBloomFilterMaxNumBits
val sqlString = s"""
|SELECT col positive_membership_test
|FROM $table
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 8908047a33..625e499df2 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -183,7 +183,7 @@ private object FallbackStrategiesSuite {
List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(),
c.ac.originalPlan())),
List(
c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()),
- _ => ColumnarCollapseTransformStages(GlutenConfig.getConf)
+ _ => ColumnarCollapseTransformStages(GlutenConfig.get)
),
List(_ => RemoveFallbackTagRule())
)
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index e4f69e5748..2c03ecf3e8 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -680,7 +680,7 @@ object GlutenConfig {
var ins: GlutenConfig = _
- def getConf: GlutenConfig = {
+ def get: GlutenConfig = {
new GlutenConfig(SQLConf.get)
}
diff --git
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
index a6f066462a..ec05abb5d6 100644
---
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
+++
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.execution.SparkPlan
/** This [[CostEvaluator]] is to force use the new physical plan when cost is
equal. */
case class GlutenCostEvaluator() extends CostEvaluator {
override def evaluateCost(plan: SparkPlan): Cost = {
- if (GlutenConfig.getConf.enableGluten) {
+ if (GlutenConfig.get.enableGluten) {
new GlutenCost(SimpleCostEvaluator, plan)
} else {
SimpleCostEvaluator.evaluateCost(plan)
diff --git
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
index 8fcfa735f4..89b5693baf 100644
---
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
+++
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.internal.SQLConf
case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
override def evaluateCost(plan: SparkPlan): Cost = {
val forceOptimizeSkewedJoin =
conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)
- if (GlutenConfig.getConf.enableGluten) {
+ if (GlutenConfig.get.enableGluten) {
new GlutenCost(SimpleCostEvaluator(forceOptimizeSkewedJoin), plan)
} else {
SimpleCostEvaluator(forceOptimizeSkewedJoin).evaluateCost(plan)
diff --git
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
index 8fcfa735f4..89b5693baf 100644
---
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
+++
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.internal.SQLConf
case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
override def evaluateCost(plan: SparkPlan): Cost = {
val forceOptimizeSkewedJoin =
conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)
- if (GlutenConfig.getConf.enableGluten) {
+ if (GlutenConfig.get.enableGluten) {
new GlutenCost(SimpleCostEvaluator(forceOptimizeSkewedJoin), plan)
} else {
SimpleCostEvaluator(forceOptimizeSkewedJoin).evaluateCost(plan)
diff --git
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
index 8fcfa735f4..89b5693baf 100644
---
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
+++
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.internal.SQLConf
case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
override def evaluateCost(plan: SparkPlan): Cost = {
val forceOptimizeSkewedJoin =
conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)
- if (GlutenConfig.getConf.enableGluten) {
+ if (GlutenConfig.get.enableGluten) {
new GlutenCost(SimpleCostEvaluator(forceOptimizeSkewedJoin), plan)
} else {
SimpleCostEvaluator(forceOptimizeSkewedJoin).evaluateCost(plan)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]