This is an automated email from the ASF dual-hosted git repository.
yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f0c643d728 [Gluten-7402] Code cleanup for GlutenPlugin (#7403)
f0c643d728 is described below
commit f0c643d7281d7c451c375a9fa87348fd9c86116a
Author: Jiaan Geng <[email protected]>
AuthorDate: Wed Oct 9 22:02:28 2024 +0800
[Gluten-7402] Code cleanup for GlutenPlugin (#7403)
---
.../scala/org/apache/gluten/GlutenPlugin.scala | 128 +++++++++------------
1 file changed, 55 insertions(+), 73 deletions(-)
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 6902d9f95b..d5f5fd2e21 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -17,7 +17,7 @@
package org.apache.gluten
import org.apache.gluten.GlutenBuildInfo._
-import org.apache.gluten.GlutenConfig.GLUTEN_DEFAULT_SESSION_TIMEZONE_KEY
+import org.apache.gluten.GlutenConfig._
import org.apache.gluten.backend.Backend
import org.apache.gluten.events.GlutenBuildInfoEvent
import org.apache.gluten.exception.GlutenException
@@ -55,27 +55,26 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
override def init(sc: SparkContext, pluginContext: PluginContext):
util.Map[String, String] = {
_sc = Some(sc)
+ val conf = pluginContext.conf()
+
+ // Register Gluten listeners
GlutenSQLAppStatusListener.register(sc)
- postBuildInfoEvent(sc)
+ if (conf.getBoolean(GLUTEN_SOFT_AFFINITY_ENABLED,
GLUTEN_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE)) {
+ SoftAffinityListener.register(sc)
+ }
- val conf = pluginContext.conf()
+ postBuildInfoEvent(sc)
setPredefinedConfigs(sc, conf)
- // Initialize Backends API.
+
+ // Initialize Backend.
Backend.get().onDriverStart(sc, pluginContext)
- if (
- sc.getConf.getBoolean(
- GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED,
- GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE)
- ) {
- SoftAffinityListener.register(sc)
- }
Collections.emptyMap()
}
override def registerMetrics(appId: String, pluginContext: PluginContext):
Unit = {
- if (pluginContext.conf().getBoolean(GlutenConfig.GLUTEN_UI_ENABLED, true))
{
+ if (pluginContext.conf().getBoolean(GLUTEN_UI_ENABLED, true)) {
_sc.foreach {
sc =>
GlutenEventUtils.attachUI(sc)
@@ -113,10 +112,7 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
val infoMap = glutenBuildInfo.toMap
val loggingInfo = infoMap.toSeq
.sortBy(_._1)
- .map {
- case (name, value) =>
- s"$name: $value"
- }
+ .map { case (name, value) => s"$name: $value" }
.mkString(
"Gluten build
info:\n==============================================================\n",
"\n",
@@ -152,8 +148,8 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
minOffHeapSize))
) {
throw new GlutenException(
- s"Must set '${GlutenConfig.SPARK_OFFHEAP_ENABLED}' to true " +
- s"and set '${GlutenConfig.SPARK_OFFHEAP_SIZE_KEY}' to be greater
than $minOffHeapSize")
+ s"Must set '$SPARK_OFFHEAP_ENABLED' to true " +
+ s"and set '$SPARK_OFFHEAP_SIZE_KEY' to be greater than
$minOffHeapSize")
}
// Session's local time zone must be set. If not explicitly set by user,
its default
@@ -162,71 +158,59 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
// Task slots.
val taskSlots = SparkResourceUtil.getTaskSlots(conf)
- conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY,
taskSlots.toString)
-
- val onHeapSize: Long =
- if (conf.contains(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)) {
- conf.getSizeAsBytes(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)
- } else {
- // 1GB default
- 1024 * 1024 * 1024
- }
+ conf.set(GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, taskSlots.toString)
+
+ val onHeapSize: Long = conf.getSizeAsBytes(SPARK_ONHEAP_SIZE_KEY, 1024 *
1024 * 1024)
// If dynamic off-heap sizing is enabled, the off-heap size is calculated
based on the on-heap
// size. Otherwise, the off-heap size is set to the value specified by the
user (if any).
// Note that this means that we will IGNORE the off-heap size specified by
the user if the
// dynamic off-heap feature is enabled.
- val offHeapSize: Long =
- if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED,
false)) {
- // Since when dynamic off-heap sizing is enabled, we commingle on-heap
- // and off-heap memory, we set the off-heap size to the usable on-heap
size. We will
- // size it with a memory fraction, which can be aggressively set, but
the default
- // is using the same way that Spark sizes on-heap memory:
- //
- // spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction *
- // (spark.executor.memory - 300MB).
- //
- // We will be careful to use the same configuration settings as Spark
to ensure
- // that we are sizing the off-heap memory in the same way as Spark
sizes on-heap memory.
- // The 300MB value, unfortunately, is hard-coded in Spark code.
- ((onHeapSize - (300 * 1024 * 1024)) *
-
conf.getDouble(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION,
0.6d)).toLong
- } else if (conf.contains(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)) {
- // Optimistic off-heap sizes, assuming all storage memory can be
borrowed into execution
- // memory pool, regardless of Spark option
spark.memory.storageFraction.
- conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)
- } else {
- // Default Spark Value.
- 0L
- }
+ val offHeapSize: Long = if
(conf.getBoolean(GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) {
+ // Since when dynamic off-heap sizing is enabled, we commingle on-heap
+ // and off-heap memory, we set the off-heap size to the usable on-heap
size. We will
+ // size it with a memory fraction, which can be aggressively set, but
the default
+ // is using the same way that Spark sizes on-heap memory:
+ //
+ // spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction *
+ // (spark.executor.memory - 300MB).
+ //
+ // We will be careful to use the same configuration settings as Spark to
ensure
+ // that we are sizing the off-heap memory in the same way as Spark sizes
on-heap memory.
+ // The 300MB value, unfortunately, is hard-coded in Spark code.
+ ((onHeapSize - (300 * 1024 * 1024)) *
+ conf.getDouble(GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION,
0.6d)).toLong
+ } else {
+ // Optimistic off-heap sizes, assuming all storage memory can be
borrowed into execution
+ // memory pool, regardless of Spark option spark.memory.storageFraction.
+ conf.getSizeAsBytes(SPARK_OFFHEAP_SIZE_KEY, 0L)
+ }
- conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY,
offHeapSize.toString)
- conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, offHeapSize.toString)
+ conf.set(GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapSize.toString)
+ conf.set(SPARK_OFFHEAP_SIZE_KEY, offHeapSize.toString)
val offHeapPerTask = offHeapSize / taskSlots
- conf.set(GlutenConfig.GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
offHeapPerTask.toString)
+ conf.set(GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapPerTask.toString)
// If we are using dynamic off-heap sizing, we should also enable off-heap
memory
// officially.
- if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED,
false)) {
- conf.set(GlutenConfig.SPARK_OFFHEAP_ENABLED, "true")
+ if (conf.getBoolean(GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) {
+ conf.set(SPARK_OFFHEAP_ENABLED, "true")
// We already sized the off-heap per task in a conservative manner, so
we can just
// use it.
- conf.set(
- GlutenConfig.GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
- offHeapPerTask.toString)
+ conf.set(GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
offHeapPerTask.toString)
} else {
// Let's make sure this is set to false explicitly if it is not on as it
// is looked up when throwing OOF exceptions.
- conf.set(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, "false")
+ conf.set(GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, "false")
// Pessimistic off-heap sizes, with the assumption that all
non-borrowable storage memory
// determined by spark.memory.storageFraction was used.
val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction",
0.5d)
val conservativeOffHeapPerTask = (offHeapSize * fraction).toLong /
taskSlots
conf.set(
- GlutenConfig.GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
+ GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
conservativeOffHeapPerTask.toString)
}
@@ -235,8 +219,8 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
// https://github.com/apache/incubator-gluten/pull/1931 was merged?
if (
!conf.getBoolean(
- GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.key,
- GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.defaultValue.get)
+ VANILLA_VECTORIZED_READERS_ENABLED.key,
+ VANILLA_VECTORIZED_READERS_ENABLED.defaultValue.get)
) {
// FIXME Hongze 22/12/06
// BatchScan.scala in shim was not always loaded by class loader.
@@ -244,19 +228,19 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
// spark.sql.<format>.enableVectorizedReader=true should be fixed in
another way.
// Before the issue is fixed we force the use of vanilla row reader by
using
// the following statement.
- conf.set("spark.sql.parquet.enableVectorizedReader", "false")
- conf.set("spark.sql.orc.enableVectorizedReader", "false")
- conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader",
"false")
+ conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "false")
+ conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, "false")
+ conf.set(SQLConf.CACHE_VECTORIZED_READER_ENABLED.key, "false")
}
// When the Velox cache is enabled, the Velox file handle cache should
also be enabled.
// Otherwise, a 'reference id not found' error may occur.
if (
- conf.getBoolean(GlutenConfig.COLUMNAR_VELOX_CACHE_ENABLED.key, false) &&
!conf.getBoolean(
- GlutenConfig.COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key,
- false)
+ conf.getBoolean(COLUMNAR_VELOX_CACHE_ENABLED.key, false) &&
+ !conf.getBoolean(COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key, false)
) {
- throw new
IllegalArgumentException(s"${GlutenConfig.COLUMNAR_VELOX_CACHE_ENABLED.key} and
" +
- s"${GlutenConfig.COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key} should
be enabled together.")
+ throw new IllegalArgumentException(
+ s"${COLUMNAR_VELOX_CACHE_ENABLED.key} and " +
+ s"${COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key} should be enabled
together.")
}
}
}
@@ -266,9 +250,7 @@ private[gluten] class GlutenExecutorPlugin extends
ExecutorPlugin {
/** Initialize the executor plugin. */
override def init(ctx: PluginContext, extraConf: util.Map[String, String]):
Unit = {
- val conf = ctx.conf()
-
- // Initialize Backends API.
+ // Initialize Backend.
Backend.get().onExecutorStart(ctx)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]