This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 2c3c89c116 [GLUTEN-9881][CORE] Minimize module dependency set of
module gluten-core (#9900)
2c3c89c116 is described below
commit 2c3c89c116068cbd885e2d38fc470f38fdeaf2d0
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Jun 13 21:09:31 2025 +0100
[GLUTEN-9881][CORE] Minimize module dependency set of module gluten-core
(#9900)
---
.../apache/gluten/component/CHDeltaComponent.scala | 7 +-
.../backendsapi/clickhouse/CHListenerApi.scala | 6 +-
.../gluten/backendsapi/clickhouse/CHRuleApi.scala | 14 +-
.../backendsapi/clickhouse/CHTransformerApi.scala | 6 +-
.../gluten/component/VeloxDeltaComponent.scala | 7 +-
.../gluten/component/VeloxHudiComponent.scala | 7 +-
.../backendsapi/velox/VeloxListenerApi.scala | 6 +-
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 23 ++-
.../gluten/execution/MiscOperatorSuite.scala | 6 +-
.../spark/memory/GlobalOffHeapMemorySuite.scala | 4 +-
gluten-arrow/pom.xml | 14 +-
gluten-core/pom.xml | 17 --
.../DynamicOffHeapSizingMemoryTarget.java | 4 +-
.../gluten/memory/memtarget/MemoryTargets.java | 6 +-
.../memory/memtarget/ThrowOnOomMemoryTarget.java | 25 ++-
.../memtarget/spark/TreeMemoryConsumers.java | 4 +-
.../scala/org/apache/gluten/GlutenPlugin.scala | 155 +++++---------
.../org/apache/gluten/component/Component.scala | 3 +
.../apache/gluten/config/GlutenCoreConfig.scala | 222 +++++++++++++++++++++
.../gluten/extension/GlutenSessionExtensions.scala | 6 +-
.../extension/columnar/ColumnarRuleApplier.scala | 5 +-
.../extension/columnar/ColumnarRuleExecutor.scala | 36 +---
.../columnar/enumerated/EnumeratedApplier.scala | 10 +-
.../enumerated/planner/plan/GlutenPlanModel.scala | 6 +-
.../columnar/heuristic/HeuristicApplier.scala | 12 +-
.../extension/columnar/transition/Convention.scala | 3 +-
.../columnar/transition/ConventionFunc.scala | 8 +-
.../extension/columnar/transition/Transition.scala | 4 +-
.../extension/columnar/transition/package.scala | 2 +-
.../gluten/extension/injector/GlutenInjector.scala | 22 +-
.../apache/spark/memory/GlobalOffHeapMemory.scala | 4 +-
.../execution/adaptive/GlutenCostEvaluator.scala | 6 +-
.../apache/spark/storage/BlockManagerUtil.scala | 2 +-
.../org/apache/spark/task/TaskResources.scala | 11 +-
.../org/apache/spark/util/SparkPlanUtil.scala | 47 +++++
.../org/apache/spark/util/SparkTaskUtil.scala | 120 +++++++++++
.../org/apache/spark/util/SparkVersionUtil.scala | 15 +-
.../memtarget/spark/TreeMemoryConsumerTest.java | 4 +-
.../org/apache/gluten/task/TaskResourceSuite.scala | 3 +-
.../gluten/execution/OffloadIcebergScan.scala | 5 +-
.../apache/gluten/execution/OffloadKafkaScan.scala | 5 +-
gluten-substrait/pom.xml | 17 ++
.../gluten/backendsapi/SubstraitBackend.scala | 110 +++++++++-
.../org/apache/gluten/config/GlutenConfig.scala | 207 ++-----------------
.../org/apache/gluten/config/ReservedKeys.scala | 0
.../gluten/extension/columnar/LoggedRule.scala | 48 +++++
.../gluten/softaffinity/SoftAffinityManager.scala | 0
.../ConsistentHashSoftAffinityStrategy.scala | 0
.../strategy/SoftAffinityAllocationTrait.scala | 0
.../apache/spark/softaffinity/SoftAffinity.scala | 0
.../spark/softaffinity/SoftAffinityListener.scala | 0
.../spark/sql/execution/ui/GlutenUIUtils.scala | 0
.../gluten/config/SparkConfigUtilSuite.scala | 0
.../sql/execution/FallbackStrategiesSuite.scala | 3 +-
.../sql/execution/FallbackStrategiesSuite.scala | 3 +-
.../sql/execution/FallbackStrategiesSuite.scala | 3 +-
.../sql/execution/FallbackStrategiesSuite.scala | 3 +-
.../org/apache/gluten/sql/shims/SparkShims.scala | 19 +-
.../apache/spark/util/SparkShimVersionUtil.scala | 19 +-
.../gluten/sql/shims/spark32/Spark32Shims.scala | 8 +-
.../scala/org/apache/spark/TaskContextUtils.scala | 47 -----
.../gluten/sql/shims/spark33/Spark33Shims.scala | 11 +-
.../scala/org/apache/spark/TaskContextUtils.scala | 48 -----
.../gluten/sql/shims/spark34/Spark34Shims.scala | 13 +-
.../scala/org/apache/spark/TaskContextUtils.scala | 49 -----
.../gluten/sql/shims/spark35/Spark35Shims.scala | 13 +-
.../scala/org/apache/spark/TaskContextUtils.scala | 49 -----
67 files changed, 814 insertions(+), 728 deletions(-)
diff --git
a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
index 85caf73546..a39c56003a 100644
---
a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
+++
b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
@@ -17,6 +17,7 @@
package org.apache.gluten.component
import org.apache.gluten.backendsapi.clickhouse.CHBackend
+import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{OffloadDeltaFilter, OffloadDeltaNode,
OffloadDeltaProject}
import org.apache.gluten.extension.DeltaPostTransformRules
import org.apache.gluten.extension.columnar.enumerated.RasOffload
@@ -44,7 +45,9 @@ class CHDeltaComponent extends Component {
legacy.injectTransform {
c =>
val offload = Seq(OffloadDeltaNode(), OffloadDeltaProject(),
OffloadDeltaFilter())
- HeuristicTransform.Simple(Validators.newValidator(c.glutenConf,
offload), offload)
+ HeuristicTransform.Simple(
+ Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
+ offload)
}
val offloads: Seq[RasOffload] = Seq(
RasOffload.from[ProjectExec](OffloadDeltaProject()),
@@ -53,7 +56,7 @@ class CHDeltaComponent extends Component {
offloads.foreach(
offload =>
ras.injectRasRule(
- c => RasOffload.Rule(offload, Validators.newValidator(c.glutenConf),
Nil)))
+ c => RasOffload.Rule(offload, Validators.newValidator(new
GlutenConfig(c.sqlConf)), Nil)))
DeltaPostTransformRules.rules.foreach {
r =>
legacy.injectPostTransform(_ => r)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 5cddfb6e8a..ba74884d2c 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.backendsapi.clickhouse
import org.apache.gluten.backendsapi.ListenerApi
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.{GlutenConfig, GlutenCoreConfig}
import org.apache.gluten.execution.CHBroadcastBuildSideCache
import org.apache.gluten.execution.datasource.GlutenFormatFactory
import org.apache.gluten.expression.UDFMappings
@@ -102,8 +102,8 @@ class CHListenerApi extends ListenerApi with Logging {
// add memory limit for external sort
if (conf.get(RuntimeSettings.MAX_BYTES_BEFORE_EXTERNAL_SORT) <= 0) {
- if (conf.getBoolean(GlutenConfig.SPARK_OFFHEAP_ENABLED, defaultValue =
false)) {
- val memSize = conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY,
0)
+ if (conf.getBoolean(GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY,
defaultValue = false)) {
+ val memSize =
conf.getSizeAsBytes(GlutenCoreConfig.SPARK_OFFHEAP_SIZE_KEY, 0)
if (memSize > 0L) {
val cores = conf.getInt("spark.executor.cores", 1).toLong
val sortMemLimit = ((memSize / cores) * 0.8).toLong
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index f9867fea34..f0c81a62d9 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -104,7 +104,8 @@ object CHRuleApi {
injector.injectTransform(
c =>
intercept(
- HeuristicTransform.WithRewrites(validatorBuilder(c.glutenConf),
rewrites, offloads)))
+ HeuristicTransform
+ .WithRewrites(validatorBuilder(new GlutenConfig(c.sqlConf)),
rewrites, offloads)))
// Legacy: Post-transform rules.
injector.injectPostTransform(_ => PruneNestedColumnsInHiveTableScan)
@@ -120,8 +121,8 @@ object CHRuleApi {
injector.injectPostTransform(
c =>
intercept(
-
SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarTransformRules)(
- c.session)))
+ SparkPlanRules.extendedColumnarRule(
+ new
GlutenConfig(c.sqlConf).extendedColumnarTransformRules)(c.session)))
injector.injectPostTransform(_ => CollectLimitTransformerRule())
injector.injectPostTransform(_ => CollectTailTransformerRule())
injector.injectPostTransform(c =>
InsertTransitions.create(c.outputsColumnar, CHBatchType))
@@ -140,16 +141,17 @@ object CHRuleApi {
SparkShimLoader.getSparkShims
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPost(c => intercept(each(c.session))))
- injector.injectPost(c => ColumnarCollapseTransformStages(c.glutenConf))
+ injector.injectPost(c => ColumnarCollapseTransformStages(new
GlutenConfig(c.sqlConf)))
injector.injectPost(
c =>
intercept(
-
SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarPostRules)(c.session)))
+ SparkPlanRules.extendedColumnarRule(
+ new GlutenConfig(c.sqlConf).extendedColumnarPostRules)(c.session)))
injector.injectPost(c => GlutenNoopWriterRule.apply(c.session))
// Gluten columnar: Final rules.
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
- injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
+ injector.injectFinal(c => GlutenFallbackReporter(new
GlutenConfig(c.sqlConf), c.session))
injector.injectFinal(_ => RemoveFallbackTagRule())
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
index e0d4b9f83e..9da8eed202 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.backendsapi.clickhouse
import org.apache.gluten.backendsapi.TransformerApi
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenCoreConfig
import org.apache.gluten.execution.{CHHashAggregateExecTransformer,
WriteFilesExecTransformer}
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.substrait.SubstraitContext
@@ -98,9 +98,9 @@ class CHTransformerApi extends TransformerApi with Logging {
backendPrefix: String): Unit = {
require(backendPrefix == CHConfig.CONF_PREFIX)
- if (nativeConfMap.getOrDefault(GlutenConfig.SPARK_OFFHEAP_ENABLED,
"false").toBoolean) {
+ if (nativeConfMap.getOrDefault(GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY,
"false").toBoolean) {
val offHeapSize: Long =
- SparkConfigUtil.get(nativeConfMap,
GlutenConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES)
+ SparkConfigUtil.get(nativeConfMap,
GlutenCoreConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES)
if (offHeapSize > 0) {
// Only set default max_bytes_before_external_group_by for CH when it
is not set explicitly.
val groupBySpillKey =
CHConfig.runtimeSettings("max_bytes_before_external_group_by")
diff --git
a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
index e09edb371a..d8648c987d 100644
---
a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
+++
b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
@@ -17,6 +17,7 @@
package org.apache.gluten.component
import org.apache.gluten.backendsapi.velox.VeloxBackend
+import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{OffloadDeltaFilter, OffloadDeltaProject,
OffloadDeltaScan}
import org.apache.gluten.extension.DeltaPostTransformRules
import org.apache.gluten.extension.columnar.enumerated.RasOffload
@@ -38,7 +39,9 @@ class VeloxDeltaComponent extends Component {
c =>
val offload = Seq(OffloadDeltaScan(), OffloadDeltaProject(),
OffloadDeltaFilter())
.map(_.toStrcitRule())
- HeuristicTransform.Simple(Validators.newValidator(c.glutenConf,
offload), offload)
+ HeuristicTransform.Simple(
+ Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
+ offload)
}
val offloads: Seq[RasOffload] = Seq(
RasOffload.from[FileSourceScanExec](OffloadDeltaScan()),
@@ -48,7 +51,7 @@ class VeloxDeltaComponent extends Component {
offloads.foreach(
offload =>
ras.injectRasRule(
- c => RasOffload.Rule(offload, Validators.newValidator(c.glutenConf),
Nil)))
+ c => RasOffload.Rule(offload, Validators.newValidator(new
GlutenConfig(c.sqlConf)), Nil)))
DeltaPostTransformRules.rules.foreach {
r =>
legacy.injectPostTransform(_ => r)
diff --git
a/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
b/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
index 146b7be7e8..6685066efa 100644
---
a/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
+++
b/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
@@ -17,6 +17,7 @@
package org.apache.gluten.component
import org.apache.gluten.backendsapi.velox.VeloxBackend
+import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.OffloadHudiScan
import org.apache.gluten.extension.columnar.enumerated.RasOffload
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
@@ -36,13 +37,15 @@ class VeloxHudiComponent extends Component {
legacy.injectTransform {
c =>
val offload = Seq(OffloadHudiScan()).map(_.toStrcitRule())
- HeuristicTransform.Simple(Validators.newValidator(c.glutenConf,
offload), offload)
+ HeuristicTransform.Simple(
+ Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
+ offload)
}
ras.injectRasRule {
c =>
RasOffload.Rule(
RasOffload.from[FileSourceScanExec](OffloadHudiScan()),
- Validators.newValidator(c.glutenConf),
+ Validators.newValidator(new GlutenConfig(c.sqlConf)),
Nil)
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index 44bfd0aab3..e8e793be5c 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.backendsapi.velox
import org.apache.gluten.backendsapi.ListenerApi
import
org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.{ArrowJavaBatchType,
ArrowNativeBatchType}
-import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
+import org.apache.gluten.config.{GlutenConfig, GlutenCoreConfig, VeloxConfig}
import org.apache.gluten.config.VeloxConfig._
import org.apache.gluten.execution.datasource.GlutenFormatFactory
import org.apache.gluten.expression.UDFMappings
@@ -79,7 +79,7 @@ class VeloxListenerApi extends ListenerApi with Logging {
HdfsConfGenerator.addHdfsClientToSparkWorkDirectory(sc)
// Overhead memory limits.
- val offHeapSize = conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)
+ val offHeapSize =
conf.getSizeAsBytes(GlutenCoreConfig.SPARK_OFFHEAP_SIZE_KEY)
val desiredOverheadSize = (0.3 *
offHeapSize).toLong.max(ByteUnit.MiB.toBytes(384))
if (!SparkResourceUtil.isMemoryOverheadSet(conf)) {
// If memory overhead is not set by user, automatically set it according
to off-heap settings.
@@ -98,7 +98,7 @@ class VeloxListenerApi extends ListenerApi with Logging {
s" the recommended size
${ByteUnit.BYTE.toMiB(desiredOverheadSize)}MiB." +
s" This may cause OOM.")
}
- conf.set(GlutenConfig.COLUMNAR_OVERHEAD_SIZE_IN_BYTES, overheadSize)
+ conf.set(GlutenCoreConfig.COLUMNAR_OVERHEAD_SIZE_IN_BYTES, overheadSize)
// Sql table cache serializer.
if (conf.get(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED)) {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index f53e28bd93..48e5ef1a43 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -86,7 +86,11 @@ object VeloxRuleApi {
PullOutPostProject,
ProjectColumnPruning)
injector.injectTransform(
- c => HeuristicTransform.WithRewrites(validatorBuilder(c.glutenConf),
rewrites, offloads))
+ c =>
+ HeuristicTransform.WithRewrites(
+ validatorBuilder(new GlutenConfig(c.sqlConf)),
+ rewrites,
+ offloads))
// Legacy: Post-transform rules.
injector.injectPostTransform(_ =>
AppendBatchResizeForShuffleInputAndOutput())
@@ -113,15 +117,16 @@ object VeloxRuleApi {
SparkShimLoader.getSparkShims
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPost(c => each(c.session)))
- injector.injectPost(c => ColumnarCollapseTransformStages(c.glutenConf))
+ injector.injectPost(c => ColumnarCollapseTransformStages(new
GlutenConfig(c.sqlConf)))
injector.injectPost(c => GlutenNoopWriterRule(c.session))
// Gluten columnar: Final rules.
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
injector.injectFinal(
c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(),
Set(VeloxBatchType)))
- injector.injectFinal(c =>
GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
- injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
+ injector.injectFinal(
+ c => GlutenAutoAdjustStageResourceProfile(new GlutenConfig(c.sqlConf),
c.session))
+ injector.injectFinal(c => GlutenFallbackReporter(new
GlutenConfig(c.sqlConf), c.session))
injector.injectFinal(_ => RemoveFallbackTagRule())
}
@@ -176,7 +181,7 @@ object VeloxRuleApi {
offloads.foreach(
offload =>
injector.injectRasRule(
- c => RasOffload.Rule(offload, validatorBuilder(c.glutenConf),
rewrites)))
+ c => RasOffload.Rule(offload, validatorBuilder(new
GlutenConfig(c.sqlConf)), rewrites)))
// Gluten RAS: Post rules.
injector.injectPostTransform(_ =>
AppendBatchResizeForShuffleInputAndOutput())
@@ -199,13 +204,15 @@ object VeloxRuleApi {
SparkShimLoader.getSparkShims
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPostTransform(c => each(c.session)))
- injector.injectPostTransform(c =>
ColumnarCollapseTransformStages(c.glutenConf))
+ injector.injectPostTransform(c => ColumnarCollapseTransformStages(new
GlutenConfig(c.sqlConf)))
injector.injectPostTransform(c => GlutenNoopWriterRule(c.session))
injector.injectPostTransform(c =>
RemoveGlutenTableCacheColumnarToRow(c.session))
injector.injectPostTransform(
c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(),
Set(VeloxBatchType)))
- injector.injectPostTransform(c =>
GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
- injector.injectPostTransform(c => GlutenFallbackReporter(c.glutenConf,
c.session))
+ injector.injectPostTransform(
+ c => GlutenAutoAdjustStageResourceProfile(new GlutenConfig(c.sqlConf),
c.session))
+ injector.injectPostTransform(
+ c => GlutenFallbackReporter(new GlutenConfig(c.sqlConf), c.session))
injector.injectPostTransform(_ => RemoveFallbackTagRule())
}
}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index 4bd6012471..33b7f140fb 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
+import org.apache.gluten.config.{GlutenConfig, GlutenCoreConfig, VeloxConfig}
import org.apache.gluten.expression.VeloxDummyExpression
import org.apache.gluten.sql.shims.SparkShimLoader
@@ -2012,11 +2012,11 @@ class MiscOperatorSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSpa
}
test("test 'spark.gluten.enabled'") {
- withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "true") {
+ withSQLConf(GlutenCoreConfig.GLUTEN_ENABLED.key -> "true") {
runQueryAndCompare("select * from lineitem limit 1") {
checkGlutenOperatorMatch[FileSourceScanExecTransformer]
}
- withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
+ withSQLConf(GlutenCoreConfig.GLUTEN_ENABLED.key -> "false") {
runQueryAndCompare("select * from lineitem limit 1") {
checkSparkOperatorMatch[FileSourceScanExec]
}
diff --git
a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
index 288acabe2b..d4d36c6560 100644
---
a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.memory;
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenCoreConfig
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.memory.memtarget.{Spillers, TreeMemoryTarget}
import org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers
@@ -36,7 +36,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with
BeforeAndAfterAll {
val conf = SQLConf.get
conf.setConfString("spark.memory.offHeap.enabled", "true")
conf.setConfString("spark.memory.offHeap.size", "400")
-
conf.setConfString(GlutenConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES.key,
"100")
+
conf.setConfString(GlutenCoreConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES.key,
"100")
}
test("Sanity") {
diff --git a/gluten-arrow/pom.xml b/gluten-arrow/pom.xml
index cb88e4fbc8..c6e036ca5c 100644
--- a/gluten-arrow/pom.xml
+++ b/gluten-arrow/pom.xml
@@ -47,7 +47,19 @@
<dependencies>
<dependency>
<groupId>org.apache.gluten</groupId>
- <artifactId>gluten-core</artifactId>
+ <artifactId>gluten-substrait</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>${sparkshim.artifactId}</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>spark-sql-columnar-shims-common</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
diff --git a/gluten-core/pom.xml b/gluten-core/pom.xml
index 172ed5930e..ba0e8fb99b 100644
--- a/gluten-core/pom.xml
+++ b/gluten-core/pom.xml
@@ -19,23 +19,6 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
- <dependency>
- <groupId>org.apache.gluten</groupId>
- <artifactId>${sparkshim.artifactId}</artifactId>
- <version>${project.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.gluten</groupId>
- <artifactId>spark-sql-columnar-shims-common</artifactId>
- <version>${project.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.gluten</groupId>
- <artifactId>gluten-ui</artifactId>
- <version>${project.version}</version>
- </dependency>
<!-- Prevent our dummy JAR from being included in Spark distributions or
uploaded to YARN -->
<dependency>
<groupId>org.apache.spark</groupId>
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
index 6b981903f4..c9e9aed7b9 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.memory.memtarget;
-import org.apache.gluten.config.GlutenConfig;
+import org.apache.gluten.config.GlutenCoreConfig;
import org.apache.gluten.memory.SimpleMemoryUsageRecorder;
import org.apache.gluten.proto.MemoryUsageStats;
@@ -40,7 +40,7 @@ public class DynamicOffHeapSizingMemoryTarget implements
MemoryTarget, KnownName
static {
final long maxOnHeapSize = Runtime.getRuntime().maxMemory();
- final double fractionForSizing =
GlutenConfig.get().dynamicOffHeapSizingMemoryFraction();
+ final double fractionForSizing =
GlutenCoreConfig.get().dynamicOffHeapSizingMemoryFraction();
// Since when dynamic off-heap sizing is enabled, we commingle on-heap
// and off-heap memory, we set the off-heap size to the usable on-heap
size. We will
// size it with a memory fraction, which can be aggressively set, but the
default
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
index ff2a4da03e..135a82fa58 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.memory.memtarget;
-import org.apache.gluten.config.GlutenConfig;
+import org.apache.gluten.config.GlutenCoreConfig;
import org.apache.gluten.memory.MemoryUsageStatsBuilder;
import org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers;
@@ -50,7 +50,7 @@ public final class MemoryTargets {
@Experimental
public static MemoryTarget dynamicOffHeapSizingIfEnabled(MemoryTarget
memoryTarget) {
- if (GlutenConfig.get().dynamicOffHeapSizingEnabled()) {
+ if (GlutenCoreConfig.get().dynamicOffHeapSizingEnabled()) {
return new DynamicOffHeapSizingMemoryTarget();
}
@@ -63,7 +63,7 @@ public final class MemoryTargets {
Spiller spiller,
Map<String, MemoryUsageStatsBuilder> virtualChildren) {
final TreeMemoryConsumers.Factory factory =
TreeMemoryConsumers.factory(tmm);
- if (GlutenConfig.get().memoryIsolation()) {
+ if (GlutenCoreConfig.get().memoryIsolation()) {
return TreeMemoryTargets.newChild(factory.isolatedRoot(), name, spiller,
virtualChildren);
}
final TreeMemoryTarget root = factory.legacyRoot();
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
index af5a3ff0ce..b03e23a898 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.memory.memtarget;
-import org.apache.gluten.config.GlutenConfig$;
+import org.apache.gluten.config.GlutenCoreConfig$;
import org.apache.spark.memory.SparkMemoryUtil;
import org.apache.spark.sql.internal.SQLConf;
@@ -62,44 +62,47 @@ public class ThrowOnOomMemoryTarget implements MemoryTarget
{
.append(
String.format(
"\t%s=%s",
- GlutenConfig$.MODULE$.COLUMNAR_OFFHEAP_SIZE_IN_BYTES().key(),
+
GlutenCoreConfig$.MODULE$.COLUMNAR_OFFHEAP_SIZE_IN_BYTES().key(),
reformatBytes(
SQLConf.get()
.getConfString(
-
GlutenConfig$.MODULE$.COLUMNAR_OFFHEAP_SIZE_IN_BYTES().key()))))
+
GlutenCoreConfig$.MODULE$.COLUMNAR_OFFHEAP_SIZE_IN_BYTES().key()))))
.append(System.lineSeparator())
.append(
String.format(
"\t%s=%s",
-
GlutenConfig$.MODULE$.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
+
GlutenCoreConfig$.MODULE$.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
reformatBytes(
SQLConf.get()
.getConfString(
-
GlutenConfig$.MODULE$.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES().key()))))
+ GlutenCoreConfig$.MODULE$
+ .COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES()
+ .key()))))
.append(System.lineSeparator())
.append(
String.format(
"\t%s=%s",
-
GlutenConfig$.MODULE$.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
+
GlutenCoreConfig$.MODULE$.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
reformatBytes(
SQLConf.get()
.getConfString(
- GlutenConfig$.MODULE$
+ GlutenCoreConfig$.MODULE$
.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES()
.key()))))
.append(System.lineSeparator())
.append(
String.format(
"\t%s=%s",
- GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED(),
-
SQLConf.get().getConfString(GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED())))
+ GlutenCoreConfig$.MODULE$.SPARK_OFFHEAP_ENABLED_KEY(),
+
SQLConf.get().getConfString(GlutenCoreConfig$.MODULE$.SPARK_OFFHEAP_ENABLED_KEY())))
.append(System.lineSeparator())
.append(
String.format(
"\t%s=%s",
- GlutenConfig$.MODULE$.DYNAMIC_OFFHEAP_SIZING_ENABLED().key(),
+
GlutenCoreConfig$.MODULE$.DYNAMIC_OFFHEAP_SIZING_ENABLED().key(),
SQLConf.get()
-
.getConfString(GlutenConfig$.MODULE$.DYNAMIC_OFFHEAP_SIZING_ENABLED().key())))
+ .getConfString(
+
GlutenCoreConfig$.MODULE$.DYNAMIC_OFFHEAP_SIZING_ENABLED().key())))
.append(System.lineSeparator());
// Dump all consumer usages to exception body
errorBuilder.append(SparkMemoryUtil.dumpMemoryTargetStats(target));
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
index f158ded4c5..a1f2106e3c 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.memory.memtarget.spark;
-import org.apache.gluten.config.GlutenConfig;
+import org.apache.gluten.config.GlutenCoreConfig;
import org.apache.gluten.memory.memtarget.Spillers;
import org.apache.gluten.memory.memtarget.TreeMemoryTarget;
@@ -77,7 +77,7 @@ public final class TreeMemoryConsumers {
* <p>See <a
href="https://github.com/oap-project/gluten/issues/3030">GLUTEN-3030</a>
*/
public TreeMemoryTarget isolatedRoot() {
- return
ofCapacity(GlutenConfig.get().conservativeTaskOffHeapMemorySize());
+ return
ofCapacity(GlutenCoreConfig.get().conservativeTaskOffHeapMemorySize());
}
}
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 0208d6d0db..19bddf0fd7 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -16,11 +16,8 @@
*/
package org.apache.gluten
-import org.apache.gluten.GlutenBuildInfo._
import org.apache.gluten.component.Component
-import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.config.GlutenConfig._
-import org.apache.gluten.events.GlutenBuildInfoEvent
+import org.apache.gluten.config.GlutenCoreConfig
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.extension.GlutenSessionExtensions
import org.apache.gluten.initializer.CodedInputStreamClassInitializer
@@ -30,11 +27,7 @@ import org.apache.spark.{SparkConf, SparkContext,
TaskFailedReason}
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin,
PluginContext, SparkPlugin}
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.softaffinity.SoftAffinityListener
-import org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator
-import org.apache.spark.sql.execution.ui.{GlutenSQLAppStatusListener,
GlutenUIUtils}
import org.apache.spark.sql.internal.SparkConfigUtil._
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS
import org.apache.spark.task.TaskResources
import org.apache.spark.util.SparkResourceUtil
@@ -55,92 +48,44 @@ class GlutenPlugin extends SparkPlugin {
}
private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
- private var _sc: Option[SparkContext] = None
+ import GlutenDriverPlugin._
override def init(sc: SparkContext, pluginContext: PluginContext):
util.Map[String, String] = {
- _sc = Some(sc)
val conf = pluginContext.conf()
-
- // Register Gluten listeners
- GlutenSQLAppStatusListener.register(sc)
- if (conf.get(GLUTEN_SOFT_AFFINITY_ENABLED)) {
- SoftAffinityListener.register(sc)
+ // Spark SQL extensions
+ val extensionSeq = conf.get(SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty)
+ if
(!extensionSeq.toSet.contains(GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME))
{
+ conf.set(
+ SPARK_SESSION_EXTENSIONS,
+ extensionSeq :+ GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME)
}
- postBuildInfoEvent(sc)
-
setPredefinedConfigs(conf)
- // Initialize Backend.
- Component.sorted().foreach(_.onDriverStart(sc, pluginContext))
-
+ val components = Component.sorted()
+ printComponentInfo(components)
+ components.foreach(_.onDriverStart(sc, pluginContext))
Collections.emptyMap()
}
override def registerMetrics(appId: String, pluginContext: PluginContext):
Unit = {
- _sc.foreach {
- sc =>
- if (GlutenUIUtils.uiEnabled(sc)) {
- GlutenUIUtils.attachUI(sc)
- logInfo("Gluten SQL Tab has been attached.")
- }
- }
+ Component.sorted().foreach(_.registerMetrics(appId, pluginContext))
}
override def shutdown(): Unit = {
Component.sorted().reverse.foreach(_.onDriverShutdown())
}
+}
- private def postBuildInfoEvent(sc: SparkContext): Unit = {
- // export gluten version to property to spark
- System.setProperty("gluten.version", VERSION)
-
- val glutenBuildInfo = new mutable.LinkedHashMap[String, String]()
-
- val components = Component.sorted()
- glutenBuildInfo.put("Components",
components.map(_.buildInfo().name).mkString(", "))
- components.foreach {
- comp =>
- val buildInfo = comp.buildInfo()
- glutenBuildInfo.put(s"Component ${buildInfo.name} Branch",
buildInfo.branch)
- glutenBuildInfo.put(s"Component ${buildInfo.name} Revision",
buildInfo.revision)
- glutenBuildInfo.put(s"Component ${buildInfo.name} Revision Time",
buildInfo.revisionTime)
- }
-
- glutenBuildInfo.put("Gluten Version", VERSION)
- glutenBuildInfo.put("GCC Version", GCC_VERSION)
- glutenBuildInfo.put("Java Version", JAVA_COMPILE_VERSION)
- glutenBuildInfo.put("Scala Version", SCALA_COMPILE_VERSION)
- glutenBuildInfo.put("Spark Version", SPARK_COMPILE_VERSION)
- glutenBuildInfo.put("Hadoop Version", HADOOP_COMPILE_VERSION)
- glutenBuildInfo.put("Gluten Branch", BRANCH)
- glutenBuildInfo.put("Gluten Revision", REVISION)
- glutenBuildInfo.put("Gluten Revision Time", REVISION_TIME)
- glutenBuildInfo.put("Gluten Build Time", BUILD_DATE)
- glutenBuildInfo.put("Gluten Repo URL", REPO_URL)
-
- val loggingInfo = glutenBuildInfo
- .map { case (name, value) => s"$name: $value" }
- .mkString(
- "Gluten build
info:\n==============================================================\n",
- "\n",
- "\n=============================================================="
- )
- logInfo(loggingInfo)
- if (GlutenUIUtils.uiEnabled(sc)) {
- val event = GlutenBuildInfoEvent(glutenBuildInfo.toMap)
- GlutenUIUtils.postEvent(sc, event)
- }
- }
-
+private object GlutenDriverPlugin extends Logging {
private def checkOffHeapSettings(conf: SparkConf): Unit = {
- if (conf.get(DYNAMIC_OFFHEAP_SIZING_ENABLED)) {
+ if (conf.get(GlutenCoreConfig.DYNAMIC_OFFHEAP_SIZING_ENABLED)) {
// When dynamic off-heap sizing is enabled, off-heap mode is not
strictly required to be
// enabled. Skip the check.
return
}
- if (conf.get(COLUMNAR_MEMORY_UNTRACKED)) {
+ if (conf.get(GlutenCoreConfig.COLUMNAR_MEMORY_UNTRACKED)) {
// When untracked memory mode is enabled, off-heap mode is not strictly
required to be
// enabled. Skip the check.
return
@@ -148,66 +93,60 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
val minOffHeapSize = "1MB"
if (
- !conf.getBoolean(GlutenConfig.SPARK_OFFHEAP_ENABLED, false) ||
- conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, 0) <
JavaUtils.byteStringAsBytes(
+ !conf.getBoolean(GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY,
defaultValue = false) ||
+ conf.getSizeAsBytes(GlutenCoreConfig.SPARK_OFFHEAP_SIZE_KEY, 0) <
JavaUtils.byteStringAsBytes(
minOffHeapSize)
) {
throw new GlutenException(
- s"Must set '$SPARK_OFFHEAP_ENABLED' to true " +
- s"and set '$SPARK_OFFHEAP_SIZE_KEY' to be greater than
$minOffHeapSize")
+ s"Must set '${GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY}' to true " +
+ s"and set '${GlutenCoreConfig.SPARK_OFFHEAP_SIZE_KEY}' to be greater
" +
+ s"than $minOffHeapSize")
}
}
private def setPredefinedConfigs(conf: SparkConf): Unit = {
- // Spark SQL extensions
- val extensionSeq = conf.get(SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty)
- if
(!extensionSeq.toSet.contains(GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME))
{
- conf.set(
- SPARK_SESSION_EXTENSIONS,
- extensionSeq :+ GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME)
- }
-
- // adaptive custom cost evaluator class
- val enableGlutenCostEvaluator =
conf.get(GlutenConfig.COST_EVALUATOR_ENABLED)
- if (enableGlutenCostEvaluator) {
- conf.set(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS,
classOf[GlutenCostEvaluator].getName)
- }
-
// check memory off-heap enabled and size.
checkOffHeapSettings(conf)
// Get the off-heap size set by user.
- val offHeapSize = conf.getSizeAsBytes(SPARK_OFFHEAP_SIZE_KEY)
+ val offHeapSize =
conf.getSizeAsBytes(GlutenCoreConfig.SPARK_OFFHEAP_SIZE_KEY)
// Set off-heap size in bytes.
- conf.set(COLUMNAR_OFFHEAP_SIZE_IN_BYTES, offHeapSize)
+ conf.set(GlutenCoreConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES, offHeapSize)
// Set off-heap size in bytes per task.
val taskSlots = SparkResourceUtil.getTaskSlots(conf)
- conf.set(NUM_TASK_SLOTS_PER_EXECUTOR, taskSlots)
+ conf.set(GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR, taskSlots)
val offHeapPerTask = offHeapSize / taskSlots
- conf.set(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES, offHeapPerTask)
+ conf.set(GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES,
offHeapPerTask)
// Pessimistic off-heap sizes, with the assumption that all non-borrowable
storage memory
// determined by spark.memory.storageFraction was used.
val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction", 0.5d)
val conservativeOffHeapPerTask = (offHeapSize * fraction).toLong /
taskSlots
- conf.set(COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES,
conservativeOffHeapPerTask)
-
- // Disable vanilla columnar readers, to prevent columnar-to-columnar
conversions.
- // FIXME: Do we still need this trick since
- // https://github.com/apache/incubator-gluten/pull/1931 was merged?
- if (!conf.get(VANILLA_VECTORIZED_READERS_ENABLED)) {
- // FIXME Hongze 22/12/06
- // BatchScan.scala in shim was not always loaded by class loader.
- // The file should be removed and the "ClassCastException" issue caused
by
- // spark.sql.<format>.enableVectorizedReader=true should be fixed in
another way.
- // Before the issue is fixed we force the use of vanilla row reader by
using
- // the following statement.
- conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED, false)
- conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED, false)
- conf.set(SQLConf.CACHE_VECTORIZED_READER_ENABLED, false)
+ conf.set(
+ GlutenCoreConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES,
+ conservativeOffHeapPerTask)
+ }
+
+ private def printComponentInfo(components: Seq[Component]): Unit = {
+ val componentInfo = mutable.LinkedHashMap[String, String]()
+ componentInfo.put("Components",
components.map(_.buildInfo().name).mkString(", "))
+ components.foreach {
+ comp =>
+ val buildInfo = comp.buildInfo()
+ componentInfo.put(s"Component ${buildInfo.name} Branch",
buildInfo.branch)
+ componentInfo.put(s"Component ${buildInfo.name} Revision",
buildInfo.revision)
+ componentInfo.put(s"Component ${buildInfo.name} Revision Time",
buildInfo.revisionTime)
}
+ val loggingInfo = componentInfo
+ .map { case (name, value) => s"$name: $value" }
+ .mkString(
+ "Gluten
components:\n==============================================================\n",
+ "\n",
+ "\n=============================================================="
+ )
+ logInfo(loggingInfo)
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala
b/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala
index 642ada4fcd..fdd76870f7 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala
@@ -61,6 +61,9 @@ trait Component {
def onExecutorStart(pc: PluginContext): Unit = {}
def onExecutorShutdown(): Unit = {}
+ /** Metrics register, only called on Driver. */
+ def registerMetrics(appId: String, pluginContext: PluginContext): Unit = {}
+
/**
* Overrides
[[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is
using to
* determine the convention (its row-based processing / columnar-batch
processing support) of a
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/config/GlutenCoreConfig.scala
b/gluten-core/src/main/scala/org/apache/gluten/config/GlutenCoreConfig.scala
new file mode 100644
index 0000000000..f3912007cd
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/gluten/config/GlutenCoreConfig.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.config
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.sql.internal.{SQLConf, SQLConfProvider}
+
+class GlutenCoreConfig(conf: SQLConf) extends Logging {
+ import GlutenCoreConfig._
+
+ private lazy val configProvider = new SQLConfProvider(conf)
+
+ def getConf[T](entry: ConfigEntry[T]): T = {
+ require(ConfigEntry.containsEntry(entry), s"$entry is not registered")
+ entry.readFrom(configProvider)
+ }
+
+ def enableGluten: Boolean = getConf(GLUTEN_ENABLED)
+
+ def enableRas: Boolean = getConf(RAS_ENABLED)
+
+ def rasCostModel: String = getConf(RAS_COST_MODEL)
+
+ def memoryUntracked: Boolean = getConf(COLUMNAR_MEMORY_UNTRACKED)
+
+ def offHeapMemorySize: Long = getConf(COLUMNAR_OFFHEAP_SIZE_IN_BYTES)
+
+ def taskOffHeapMemorySize: Long =
getConf(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES)
+
+ def conservativeTaskOffHeapMemorySize: Long =
+ getConf(COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES)
+
+ def memoryIsolation: Boolean = getConf(COLUMNAR_MEMORY_ISOLATION)
+
+ def memoryOverAcquiredRatio: Double =
getConf(COLUMNAR_MEMORY_OVER_ACQUIRED_RATIO)
+
+ def memoryReservationBlockSize: Long =
getConf(COLUMNAR_MEMORY_RESERVATION_BLOCK_SIZE)
+
+ def dynamicOffHeapSizingEnabled: Boolean =
+ getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)
+
+ def dynamicOffHeapSizingMemoryFraction: Double =
+ getConf(DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION)
+}
+
+object GlutenCoreConfig {
+ def buildConf(key: String): ConfigBuilder = ConfigBuilder(key)
+
+ def buildStaticConf(key: String): ConfigBuilder = {
+ ConfigBuilder(key).onCreate(_ => SQLConf.registerStaticConfigKey(key))
+ }
+
+ def get: GlutenCoreConfig = {
+ new GlutenCoreConfig(SQLConf.get)
+ }
+
+ val SPARK_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
+ val SPARK_OFFHEAP_ENABLED_KEY = "spark.memory.offHeap.enabled"
+
+ val GLUTEN_ENABLED =
+ buildConf("spark.gluten.enabled")
+ .internal()
+ .doc("Whether to enable Gluten. Default value is true. This is just an
experimental property." +
+ " It is recommended to enable/disable Gluten through the
spark.plugins setting.")
+ .booleanConf
+ .createWithDefault(true)
+
+ // Options used by RAS.
+ val RAS_ENABLED =
+ buildConf("spark.gluten.ras.enabled")
+ .doc(
+ "Enables RAS (relational algebra selector) during physical " +
+ "planning to generate more efficient query plan. Note, this feature
doesn't bring " +
+ "performance profits by default. Try exploring option
`spark.gluten.ras.costModel` " +
+ "for advanced usage.")
+ .booleanConf
+ .createWithDefault(false)
+
+ // FIXME: This option is no longer only used by RAS. Should change key to
+ // `spark.gluten.costModel` or something similar.
+ val RAS_COST_MODEL =
+ buildConf("spark.gluten.ras.costModel")
+ .doc(
+ "The class name of user-defined cost model that will be used by
Gluten's transition " +
+ "planner as well as by RAS. If not specified, a legacy built-in cost
model will be " +
+ "used. The legacy cost model helps RAS planner exhaustively offload
computations, and " +
+ "helps transition planner choose columnar-to-columnar transition
over others.")
+ .stringConf
+ .createWithDefaultString("legacy")
+
+ val COLUMNAR_MEMORY_UNTRACKED =
+ buildStaticConf("spark.gluten.memory.untracked")
+ .internal()
+ .doc(
+ "When enabled, turn all native memory allocations in Gluten into
untracked. Spark " +
+ "will be unaware of the allocations so will not trigger
spill-to-disk operations " +
+ "or Spark OOMs. Should only be used for testing or other
non-production use cases.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val COLUMNAR_MEMORY_ISOLATION =
+ buildConf("spark.gluten.memory.isolation")
+ .internal()
+ .doc("Enable isolated memory mode. If true, Gluten controls the maximum
off-heap memory " +
+ "that can be used by each task to X, X = executor memory / max task slots.
It's recommended " +
+ "to set true if Gluten serves concurrent queries within a single
session, since not all " +
+ "memory Gluten allocated is guaranteed to be spillable. In the case,
the feature should " +
+ "be enabled to avoid OOM.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val COLUMNAR_OVERHEAD_SIZE_IN_BYTES =
+ buildConf("spark.gluten.memoryOverhead.size.in.bytes")
+ .internal()
+ .doc(
+ "Must provide default value since non-execution operations " +
+ "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate
configurations using " +
+ "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("0")
+
+ val COLUMNAR_OFFHEAP_SIZE_IN_BYTES =
+ buildConf("spark.gluten.memory.offHeap.size.in.bytes")
+ .internal()
+ .doc(
+ "Must provide default value since non-execution operations " +
+ "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate
configurations using " +
+ "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("0")
+
+ val COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES =
+ buildConf("spark.gluten.memory.task.offHeap.size.in.bytes")
+ .internal()
+ .doc(
+ "Must provide default value since non-execution operations " +
+ "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate
configurations using " +
+ "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("0")
+
+ val COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES =
+ buildConf("spark.gluten.memory.conservative.task.offHeap.size.in.bytes")
+ .internal()
+ .doc(
+ "Must provide default value since non-execution operations " +
+ "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate
configurations using " +
+ "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("0")
+
+ val COLUMNAR_MEMORY_OVER_ACQUIRED_RATIO =
+ buildConf("spark.gluten.memory.overAcquiredMemoryRatio")
+ .internal()
+ .doc("If larger than 0, Velox backend will try over-acquire this ratio
of the total " +
+ "allocated memory as backup to avoid OOM.")
+ .doubleConf
+ .checkValue(d => d >= 0.0d, "Over-acquired ratio should be greater than
or equal to 0")
+ .createWithDefault(0.3d)
+
+ val COLUMNAR_MEMORY_RESERVATION_BLOCK_SIZE =
+ buildConf("spark.gluten.memory.reservationBlockSize")
+ .internal()
+ .doc("Block size used when the native reservation listener reserves memory
from Spark.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("8MB")
+
+ val NUM_TASK_SLOTS_PER_EXECUTOR =
+ buildConf("spark.gluten.numTaskSlotsPerExecutor")
+ .internal()
+ .doc(
+ "Must provide default value since non-execution operations " +
+ "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate
configurations using " +
+ "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
+ .intConf
+ .createWithDefaultString("-1")
+
+ // Since https://github.com/apache/incubator-gluten/issues/5439.
+ val DYNAMIC_OFFHEAP_SIZING_ENABLED =
+ buildStaticConf("spark.gluten.memory.dynamic.offHeap.sizing.enabled")
+ .internal()
+ .doc(
+ "Experimental: When set to true, the offheap config
(spark.memory.offHeap.size) will " +
+ "be ignored and instead we will consider onheap and offheap memory
in combination, " +
+ "both counting towards the executor memory config
(spark.executor.memory). We will " +
+ "make use of JVM APIs to determine how much onheap memory is used,
alongside tracking " +
+ "offheap allocations made by Gluten. We will then proceed to
enforcing a total memory " +
+ "quota, calculated by the sum of what memory is committed and in use
in the Java " +
+ "heap. Since the calculation of the total quota happens as offheap
allocation happens " +
+ "and not as JVM heap memory is allocated, it is possible that we can
oversubscribe " +
+ "memory. Additionally, note that this change is experimental and may
have performance " +
+ "implications.")
+ .booleanConf
+ .createWithDefault(false)
+
+ // Since https://github.com/apache/incubator-gluten/issues/5439.
+ val DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION =
+
buildStaticConf("spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction")
+ .internal()
+ .doc(
+ "Experimental: Determines the memory fraction used to determine the
total " +
+ "memory available for offheap and onheap allocations when the
dynamic offheap " +
+ "sizing feature is enabled. The default is set to match
spark.executor.memoryFraction.")
+ .doubleConf
+ .checkValue(v => v >= 0 && v <= 1, "offheap sizing memory fraction must
be between [0, 1]")
+ .createWithDefault(0.6)
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
index 4e7b2a034c..6097b821e0 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.extension
import org.apache.gluten.component.Component
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenCoreConfig
import org.apache.gluten.extension.injector.Injector
import org.apache.spark.internal.Logging
@@ -32,7 +32,9 @@ private[gluten] class GlutenSessionExtensions
injector.control.disableOn {
session =>
val glutenEnabledGlobally = session.conf
- .get(GlutenConfig.GLUTEN_ENABLED.key,
GlutenConfig.GLUTEN_ENABLED.defaultValueString)
+ .get(
+ GlutenCoreConfig.GLUTEN_ENABLED.key,
+ GlutenCoreConfig.GLUTEN_ENABLED.defaultValueString)
.toBoolean
val disabled = !glutenEnabledGlobally
logDebug(s"Gluten is disabled by variable: glutenEnabledGlobally:
$glutenEnabledGlobally")
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
index 7257865507..70659f58ce 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
@@ -16,7 +16,6 @@
*/
package org.apache.gluten.extension.columnar
-import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.extension.caller.CallerInfo
import org.apache.spark.sql.SparkSession
@@ -31,8 +30,6 @@ object ColumnarRuleApplier {
val session: SparkSession,
val caller: CallerInfo,
val outputsColumnar: Boolean) {
- val glutenConf: GlutenConfig = {
- new GlutenConfig(session.sessionState.conf)
- }
+ val sqlConf = session.sessionState.conf
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
index e10d9bd0b4..d62f27a297 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
@@ -16,49 +16,15 @@
*/
package org.apache.gluten.extension.columnar
-import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.logging.LogLevelUtil
-import org.apache.gluten.metrics.GlutenTimeMetric
-
-import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
-import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.execution.SparkPlan
class ColumnarRuleExecutor(phase: String, rules: Seq[Rule[SparkPlan]])
extends RuleExecutor[SparkPlan] {
- import ColumnarRuleExecutor._
- private val batch: Batch =
- Batch(s"Columnar (Phase [$phase])", Once, rules.map(r => new
LoggedRule(r)): _*)
+ private val batch: Batch = Batch(s"Columnar (Phase [$phase])", Once, rules:
_*)
// TODO: Remove this exclusion then manage to pass Spark's idempotence check.
override protected val excludedOnceBatches: Set[String] = Set(batch.name)
override protected def batches: Seq[Batch] = Seq(batch)
}
-
-object ColumnarRuleExecutor {
- private class LoggedRule(delegate: Rule[SparkPlan])
- extends Rule[SparkPlan]
- with Logging
- with LogLevelUtil {
-
- override val ruleName: String = delegate.ruleName
-
- private def message(oldPlan: SparkPlan, newPlan: SparkPlan, millisTime:
Long): String =
- if (!oldPlan.fastEquals(newPlan)) {
- s"""
- |=== Applying Rule $ruleName took $millisTime ms ===
- |${sideBySide(oldPlan.treeString,
newPlan.treeString).mkString("\n")}
- """.stripMargin
- } else {
- s"Rule $ruleName has no effect, took $millisTime ms."
- }
-
- override def apply(plan: SparkPlan): SparkPlan = {
- val (out, millisTime) =
GlutenTimeMetric.recordMillisTime(delegate.apply(plan))
- logOnLevel(GlutenConfig.get.transformPlanLogLevel, message(plan, out,
millisTime))
- out
- }
- }
-}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index 2f5c8c4472..762069c373 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -35,13 +35,19 @@ import org.apache.spark.sql.execution.SparkPlan
*/
class EnumeratedApplier(
session: SparkSession,
- ruleBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]])
+ ruleBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]],
+ ruleWrappers: Seq[Rule[SparkPlan] => Rule[SparkPlan]])
extends ColumnarRuleApplier
with Logging
with LogLevelUtil {
+
override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
val call = new ColumnarRuleCall(session, CallerInfo.create(),
outputsColumnar)
- val finalPlan = apply0(ruleBuilders.map(b => b(call)), plan)
+ val finalPlan = apply0(
+ ruleBuilders
+ .map(b => b(call))
+ .map(r => ruleWrappers.foldLeft(r) { case (r, wrapper) => wrapper(r)
}),
+ plan)
finalPlan
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
index fed7bca248..ca542f3d15 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
@@ -17,12 +17,12 @@
package org.apache.gluten.extension.columnar.enumerated.planner.plan
import org.apache.gluten.ras.PlanModel
-import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase
-import org.apache.spark.task.{SparkTaskUtil, TaskResources}
+import org.apache.spark.task.TaskResources
+import org.apache.spark.util.SparkTaskUtil
import java.util.{Objects, Properties}
@@ -32,7 +32,7 @@ object GlutenPlanModel {
}
private object PlanModelImpl extends PlanModel[SparkPlan] {
- private val fakeTc =
SparkShimLoader.getSparkShims.createTestTaskContext(new Properties())
+ private val fakeTc = SparkTaskUtil.createTestTaskContext(new Properties())
private def fakeTc[T](body: => T): T = {
assert(!TaskResources.inSparkTask())
SparkTaskUtil.setTaskContext(fakeTc)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index 493319d423..5fa74f6416 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -35,7 +35,8 @@ class HeuristicApplier(
transformBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]],
fallbackPolicyBuilders: Seq[ColumnarRuleCall => SparkPlan =>
Rule[SparkPlan]],
postBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]],
- finalBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]])
+ finalBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]],
+ ruleWrappers: Seq[Rule[SparkPlan] => Rule[SparkPlan]])
extends ColumnarRuleApplier
with Logging
with LogLevelUtil {
@@ -65,8 +66,13 @@ class HeuristicApplier(
private def transformPlan(
phase: String,
rules: Seq[Rule[SparkPlan]],
- plan: SparkPlan): SparkPlan =
- new ColumnarRuleExecutor(phase, rules).execute(plan)
+ plan: SparkPlan): SparkPlan = {
+ val wrappedRules = ruleWrappers.foldLeft(rules) {
+ case (rules, wrapper) =>
+ rules.map(wrapper)
+ }
+ new ColumnarRuleExecutor(phase, wrappedRules).execute(plan)
+ }
/**
* Rules to let planner create a suggested Gluten plan being sent to
`fallbackPolicies` in which
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
index 75ca8a775b..8ab66bf561 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
@@ -193,8 +193,7 @@ object Convention {
object KnownRowTypeForSpark33OrLater {
private val lteSpark32: Boolean = {
- val v = SparkVersionUtil.majorMinorVersion()
- SparkVersionUtil.compareMajorMinorVersion(v, (3, 2)) <= 0
+ SparkVersionUtil.compareMajorMinorVersion((3, 2)) <= 0
}
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
index 83711e71f6..b28fbe4f36 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
@@ -18,12 +18,12 @@ package org.apache.gluten.extension.columnar.transition
import org.apache.gluten.component.Component
import
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildConvention
-import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, UnionExec}
import org.apache.spark.sql.execution.adaptive.QueryStageExec
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+import org.apache.spark.util.SparkPlanUtil
/** ConventionFunc is a utility to derive [[Convention]] or [[ConventionReq]]
from a query plan. */
sealed trait ConventionFunc {
@@ -98,7 +98,7 @@ object ConventionFunc {
val out = plan match {
case k: Convention.KnownRowType =>
k.rowType()
- case _ if SparkShimLoader.getSparkShims.supportsRowBased(plan) =>
+ case _ if SparkPlanUtil.supportsRowBased(plan) =>
Convention.RowType.VanillaRowType
case _ =>
Convention.RowType.None
@@ -108,7 +108,7 @@ object ConventionFunc {
}
private def checkRowType(plan: SparkPlan, rowType: Convention.RowType):
Unit = {
- if (SparkShimLoader.getSparkShims.supportsRowBased(plan)) {
+ if (SparkPlanUtil.supportsRowBased(plan)) {
assert(
rowType != Convention.RowType.None,
s"Plan ${plan.nodeName} supports row-based execution, " +
@@ -173,7 +173,7 @@ object ConventionFunc {
ConventionReq.of(
ConventionReq.RowType.Any,
ConventionReq.BatchType.Is(Convention.BatchType.VanillaBatchType)))
- case write: DataWritingCommandExec if
SparkShimLoader.getSparkShims.isPlannedV1Write(write) =>
+ case write: DataWritingCommandExec if
SparkPlanUtil.isPlannedV1Write(write) =>
// To align with
ApplyColumnarRulesAndInsertTransitions#insertTransitions
Seq(ConventionReq.any)
case u: UnionExec =>
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
index 83289fb973..6a9dfa0623 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension.columnar.transition
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenCoreConfig
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.extension.columnar.cost.GlutenCostModel
@@ -94,7 +94,7 @@ object Transition {
private val graphCache = mutable.Map[String, TransitionGraph]()
private def graph(): TransitionGraph = synchronized {
- val aliasOrClass = GlutenConfig.get.rasCostModel
+ val aliasOrClass = GlutenCoreConfig.get.rasCostModel
graphCache.getOrElseUpdate(
aliasOrClass, {
val base = GlutenCostModel.find(aliasOrClass)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
index 7a90947527..56f3cc612f 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.SparkVersionUtil
package object transition {
private val gteSpark33: Boolean = {
-
SparkVersionUtil.compareMajorMinorVersion(SparkVersionUtil.majorMinorVersion(),
(3, 3)) >= 0
+ SparkVersionUtil.compareMajorMinorVersion((3, 3)) >= 0
}
type TransitionGraph = FloydWarshallGraph[TransitionGraph.Vertex, Transition]
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
index fa8704509e..f6b8957f7e 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension.injector
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenCoreConfig
import org.apache.gluten.extension.GlutenColumnarRule
import org.apache.gluten.extension.columnar.ColumnarRuleApplier
import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
@@ -44,7 +44,7 @@ class GlutenInjector private[injector] (control:
InjectorControl) {
}
private def applier(session: SparkSession): ColumnarRuleApplier = {
- val conf = new GlutenConfig(session.sessionState.conf)
+ val conf = new GlutenCoreConfig(session.sessionState.conf)
if (conf.enableRas) {
return ras.createApplier(session)
}
@@ -61,6 +61,7 @@ object GlutenInjector {
mutable.Buffer.empty[ColumnarRuleCall => SparkPlan => Rule[SparkPlan]]
private val postBuilders = mutable.Buffer.empty[ColumnarRuleCall =>
Rule[SparkPlan]]
private val finalBuilders = mutable.Buffer.empty[ColumnarRuleCall =>
Rule[SparkPlan]]
+ private val ruleWrappers = mutable.Buffer.empty[Rule[SparkPlan] =>
Rule[SparkPlan]]
def injectPreTransform(builder: ColumnarRuleCall => Rule[SparkPlan]): Unit
= {
preTransformBuilders += builder
@@ -86,6 +87,10 @@ object GlutenInjector {
finalBuilders += builder
}
+ def injectRuleWrapper(wrapper: Rule[SparkPlan] => Rule[SparkPlan]): Unit =
{
+ ruleWrappers += wrapper
+ }
+
private[injector] def createApplier(session: SparkSession):
ColumnarRuleApplier = {
new HeuristicApplier(
session,
@@ -93,7 +98,8 @@ object GlutenInjector {
c => createHeuristicTransform(c)) ++ postTransformBuilders).toSeq,
fallbackPolicyBuilders.toSeq,
postBuilders.toSeq,
- finalBuilders.toSeq
+ finalBuilders.toSeq,
+ ruleWrappers.toSeq
)
}
@@ -107,6 +113,7 @@ object GlutenInjector {
private val preTransformBuilders = mutable.Buffer.empty[ColumnarRuleCall
=> Rule[SparkPlan]]
private val rasRuleBuilders = mutable.Buffer.empty[ColumnarRuleCall =>
RasRule[SparkPlan]]
private val postTransformBuilders = mutable.Buffer.empty[ColumnarRuleCall
=> Rule[SparkPlan]]
+ private val ruleWrappers = mutable.Buffer.empty[Rule[SparkPlan] =>
Rule[SparkPlan]]
def injectPreTransform(builder: ColumnarRuleCall => Rule[SparkPlan]): Unit
= {
preTransformBuilders += builder
@@ -120,17 +127,22 @@ object GlutenInjector {
postTransformBuilders += builder
}
+ def injectRuleWrapper(wrapper: Rule[SparkPlan] => Rule[SparkPlan]): Unit =
{
+ ruleWrappers += wrapper
+ }
+
private[injector] def createApplier(session: SparkSession):
ColumnarRuleApplier = {
new EnumeratedApplier(
session,
(preTransformBuilders ++ Seq(
- c => createEnumeratedTransform(c)) ++ postTransformBuilders).toSeq)
+ c => createEnumeratedTransform(c)) ++ postTransformBuilders).toSeq,
+ ruleWrappers.toSeq)
}
def createEnumeratedTransform(call: ColumnarRuleCall): EnumeratedTransform
= {
// Build RAS rules.
val rules = rasRuleBuilders.map(_(call))
- val costModel = GlutenCostModel.find(call.glutenConf.rasCostModel)
+ val costModel = GlutenCostModel.find(new
GlutenCoreConfig(call.sqlConf).rasCostModel)
// Create transform.
EnumeratedTransform(costModel, rules.toSeq)
}
diff --git
a/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
b/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
index 359aa86dd1..c0988b8eca 100644
---
a/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.memory
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenCoreConfig
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.memory.memtarget.{MemoryTarget, NoopMemoryTarget}
@@ -30,7 +30,7 @@ import org.apache.gluten.memory.memtarget.{MemoryTarget,
NoopMemoryTarget}
* BlockId to be extended by user, TestBlockId is chosen for the storage
memory reservations.
*/
object GlobalOffHeapMemory {
- private val target: MemoryTarget = if (GlutenConfig.get.memoryUntracked) {
+ private val target: MemoryTarget = if (GlutenCoreConfig.get.memoryUntracked)
{
new NoopMemoryTarget()
} else {
new GlobalOffHeapMemoryTarget()
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
index 0d006a4848..e4204770f4 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution.adaptive
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenCoreConfig
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.execution.SparkPlan
@@ -29,7 +29,7 @@ import org.apache.spark.util.{SparkVersionUtil, Utils}
*/
case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
private val ltSpark33: Boolean = {
-
SparkVersionUtil.compareMajorMinorVersion(SparkVersionUtil.majorMinorVersion(),
(3, 3)) < 0
+ SparkVersionUtil.compareMajorMinorVersion((3, 3)) < 0
}
private val vanillaCostEvaluator: CostEvaluator = {
@@ -46,7 +46,7 @@ case class GlutenCostEvaluator() extends CostEvaluator with
SQLConfHelper {
}
override def evaluateCost(plan: SparkPlan): Cost = {
- if (GlutenConfig.get.enableGluten) {
+ if (GlutenCoreConfig.get.enableGluten) {
new GlutenCost(vanillaCostEvaluator, plan)
} else {
vanillaCostEvaluator.evaluateCost(plan)
diff --git
a/shims/common/src/main/scala/org/apache/spark/storage/BlockManagerUtils.scala
b/gluten-core/src/main/scala/org/apache/spark/storage/BlockManagerUtil.scala
similarity index 98%
rename from
shims/common/src/main/scala/org/apache/spark/storage/BlockManagerUtils.scala
rename to
gluten-core/src/main/scala/org/apache/spark/storage/BlockManagerUtil.scala
index 5a68d15d2a..f2692d566d 100644
---
a/shims/common/src/main/scala/org/apache/spark/storage/BlockManagerUtils.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/storage/BlockManagerUtil.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.io.ChunkedByteBuffer
import scala.reflect.ClassTag
-object BlockManagerUtils {
+object BlockManagerUtil {
def setTestMemoryStore(conf: SparkConf, memoryManager: MemoryManager,
isDriver: Boolean): Unit = {
val store = new MemoryStore(
conf,
diff --git
a/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
b/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
index 4c46457faf..fb06465c19 100644
--- a/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
@@ -16,15 +16,14 @@
*/
package org.apache.spark.task
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenCoreConfig
import org.apache.gluten.memory.SimpleMemoryUsageRecorder
-import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.task.TaskListener
import org.apache.spark.{TaskContext, TaskFailedReason, TaskKilledException,
UnknownReason}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.{TaskCompletionListener, TaskFailureListener}
+import org.apache.spark.util.{SparkTaskUtil, TaskCompletionListener,
TaskFailureListener}
import java.util
import java.util.{Collections, Properties, UUID}
@@ -43,7 +42,7 @@ object TaskResources extends TaskListener with Logging {
val ACCUMULATED_LEAK_BYTES = new AtomicLong(0L)
private def newUnsafeTaskContext(properties: Properties): TaskContext = {
- SparkShimLoader.getSparkShims.createTestTaskContext(properties)
+ SparkTaskUtil.createTestTaskContext(properties)
}
implicit private class PropertiesOps(properties: Properties) {
@@ -65,8 +64,8 @@ object TaskResources extends TaskListener with Logging {
properties.put(key, value)
case _ =>
}
- properties.setIfMissing(GlutenConfig.SPARK_OFFHEAP_ENABLED, "true")
- properties.setIfMissing(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, "1TB")
+ properties.setIfMissing(GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY, "true")
+ properties.setIfMissing(GlutenCoreConfig.SPARK_OFFHEAP_SIZE_KEY, "1TB")
TaskContext.setTaskContext(newUnsafeTaskContext(properties))
}
diff --git
a/gluten-core/src/main/scala/org/apache/spark/util/SparkPlanUtil.scala
b/gluten-core/src/main/scala/org/apache/spark/util/SparkPlanUtil.scala
new file mode 100644
index 0000000000..4f5c213e5a
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkPlanUtil.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.internal.SQLConf
+
+object SparkPlanUtil {
+ private val lteSpark32 = SparkVersionUtil.compareMajorMinorVersion((3, 2))
<= 0
+ private val lteSpark33 = SparkVersionUtil.compareMajorMinorVersion((3, 3))
<= 0
+
+ def supportsRowBased(plan: SparkPlan): Boolean = {
+ if (lteSpark32) {
+ return !plan.supportsColumnar
+ }
+
+ val m = classOf[SparkPlan].getMethod("supportsRowBased")
+ m.invoke(plan).asInstanceOf[Boolean]
+ }
+
+ def isPlannedV1Write(plan: DataWritingCommandExec): Boolean = {
+ if (lteSpark33) {
+ return false
+ }
+
+ val v1WriteCommandClass =
+
Utils.classForName("org.apache.spark.sql.execution.datasources.V1WriteCommand")
+ val plannedWriteEnabled =
+ SQLConf.get.getConfString("spark.sql.optimizer.plannedWrite.enabled",
"true").toBoolean
+ v1WriteCommandClass.isAssignableFrom(plan.cmd.getClass) &&
plannedWriteEnabled
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/spark/util/SparkTaskUtil.scala
b/gluten-core/src/main/scala/org/apache/spark/util/SparkTaskUtil.scala
new file mode 100644
index 0000000000..02df5e224f
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkTaskUtil.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import org.apache.spark.{SparkConf, TaskContext, TaskContextImpl}
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.storage.BlockManagerUtil
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+object SparkTaskUtil {
+ def setTaskContext(taskContext: TaskContext): Unit = {
+ TaskContext.setTaskContext(taskContext)
+ }
+
+ def unsetTaskContext(): Unit = {
+ TaskContext.unset()
+ }
+
+ def getTaskMemoryManager(taskContext: TaskContext): TaskMemoryManager = {
+ taskContext.taskMemoryManager()
+ }
+
+ def createTestTaskContext(properties: Properties): TaskContext = {
+ val conf = new SparkConf()
+ conf.setAll(properties.asScala)
+ val memoryManager = UnifiedMemoryManager(conf, 1)
+ BlockManagerUtil.setTestMemoryStore(conf, memoryManager, isDriver = false)
+ val stageId = -1.asInstanceOf[Object]
+ val stageAttemptNumber = -1.asInstanceOf[Object]
+ val partitionId = -1.asInstanceOf[Object]
+ val taskAttemptId = -1L.asInstanceOf[Object]
+ val attemptNumber = -1.asInstanceOf[Object]
+ val numPartitions = -1.asInstanceOf[Object] // Added in Spark 3.4.
+ val taskMemoryManager = new TaskMemoryManager(memoryManager,
-1L).asInstanceOf[Object]
+ val localProperties = properties.asInstanceOf[Object]
+ val metricsSystem =
+ MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE",
conf).asInstanceOf[Object]
+ val taskMetrics = TaskMetrics.empty.asInstanceOf[Object]
+ val cpus = 1.asInstanceOf[Object] // Added in Spark 3.3.
+ val resources = Map.empty.asInstanceOf[Object]
+
+ val ctor = {
+ val ctors = classOf[TaskContextImpl].getDeclaredConstructors
+ assert(ctors.size == 1)
+ ctors.head
+ }
+
+ if (SparkVersionUtil.compareMajorMinorVersion((3, 2)) <= 0) {
+ return ctor
+ .newInstance(
+ stageId,
+ stageAttemptNumber,
+ partitionId,
+ taskAttemptId,
+ attemptNumber,
+ taskMemoryManager,
+ localProperties,
+ metricsSystem,
+ taskMetrics,
+ resources
+ )
+ .asInstanceOf[TaskContext]
+ }
+
+ if (SparkVersionUtil.compareMajorMinorVersion((3, 3)) == 0) {
+ return ctor
+ .newInstance(
+ stageId,
+ stageAttemptNumber,
+ partitionId,
+ taskAttemptId,
+ attemptNumber,
+ taskMemoryManager,
+ localProperties,
+ metricsSystem,
+ taskMetrics,
+ cpus,
+ resources
+ )
+ .asInstanceOf[TaskContext]
+ }
+
+ // Since Spark 3.4.
+ ctor
+ .newInstance(
+ stageId,
+ stageAttemptNumber,
+ partitionId,
+ taskAttemptId,
+ attemptNumber,
+ numPartitions,
+ taskMemoryManager,
+ localProperties,
+ metricsSystem,
+ taskMetrics,
+ cpus,
+ resources
+ )
+ .asInstanceOf[TaskContext]
+ }
+}
diff --git
a/shims/common/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
b/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
similarity index 68%
rename from
shims/common/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
rename to
gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
index 22917639bf..0313432427 100644
--- a/shims/common/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
@@ -17,18 +17,11 @@
package org.apache.spark.util
object SparkVersionUtil {
- def majorMinorVersion(version: String = org.apache.spark.SPARK_VERSION):
(Int, Int) = {
- VersionUtils.majorMinorVersion(version)
- }
-
- def majorMinorPatchVersion(version: String): Option[(Int, Int, Int)] = {
- VersionUtils.majorMinorPatchVersion(version)
- }
-
// Returns X. X < 0 if one < other, x == 0 if one == other, x > 0 if one >
other.
- def compareMajorMinorVersion(one: (Int, Int), other: (Int, Int)): Int = {
+ def compareMajorMinorVersion(other: (Int, Int)): Int = {
+ val current =
VersionUtils.majorMinorVersion(org.apache.spark.SPARK_VERSION)
val base = 1000
- assert(one._2 < base && other._2 < base)
- one._1 * base + one._2 - (other._1 * base + other._2)
+ assert(current._2 < base && other._2 < base)
+ current._1 * base + current._2 - (other._1 * base + other._2)
}
}
diff --git
a/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
b/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
index a31a1d6d35..b59856eef6 100644
---
a/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
+++
b/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.memory.memtarget.spark;
-import org.apache.gluten.config.GlutenConfig;
+import org.apache.gluten.config.GlutenCoreConfig;
import org.apache.gluten.memory.memtarget.MemoryTarget;
import org.apache.gluten.memory.memtarget.Spiller;
import org.apache.gluten.memory.memtarget.Spillers;
@@ -42,7 +42,7 @@ public class TreeMemoryConsumerTest {
conf.setConfString("spark.memory.offHeap.enabled", "true");
conf.setConfString("spark.memory.offHeap.size", "400");
conf.setConfString(
- GlutenConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
"100");
+
GlutenCoreConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
"100");
}
@Test
diff --git
a/gluten-core/src/test/scala/org/apache/gluten/task/TaskResourceSuite.scala
b/gluten-core/src/test/scala/org/apache/gluten/task/TaskResourceSuite.scala
index 026b717621..8e4fd48284 100644
--- a/gluten-core/src/test/scala/org/apache/gluten/task/TaskResourceSuite.scala
+++ b/gluten-core/src/test/scala/org/apache/gluten/task/TaskResourceSuite.scala
@@ -19,7 +19,8 @@ package org.apache.gluten.task
import org.apache.spark.memory.{MemoryConsumer, MemoryMode}
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.task.{SparkTaskUtil, TaskResource, TaskResources}
+import org.apache.spark.task.{TaskResource, TaskResources}
+import org.apache.spark.util.SparkTaskUtil
import org.scalatest.funsuite.AnyFunSuite
diff --git
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala
index 6747b79ffc..44b0597503 100644
---
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala
+++
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution
+import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.extension.columnar.enumerated.RasOffload
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
@@ -40,7 +41,7 @@ object OffloadIcebergScan {
c =>
val offload = Seq(OffloadIcebergScan())
HeuristicTransform.Simple(
- Validators.newValidator(c.glutenConf, offload),
+ Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
offload
)
}
@@ -50,7 +51,7 @@ object OffloadIcebergScan {
c =>
RasOffload.Rule(
RasOffload.from[BatchScanExec](OffloadIcebergScan()),
- Validators.newValidator(c.glutenConf),
+ Validators.newValidator(new GlutenConfig(c.sqlConf)),
Nil)
}
}
diff --git
a/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala
b/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala
index 262fa82f20..4538e41f37 100644
---
a/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala
+++
b/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution
+import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.extension.columnar.enumerated.RasOffload
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
@@ -40,7 +41,7 @@ object OffloadKafkaScan {
c =>
val offload = Seq(OffloadKafkaScan())
HeuristicTransform.Simple(
- Validators.newValidator(c.glutenConf, offload),
+ Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
offload
)
}
@@ -50,7 +51,7 @@ object OffloadKafkaScan {
c =>
RasOffload.Rule(
RasOffload.from[BatchScanExec](OffloadKafkaScan()),
- Validators.newValidator(c.glutenConf),
+ Validators.newValidator(new GlutenConfig(c.sqlConf)),
Nil)
}
}
diff --git a/gluten-substrait/pom.xml b/gluten-substrait/pom.xml
index b91ada96e4..2eecf49407 100644
--- a/gluten-substrait/pom.xml
+++ b/gluten-substrait/pom.xml
@@ -32,6 +32,23 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>${sparkshim.artifactId}</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>spark-sql-columnar-shims-common</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-ui</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- Prevent our dummy JAR from being included in Spark distributions or
uploaded to YARN -->
<dependency>
<groupId>org.apache.spark</groupId>
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
index 37be117105..e90df4ad06 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
@@ -16,16 +16,50 @@
*/
package org.apache.gluten.backendsapi
+import org.apache.gluten.GlutenBuildInfo
import org.apache.gluten.backend.Backend
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED
+import org.apache.gluten.events.GlutenBuildInfoEvent
+import org.apache.gluten.extension.columnar.LoggedRule
import org.apache.gluten.extension.injector.Injector
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.plugin.PluginContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.softaffinity.SoftAffinityListener
+import org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator
+import org.apache.spark.sql.execution.ui.{GlutenSQLAppStatusListener,
GlutenUIUtils}
+import org.apache.spark.sql.internal.SparkConfigUtil._
+import org.apache.spark.sql.internal.SQLConf
+
+import java.util.Collections
+
+import scala.collection.mutable
+
+trait SubstraitBackend extends Backend with Logging {
+ import SubstraitBackend._
+ private var _sc: Option[SparkContext] = None
-trait SubstraitBackend extends Backend {
final override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit
= {
+ _sc = Some(sc)
+ val conf = pc.conf()
+
+ // Register Gluten listeners
+ GlutenSQLAppStatusListener.register(sc)
+ if (conf.get(GLUTEN_SOFT_AFFINITY_ENABLED)) {
+ SoftAffinityListener.register(sc)
+ }
+
+ postBuildInfoEvent(sc)
+
+ setPredefinedConfigs(conf)
+
+ Collections.emptyMap()
+
listenerApi().onDriverStart(sc, pc)
}
+
final override def onDriverShutdown(): Unit = {
listenerApi().onDriverShutdown()
}
@@ -36,8 +70,21 @@ trait SubstraitBackend extends Backend {
listenerApi().onExecutorShutdown()
}
final override def injectRules(injector: Injector): Unit = {
+ injector.gluten.legacy.injectRuleWrapper(r => new LoggedRule(r))
+ injector.gluten.ras.injectRuleWrapper(r => new LoggedRule(r))
ruleApi().injectRules(injector)
}
+
+ final override def registerMetrics(appId: String, pluginContext:
PluginContext): Unit = {
+ _sc.foreach {
+ sc =>
+ if (GlutenUIUtils.uiEnabled(sc)) {
+ GlutenUIUtils.attachUI(sc)
+ logInfo("Gluten SQL Tab has been attached.")
+ }
+ }
+ }
+
def iteratorApi(): IteratorApi
def sparkPlanExecApi(): SparkPlanExecApi
def transformerApi(): TransformerApi
@@ -47,3 +94,62 @@ trait SubstraitBackend extends Backend {
def ruleApi(): RuleApi
def settings(): BackendSettingsApi
}
+
+object SubstraitBackend extends Logging {
+
+ /** Since https://github.com/apache/incubator-gluten/pull/2247. */
+ private def postBuildInfoEvent(sc: SparkContext): Unit = {
+ // export gluten version to property to spark
+ System.setProperty("gluten.version", GlutenBuildInfo.VERSION)
+
+ val glutenBuildInfo = new mutable.LinkedHashMap[String, String]()
+
+ glutenBuildInfo.put("Gluten Version", GlutenBuildInfo.VERSION)
+ glutenBuildInfo.put("GCC Version", GlutenBuildInfo.GCC_VERSION)
+ glutenBuildInfo.put("Java Version", GlutenBuildInfo.JAVA_COMPILE_VERSION)
+ glutenBuildInfo.put("Scala Version", GlutenBuildInfo.SCALA_COMPILE_VERSION)
+ glutenBuildInfo.put("Spark Version", GlutenBuildInfo.SPARK_COMPILE_VERSION)
+ glutenBuildInfo.put("Hadoop Version",
GlutenBuildInfo.HADOOP_COMPILE_VERSION)
+ glutenBuildInfo.put("Gluten Branch", GlutenBuildInfo.BRANCH)
+ glutenBuildInfo.put("Gluten Revision", GlutenBuildInfo.REVISION)
+ glutenBuildInfo.put("Gluten Revision Time", GlutenBuildInfo.REVISION_TIME)
+ glutenBuildInfo.put("Gluten Build Time", GlutenBuildInfo.BUILD_DATE)
+ glutenBuildInfo.put("Gluten Repo URL", GlutenBuildInfo.REPO_URL)
+
+ val loggingInfo = glutenBuildInfo
+ .map { case (name, value) => s"$name: $value" }
+ .mkString(
+ "Gluten build
info:\n==============================================================\n",
+ "\n",
+ "\n=============================================================="
+ )
+ logInfo(loggingInfo)
+ if (GlutenUIUtils.uiEnabled(sc)) {
+ val event = GlutenBuildInfoEvent(glutenBuildInfo.toMap)
+ GlutenUIUtils.postEvent(sc, event)
+ }
+ }
+
+ private def setPredefinedConfigs(conf: SparkConf): Unit = {
+ // adaptive custom cost evaluator class
+ val enableGlutenCostEvaluator =
conf.get(GlutenConfig.COST_EVALUATOR_ENABLED)
+ if (enableGlutenCostEvaluator) {
+ conf.set(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS,
classOf[GlutenCostEvaluator].getName)
+ }
+
+ // Disable vanilla columnar readers, to prevent columnar-to-columnar
conversions.
+ // FIXME: Do we still need this trick since
+ // https://github.com/apache/incubator-gluten/pull/1931 was merged?
+ if (!conf.get(GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED)) {
+ // FIXME Hongze 22/12/06
+ // BatchScan.scala in shim was not always loaded by class loader.
+ // The file should be removed and the "ClassCastException" issue caused
by
+ // spark.sql.<format>.enableVectorizedReader=true should be fixed in
another way.
+ // Before the issue is fixed we force the use of vanilla row reader by
using
+ // the following statement.
+ conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED, false)
+ conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED, false)
+ conf.set(SQLConf.CACHE_VECTORIZED_READER_ENABLED, false)
+ }
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
similarity index 86%
rename from
gluten-core/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
rename to
gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 194e5da239..15f6ecda94 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -16,9 +16,8 @@
*/
package org.apache.gluten.config
-import org.apache.spark.internal.Logging
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
-import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf,
SQLConfProvider}
+import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf}
import com.google.common.collect.ImmutableList
import org.apache.hadoop.security.UserGroupInformation
@@ -33,20 +32,11 @@ case class GlutenNumaBindingInfo(
totalCoreRange: Array[String] = null,
numCoresPerExecutor: Int = -1) {}
-class GlutenConfig(conf: SQLConf) extends Logging {
+class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) {
import GlutenConfig._
- private lazy val configProvider = new SQLConfProvider(conf)
-
- def getConf[T](entry: ConfigEntry[T]): T = {
- require(ConfigEntry.containsEntry(entry), s"$entry is not registered")
- entry.readFrom(configProvider)
- }
-
def enableAnsiMode: Boolean = conf.ansiEnabled
- def enableGluten: Boolean = getConf(GLUTEN_ENABLED)
-
def glutenUiEnabled: Boolean = getConf(GLUTEN_UI_ENABLED)
// FIXME the option currently controls both JVM and native validation
against a Substrait plan.
@@ -247,26 +237,6 @@ class GlutenConfig(conf: SQLConf) extends Logging {
}
}
- def memoryIsolation: Boolean = getConf(COLUMNAR_MEMORY_ISOLATION)
-
- def memoryUntracked: Boolean = getConf(COLUMNAR_MEMORY_UNTRACKED)
-
- def offHeapMemorySize: Long = getConf(COLUMNAR_OFFHEAP_SIZE_IN_BYTES)
-
- def taskOffHeapMemorySize: Long =
getConf(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES)
-
- def memoryOverAcquiredRatio: Double =
getConf(COLUMNAR_MEMORY_OVER_ACQUIRED_RATIO)
-
- def memoryReservationBlockSize: Long =
getConf(COLUMNAR_MEMORY_RESERVATION_BLOCK_SIZE)
-
- def conservativeTaskOffHeapMemorySize: Long =
- getConf(COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES)
-
- // Options used by RAS.
- def enableRas: Boolean = getConf(RAS_ENABLED)
-
- def rasCostModel: String = getConf(RAS_COST_MODEL)
-
def cartesianProductTransformerEnabled: Boolean =
getConf(CARTESIAN_PRODUCT_TRANSFORMER_ENABLED)
@@ -342,12 +312,6 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def enableCastAvgAggregateFunction: Boolean =
getConf(COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED)
- def dynamicOffHeapSizingEnabled: Boolean =
- getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)
-
- def dynamicOffHeapSizingMemoryFraction: Double =
- getConf(DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION)
-
def enableHiveFileFormatWriter: Boolean =
getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
def enableCelebornFallback: Boolean = getConf(CELEBORN_FALLBACK_ENABLED)
@@ -441,8 +405,6 @@ object GlutenConfig {
val SPARK_ONHEAP_SIZE_KEY = "spark.executor.memory"
val SPARK_OVERHEAD_SIZE_KEY = "spark.executor.memoryOverhead"
val SPARK_OVERHEAD_FACTOR_KEY = "spark.executor.memoryOverheadFactor"
- val SPARK_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
- val SPARK_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
val SPARK_REDACTION_REGEX = "spark.redaction.regex"
val SPARK_SHUFFLE_FILE_BUFFER = "spark.shuffle.file.buffer"
val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE =
"spark.unsafe.sorter.spill.reader.buffer.size"
@@ -471,7 +433,7 @@ object GlutenConfig {
val keys = Set(
DEBUG_ENABLED.key,
BENCHMARK_SAVE_DIR.key,
- COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key,
+ GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key,
COLUMNAR_MAX_BATCH_SIZE.key,
SHUFFLE_WRITER_BUFFER_SIZE.key,
SQLConf.LEGACY_SIZE_OF_NULL.key,
@@ -598,8 +560,8 @@ object GlutenConfig {
(
"spark.gluten.sql.columnar.backend.velox.IOThreads",
conf.getOrElse(
- NUM_TASK_SLOTS_PER_EXECUTOR.key,
- NUM_TASK_SLOTS_PER_EXECUTOR.defaultValueString)),
+ GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR.key,
+ GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR.defaultValueString)),
(COLUMNAR_SHUFFLE_CODEC.key, ""),
(COLUMNAR_SHUFFLE_CODEC_BACKEND.key, ""),
(DEBUG_CUDF.key, DEBUG_CUDF.defaultValueString),
@@ -622,11 +584,10 @@ object GlutenConfig {
// datasource config
SPARK_SQL_PARQUET_COMPRESSION_CODEC,
// datasource config end
-
- COLUMNAR_OVERHEAD_SIZE_IN_BYTES.key,
- COLUMNAR_OFFHEAP_SIZE_IN_BYTES.key,
- COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key,
- SPARK_OFFHEAP_ENABLED,
+ GlutenCoreConfig.COLUMNAR_OVERHEAD_SIZE_IN_BYTES.key,
+ GlutenCoreConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES.key,
+ GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key,
+ GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY,
DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key,
SPARK_REDACTION_REGEX,
LEGACY_TIME_PARSER_POLICY.key,
@@ -659,13 +620,11 @@ object GlutenConfig {
nativeConfMap
}
- val GLUTEN_ENABLED =
- buildConf("spark.gluten.enabled")
- .internal()
- .doc("Whether to enable gluten. Default value is true. Just an
experimental property." +
- " Recommend to enable/disable Gluten through the setting for
spark.plugins.")
- .booleanConf
- .createWithDefault(true)
+ val GLUTEN_ENABLED = GlutenCoreConfig.GLUTEN_ENABLED
+
+ val RAS_ENABLED = GlutenCoreConfig.RAS_ENABLED
+
+ val RAS_COST_MODEL = GlutenCoreConfig.RAS_COST_MODEL
val GLUTEN_UI_ENABLED = buildStaticConf("spark.gluten.ui.enabled")
.doc(
@@ -1180,77 +1139,6 @@ object GlutenConfig {
.stringConf
.createOptional
- val NUM_TASK_SLOTS_PER_EXECUTOR =
- buildConf("spark.gluten.numTaskSlotsPerExecutor")
- .internal()
- .doc(
- "Must provide default value since non-execution operations " +
- "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate
configurations using " +
- "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
- .intConf
- .createWithDefaultString("-1")
-
- val COLUMNAR_OVERHEAD_SIZE_IN_BYTES =
- buildConf("spark.gluten.memoryOverhead.size.in.bytes")
- .internal()
- .doc(
- "Must provide default value since non-execution operations " +
- "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate
configurations using " +
- "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
- .bytesConf(ByteUnit.BYTE)
- .createWithDefaultString("0")
-
- val COLUMNAR_OFFHEAP_SIZE_IN_BYTES =
- buildConf("spark.gluten.memory.offHeap.size.in.bytes")
- .internal()
- .doc(
- "Must provide default value since non-execution operations " +
- "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate
configurations using " +
- "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
- .bytesConf(ByteUnit.BYTE)
- .createWithDefaultString("0")
-
- val COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES =
- buildConf("spark.gluten.memory.task.offHeap.size.in.bytes")
- .internal()
- .doc(
- "Must provide default value since non-execution operations " +
- "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate
configurations using " +
- "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
- .bytesConf(ByteUnit.BYTE)
- .createWithDefaultString("0")
-
- val COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES =
- buildConf("spark.gluten.memory.conservative.task.offHeap.size.in.bytes")
- .internal()
- .doc(
- "Must provide default value since non-execution operations " +
- "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate
configurations using " +
- "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
- .bytesConf(ByteUnit.BYTE)
- .createWithDefaultString("0")
-
- val COLUMNAR_MEMORY_ISOLATION =
- buildConf("spark.gluten.memory.isolation")
- .internal()
- .doc("Enable isolated memory mode. If true, Gluten controls the maximum
off-heap memory " +
- "can be used by each task to X, X = executor memory / max task slots.
It's recommended " +
- "to set true if Gluten serves concurrent queries within a single
session, since not all " +
- "memory Gluten allocated is guaranteed to be spillable. In the case,
the feature should " +
- "be enabled to avoid OOM.")
- .booleanConf
- .createWithDefault(false)
-
- val COLUMNAR_MEMORY_UNTRACKED =
- buildStaticConf("spark.gluten.memory.untracked")
- .internal()
- .doc(
- "When enabled, turn all native memory allocations in Gluten into
untracked. Spark " +
- "will be unaware of the allocations so will not trigger
spill-to-disk operations " +
- "or Spark OOMs. Should only be used for testing or other
non-production use cases.")
- .booleanConf
- .createWithDefault(false)
-
val COLUMNAR_MEMORY_BACKTRACE_ALLOCATION =
buildConf("spark.gluten.memory.backtrace.allocation")
.internal()
@@ -1259,45 +1147,6 @@ object GlutenConfig {
.booleanConf
.createWithDefault(false)
- val COLUMNAR_MEMORY_OVER_ACQUIRED_RATIO =
- buildConf("spark.gluten.memory.overAcquiredMemoryRatio")
- .internal()
- .doc("If larger than 0, Velox backend will try over-acquire this ratio
of the total " +
- "allocated memory as backup to avoid OOM.")
- .doubleConf
- .checkValue(d => d >= 0.0d, "Over-acquired ratio should be larger than
or equals 0")
- .createWithDefault(0.3d)
-
- val COLUMNAR_MEMORY_RESERVATION_BLOCK_SIZE =
- buildConf("spark.gluten.memory.reservationBlockSize")
- .internal()
- .doc("Block size of native reservation listener reserve memory from
Spark.")
- .bytesConf(ByteUnit.BYTE)
- .createWithDefaultString("8MB")
-
- // Options used by RAS.
- val RAS_ENABLED =
- buildConf("spark.gluten.ras.enabled")
- .doc(
- "Enables RAS (relational algebra selector) during physical " +
- "planning to generate more efficient query plan. Note, this feature
doesn't bring " +
- "performance profits by default. Try exploring option
`spark.gluten.ras.costModel` " +
- "for advanced usage.")
- .booleanConf
- .createWithDefault(false)
-
- // FIXME: This option is no longer only used by RAS. Should change key to
- // `spark.gluten.costModel` or something similar.
- val RAS_COST_MODEL =
- buildConf("spark.gluten.ras.costModel")
- .doc(
- "The class name of user-defined cost model that will be used by
Gluten's transition " +
- "planner as well as by RAS. If not specified, a legacy built-in cost
model will be " +
- "used. The legacy cost model helps RAS planner exhaustively offload
computations, and " +
- "helps transition planner choose columnar-to-columnar transition
over others.")
- .stringConf
- .createWithDefaultString("legacy")
-
val TRANSFORM_PLAN_LOG_LEVEL =
buildConf("spark.gluten.sql.transform.logLevel")
.internal()
@@ -1634,34 +1483,6 @@ object GlutenConfig {
.booleanConf
.createWithDefault(true)
- val DYNAMIC_OFFHEAP_SIZING_ENABLED =
- buildStaticConf("spark.gluten.memory.dynamic.offHeap.sizing.enabled")
- .internal()
- .doc(
- "Experimental: When set to true, the offheap config
(spark.memory.offHeap.size) will " +
- "be ignored and instead we will consider onheap and offheap memory
in combination, " +
- "both counting towards the executor memory config
(spark.executor.memory). We will " +
- "make use of JVM APIs to determine how much onheap memory is use,
alongside tracking " +
- "offheap allocations made by Gluten. We will then proceed to
enforcing a total memory " +
- "quota, calculated by the sum of what memory is committed and in use
in the Java " +
- "heap. Since the calculation of the total quota happens as offheap
allocation happens " +
- "and not as JVM heap memory is allocated, it is possible that we can
oversubscribe " +
- "memory. Additionally, note that this change is experimental and may
have performance " +
- "implications.")
- .booleanConf
- .createWithDefault(false)
-
- val DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION =
-
buildStaticConf("spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction")
- .internal()
- .doc(
- "Experimental: Determines the memory fraction used to determine the
total " +
- "memory available for offheap and onheap allocations when the
dynamic offheap " +
- "sizing feature is enabled. The default is set to match
spark.executor.memoryFraction.")
- .doubleConf
- .checkValue(v => v >= 0 && v <= 1, "offheap sizing memory fraction must
between [0, 1]")
- .createWithDefault(0.6)
-
val CELEBORN_FALLBACK_ENABLED =
buildStaticConf("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled")
.internal()
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/config/ReservedKeys.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/ReservedKeys.scala
similarity index 100%
rename from
gluten-core/src/main/scala/org/apache/gluten/config/ReservedKeys.scala
rename to
gluten-substrait/src/main/scala/org/apache/gluten/config/ReservedKeys.scala
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/LoggedRule.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/LoggedRule.scala
new file mode 100644
index 0000000000..8aa68c4ad1
--- /dev/null
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/LoggedRule.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.logging.LogLevelUtil
+import org.apache.gluten.metrics.GlutenTimeMetric
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.sideBySide
+import org.apache.spark.sql.execution.SparkPlan
+
+/** Since https://github.com/apache/incubator-gluten/pull/7606. */
+class LoggedRule(delegate: Rule[SparkPlan]) extends Rule[SparkPlan] with
Logging with LogLevelUtil {
+
+ override val ruleName: String = delegate.ruleName
+
+ private def message(oldPlan: SparkPlan, newPlan: SparkPlan, millisTime:
Long): String =
+ if (!oldPlan.fastEquals(newPlan)) {
+ s"""
+ |=== Applying Rule $ruleName took $millisTime ms ===
+ |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
+ """.stripMargin
+ } else {
+ s"Rule $ruleName has no effect, took $millisTime ms."
+ }
+
+ override def apply(plan: SparkPlan): SparkPlan = {
+ val (out, millisTime) =
GlutenTimeMetric.recordMillisTime(delegate.apply(plan))
+ logOnLevel(GlutenConfig.get.transformPlanLogLevel, message(plan, out,
millisTime))
+ out
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
similarity index 100%
rename from
gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
rename to
gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/ConsistentHashSoftAffinityStrategy.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/ConsistentHashSoftAffinityStrategy.scala
similarity index 100%
rename from
gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/ConsistentHashSoftAffinityStrategy.scala
rename to
gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/ConsistentHashSoftAffinityStrategy.scala
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
similarity index 100%
rename from
gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
rename to
gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
diff --git
a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
b/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
similarity index 100%
rename from
gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
rename to
gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
diff --git
a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinityListener.scala
b/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinityListener.scala
similarity index 100%
rename from
gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinityListener.scala
rename to
gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinityListener.scala
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ui/GlutenUIUtils.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ui/GlutenUIUtils.scala
similarity index 100%
rename from
gluten-core/src/main/scala/org/apache/spark/sql/execution/ui/GlutenUIUtils.scala
rename to
gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ui/GlutenUIUtils.scala
diff --git
a/gluten-core/src/test/scala/org/apache/gluten/config/SparkConfigUtilSuite.scala
b/gluten-substrait/src/test/scala/org/apache/gluten/config/SparkConfigUtilSuite.scala
similarity index 100%
rename from
gluten-core/src/test/scala/org/apache/gluten/config/SparkConfigUtilSuite.scala
rename to
gluten-substrait/src/test/scala/org/apache/gluten/config/SparkConfigUtilSuite.scala
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 0eacdcedd2..c0e3d1a349 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -179,7 +179,8 @@ private object FallbackStrategiesSuite {
c => RemoveTopmostColumnarToRow(c.session, c.caller.isAqe()),
_ => ColumnarCollapseTransformStages(GlutenConfig.get)
),
- List(_ => RemoveFallbackTagRule())
+ List(_ => RemoveFallbackTagRule()),
+ Nil
)
}
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 4ca6ce7b04..6f3894d23b 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -189,7 +189,8 @@ private object FallbackStrategiesSuite {
c => RemoveTopmostColumnarToRow(c.session, c.caller.isAqe()),
_ => ColumnarCollapseTransformStages(GlutenConfig.get)
),
- List(_ => RemoveFallbackTagRule())
+ List(_ => RemoveFallbackTagRule()),
+ Nil
)
}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 4ca6ce7b04..6f3894d23b 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -189,7 +189,8 @@ private object FallbackStrategiesSuite {
c => RemoveTopmostColumnarToRow(c.session, c.caller.isAqe()),
_ => ColumnarCollapseTransformStages(GlutenConfig.get)
),
- List(_ => RemoveFallbackTagRule())
+ List(_ => RemoveFallbackTagRule()),
+ Nil
)
}
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 31b622e51c..de2436a39e 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -190,7 +190,8 @@ private object FallbackStrategiesSuite {
c => RemoveTopmostColumnarToRow(c.session, c.caller.isAqe()),
_ => ColumnarCollapseTransformStages(GlutenConfig.get)
),
- List(_ => RemoveFallbackTagRule())
+ List(_ => RemoveFallbackTagRule()),
+ Nil
)
}
diff --git
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index c777873758..942c74e41c 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.sql.shims
import org.apache.gluten.GlutenBuildInfo.SPARK_COMPILE_VERSION
import org.apache.gluten.expression.Sig
-import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.scheduler.TaskInfo
@@ -37,8 +37,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
-import org.apache.spark.sql.execution.{CollectLimitExec, FileSourceScanExec,
GlobalLimitExec, SparkPlan, TakeOrderedAndProjectExec}
-import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec,
DataSourceV2ScanExecBase}
@@ -48,13 +47,13 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DecimalType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
-import org.apache.spark.util.SparkVersionUtil
+import org.apache.spark.util.SparkShimVersionUtil
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
import org.apache.parquet.schema.MessageType
-import java.util.{Map => JMap, Properties}
+import java.util.{Map => JMap}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
@@ -69,10 +68,10 @@ case class SparkShimDescriptor(major: Int, minor: Int,
patch: Int) {
object SparkShimDescriptor {
def apply(version: String): SparkShimDescriptor = {
- SparkVersionUtil.majorMinorPatchVersion(version) match {
+ SparkShimVersionUtil.sparkMajorMinorPatchVersion(version) match {
case Some((major, minor, patch)) => SparkShimDescriptor(major, minor,
patch)
case None =>
- val (major, minor) = SparkVersionUtil.majorMinorVersion(version)
+ val (major, minor) =
SparkShimVersionUtil.sparkMajorMinorVersion(version)
SparkShimDescriptor(major, minor, 0)
}
}
@@ -175,8 +174,6 @@ trait SparkShims {
def enableNativeWriteFilesByDefault(): Boolean = false
- def createTestTaskContext(properties: Properties): TaskContext
-
def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T]
= {
// Since Spark 3.4, the `sc.broadcast` has been optimized to use
`sc.broadcastInternal`.
// More details see SPARK-39983.
@@ -258,16 +255,12 @@ trait SparkShims {
def extractExpressionTimestampAddUnit(timestampAdd: Expression):
Option[Seq[String]] =
Option.empty
- def supportsRowBased(plan: SparkPlan): Boolean = !plan.supportsColumnar
-
def withTryEvalMode(expr: Expression): Boolean = false
def withAnsiEvalMode(expr: Expression): Boolean = false
def dateTimestampFormatInReadIsDefaultValue(csvOptions: CSVOptions,
timeZone: String): Boolean
- def isPlannedV1Write(write: DataWritingCommandExec): Boolean = false
-
def createParquetFilters(
conf: SQLConf,
schema: MessageType,
diff --git
a/gluten-core/src/main/scala/org/apache/spark/task/SparkTaskUtil.scala
b/shims/common/src/main/scala/org/apache/spark/util/SparkShimVersionUtil.scala
similarity index 66%
rename from gluten-core/src/main/scala/org/apache/spark/task/SparkTaskUtil.scala
rename to
shims/common/src/main/scala/org/apache/spark/util/SparkShimVersionUtil.scala
index ee6d357d55..ec3baef921 100644
--- a/gluten-core/src/main/scala/org/apache/spark/task/SparkTaskUtil.scala
+++
b/shims/common/src/main/scala/org/apache/spark/util/SparkShimVersionUtil.scala
@@ -14,21 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.task
+package org.apache.spark.util
-import org.apache.spark.TaskContext
-import org.apache.spark.memory.TaskMemoryManager
-
-object SparkTaskUtil {
- def setTaskContext(taskContext: TaskContext): Unit = {
- TaskContext.setTaskContext(taskContext)
- }
-
- def unsetTaskContext(): Unit = {
- TaskContext.unset()
+object SparkShimVersionUtil {
+ def sparkMajorMinorVersion(version: String =
org.apache.spark.SPARK_VERSION): (Int, Int) = {
+ VersionUtils.majorMinorVersion(version)
}
- def getTaskMemoryManager(taskContext: TaskContext): TaskMemoryManager = {
- taskContext.taskMemoryManager()
+ def sparkMajorMinorPatchVersion(version: String): Option[(Int, Int, Int)] = {
+ VersionUtils.majorMinorPatchVersion(version)
}
}
diff --git
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index 1a71371c71..5e690fd09e 100644
---
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.expression.{ExpressionNames, Sig}
import org.apache.gluten.sql.shims.SparkShims
import org.apache.gluten.utils.ExceptionUtils
-import org.apache.spark.{ShuffleUtils, SparkContext, TaskContext,
TaskContextUtils}
+import org.apache.spark.{ShuffleUtils, SparkContext}
import org.apache.spark.scheduler.TaskInfo
import org.apache.spark.shuffle.ShuffleHandle
import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -58,7 +58,7 @@ import org.apache.parquet.crypto.ParquetCryptoRuntimeException
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.MessageType
-import java.util.{HashMap => JHashMap, Map => JMap, Properties}
+import java.util.{HashMap => JHashMap, Map => JMap}
class Spark32Shims extends SparkShims {
@@ -162,10 +162,6 @@ class Spark32Shims extends SparkShims {
List(session => GlutenFormatFactory.getExtendedColumnarPostRule(session))
}
- override def createTestTaskContext(properties: Properties): TaskContext = {
- TaskContextUtils.createTestTaskContext(properties)
- }
-
def setJobDescriptionOrTagForBroadcastExchange(
sc: SparkContext,
broadcastExchange: BroadcastExchangeLike): Unit = {
diff --git
a/shims/spark32/src/main/scala/org/apache/spark/TaskContextUtils.scala
b/shims/spark32/src/main/scala/org/apache/spark/TaskContextUtils.scala
deleted file mode 100644
index 8ff7717fb9..0000000000
--- a/shims/spark32/src/main/scala/org/apache/spark/TaskContextUtils.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark
-
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
-import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.storage.BlockManagerUtils
-
-import java.util.Properties
-
-import scala.collection.JavaConverters._
-
-object TaskContextUtils {
- def createTestTaskContext(properties: Properties): TaskContext = {
- val conf = new SparkConf()
- conf.setAll(properties.asScala)
- val memoryManager = UnifiedMemoryManager(conf, 1)
- BlockManagerUtils.setTestMemoryStore(conf, memoryManager, isDriver = false)
- new TaskContextImpl(
- -1,
- -1,
- -1,
- -1L,
- -1,
- new TaskMemoryManager(memoryManager, -1L),
- properties,
- MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE", conf),
- TaskMetrics.empty,
- Map.empty
- )
- }
-}
diff --git
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index e8a827fe79..9be098288b 100644
---
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -35,9 +35,8 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution,
Distribution}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
TimestampFormatter}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
-import org.apache.spark.sql.catalyst.util.TimestampFormatter
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.{FileSourceScanExec,
PartitionedFileUtil, SparkPlan}
@@ -61,7 +60,7 @@ import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap, Properties}
+import java.util.{HashMap => JHashMap, Map => JMap}
class Spark33Shims extends SparkShims {
override def getDistribution(
@@ -259,10 +258,6 @@ class Spark33Shims extends SparkShims {
List(session => GlutenFormatFactory.getExtendedColumnarPostRule(session))
}
- override def createTestTaskContext(properties: Properties): TaskContext = {
- TaskContextUtils.createTestTaskContext(properties)
- }
-
def setJobDescriptionOrTagForBroadcastExchange(
sc: SparkContext,
broadcastExchange: BroadcastExchangeLike): Unit = {
@@ -349,8 +344,6 @@ class Spark33Shims extends SparkShims {
}
}
- override def supportsRowBased(plan: SparkPlan): Boolean =
plan.supportsRowBased
-
override def dateTimestampFormatInReadIsDefaultValue(
csvOptions: CSVOptions,
timeZone: String): Boolean = {
diff --git
a/shims/spark33/src/main/scala/org/apache/spark/TaskContextUtils.scala
b/shims/spark33/src/main/scala/org/apache/spark/TaskContextUtils.scala
deleted file mode 100644
index 058467888f..0000000000
--- a/shims/spark33/src/main/scala/org/apache/spark/TaskContextUtils.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark
-
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
-import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.storage.BlockManagerUtils
-
-import java.util.Properties
-
-import scala.collection.JavaConverters._
-
-object TaskContextUtils {
- def createTestTaskContext(properties: Properties): TaskContext = {
- val conf = new SparkConf()
- conf.setAll(properties.asScala)
- val memoryManager = UnifiedMemoryManager(conf, 1)
- BlockManagerUtils.setTestMemoryStore(conf, memoryManager, isDriver = false)
- new TaskContextImpl(
- -1,
- -1,
- -1,
- -1L,
- -1,
- new TaskMemoryManager(memoryManager, -1L),
- properties,
- MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE", conf),
- TaskMetrics.empty,
- 1,
- Map.empty
- )
- }
-}
diff --git
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index b1afb8045c..508e256106 100644
---
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -43,7 +43,6 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition,
Scan}
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec,
DataSourceV2ScanExecBase}
@@ -63,7 +62,7 @@ import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap, Properties}
+import java.util.{HashMap => JHashMap, Map => JMap}
import scala.reflect.ClassTag
@@ -318,10 +317,6 @@ class Spark34Shims extends SparkShims {
override def enableNativeWriteFilesByDefault(): Boolean = true
- override def createTestTaskContext(properties: Properties): TaskContext = {
- TaskContextUtils.createTestTaskContext(properties)
- }
-
override def broadcastInternal[T: ClassTag](sc: SparkContext, value: T):
Broadcast[T] = {
SparkContextUtils.broadcastInternal(sc, value)
}
@@ -546,8 +541,6 @@ class Spark34Shims extends SparkShims {
}
}
- override def supportsRowBased(plan: SparkPlan): Boolean =
plan.supportsRowBased
-
override def withTryEvalMode(expr: Expression): Boolean = {
expr match {
case a: Add => a.evalMode == EvalMode.TRY
@@ -577,10 +570,6 @@ class Spark34Shims extends SparkShims {
csvOptions.timestampNTZFormatInRead == default.timestampNTZFormatInRead
}
- override def isPlannedV1Write(write: DataWritingCommandExec): Boolean = {
- write.cmd.isInstanceOf[V1WriteCommand] && SQLConf.get.plannedWriteEnabled
- }
-
override def createParquetFilters(
conf: SQLConf,
schema: MessageType,
diff --git
a/shims/spark34/src/main/scala/org/apache/spark/TaskContextUtils.scala
b/shims/spark34/src/main/scala/org/apache/spark/TaskContextUtils.scala
deleted file mode 100644
index 267b177920..0000000000
--- a/shims/spark34/src/main/scala/org/apache/spark/TaskContextUtils.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark
-
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
-import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.storage.BlockManagerUtils
-
-import java.util.Properties
-
-import scala.collection.JavaConverters._
-
-object TaskContextUtils {
- def createTestTaskContext(properties: Properties): TaskContext = {
- val conf = new SparkConf()
- conf.setAll(properties.asScala)
- val memoryManager = UnifiedMemoryManager(conf, 1)
- BlockManagerUtils.setTestMemoryStore(conf, memoryManager, isDriver = false)
- new TaskContextImpl(
- -1,
- -1,
- -1,
- -1L,
- -1,
- -1,
- new TaskMemoryManager(memoryManager, -1L),
- properties,
- MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE", conf),
- TaskMetrics.empty,
- 1,
- Map.empty
- )
- }
-}
diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index db9601235c..7ee55675ba 100644
--- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -44,7 +44,6 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan}
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetFilters}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanExecBase}
@@ -66,7 +65,7 @@ import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
-import java.util.{HashMap => JHashMap, Map => JMap, Properties}
+import java.util.{HashMap => JHashMap, Map => JMap}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
@@ -352,10 +351,6 @@ class Spark35Shims extends SparkShims {
override def enableNativeWriteFilesByDefault(): Boolean = true
- override def createTestTaskContext(properties: Properties): TaskContext = {
- TaskContextUtils.createTestTaskContext(properties)
- }
-
   override def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
SparkContextUtils.broadcastInternal(sc, value)
}
@@ -579,8 +574,6 @@ class Spark35Shims extends SparkShims {
}
}
-  override def supportsRowBased(plan: SparkPlan): Boolean = plan.supportsRowBased
-
override def withTryEvalMode(expr: Expression): Boolean = {
expr match {
case a: Add => a.evalMode == EvalMode.TRY
@@ -610,10 +603,6 @@ class Spark35Shims extends SparkShims {
csvOptions.timestampNTZFormatInRead == default.timestampNTZFormatInRead
}
- override def isPlannedV1Write(write: DataWritingCommandExec): Boolean = {
- write.cmd.isInstanceOf[V1WriteCommand] && SQLConf.get.plannedWriteEnabled
- }
-
override def createParquetFilters(
conf: SQLConf,
schema: MessageType,
diff --git a/shims/spark35/src/main/scala/org/apache/spark/TaskContextUtils.scala b/shims/spark35/src/main/scala/org/apache/spark/TaskContextUtils.scala
deleted file mode 100644
index 267b177920..0000000000
--- a/shims/spark35/src/main/scala/org/apache/spark/TaskContextUtils.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark
-
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
-import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.storage.BlockManagerUtils
-
-import java.util.Properties
-
-import scala.collection.JavaConverters._
-
-object TaskContextUtils {
- def createTestTaskContext(properties: Properties): TaskContext = {
- val conf = new SparkConf()
- conf.setAll(properties.asScala)
- val memoryManager = UnifiedMemoryManager(conf, 1)
- BlockManagerUtils.setTestMemoryStore(conf, memoryManager, isDriver = false)
- new TaskContextImpl(
- -1,
- -1,
- -1,
- -1L,
- -1,
- -1,
- new TaskMemoryManager(memoryManager, -1L),
- properties,
- MetricsSystem.createMetricsSystem("GLUTEN_UNSAFE", conf),
- TaskMetrics.empty,
- 1,
- Map.empty
- )
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]