This is an automated email from the ASF dual-hosted git repository.

yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f0c643d728 [Gluten-7402] Code cleanup for GlutenPlugin (#7403)
f0c643d728 is described below

commit f0c643d7281d7c451c375a9fa87348fd9c86116a
Author: Jiaan Geng <[email protected]>
AuthorDate: Wed Oct 9 22:02:28 2024 +0800

    [Gluten-7402] Code cleanup for GlutenPlugin (#7403)
---
 .../scala/org/apache/gluten/GlutenPlugin.scala     | 128 +++++++++------------
 1 file changed, 55 insertions(+), 73 deletions(-)

diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 6902d9f95b..d5f5fd2e21 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten
 
 import org.apache.gluten.GlutenBuildInfo._
-import org.apache.gluten.GlutenConfig.GLUTEN_DEFAULT_SESSION_TIMEZONE_KEY
+import org.apache.gluten.GlutenConfig._
 import org.apache.gluten.backend.Backend
 import org.apache.gluten.events.GlutenBuildInfoEvent
 import org.apache.gluten.exception.GlutenException
@@ -55,27 +55,26 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
 
   override def init(sc: SparkContext, pluginContext: PluginContext): 
util.Map[String, String] = {
     _sc = Some(sc)
+    val conf = pluginContext.conf()
+
+    // Register Gluten listeners
     GlutenSQLAppStatusListener.register(sc)
-    postBuildInfoEvent(sc)
+    if (conf.getBoolean(GLUTEN_SOFT_AFFINITY_ENABLED, 
GLUTEN_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE)) {
+      SoftAffinityListener.register(sc)
+    }
 
-    val conf = pluginContext.conf()
+    postBuildInfoEvent(sc)
 
     setPredefinedConfigs(sc, conf)
-    // Initialize Backends API.
+
+    // Initialize Backend.
     Backend.get().onDriverStart(sc, pluginContext)
-    if (
-      sc.getConf.getBoolean(
-        GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED,
-        GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE)
-    ) {
-      SoftAffinityListener.register(sc)
-    }
 
     Collections.emptyMap()
   }
 
   override def registerMetrics(appId: String, pluginContext: PluginContext): 
Unit = {
-    if (pluginContext.conf().getBoolean(GlutenConfig.GLUTEN_UI_ENABLED, true)) 
{
+    if (pluginContext.conf().getBoolean(GLUTEN_UI_ENABLED, true)) {
       _sc.foreach {
         sc =>
           GlutenEventUtils.attachUI(sc)
@@ -113,10 +112,7 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
     val infoMap = glutenBuildInfo.toMap
     val loggingInfo = infoMap.toSeq
       .sortBy(_._1)
-      .map {
-        case (name, value) =>
-          s"$name: $value"
-      }
+      .map { case (name, value) => s"$name: $value" }
       .mkString(
         "Gluten build 
info:\n==============================================================\n",
         "\n",
@@ -152,8 +148,8 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
           minOffHeapSize))
     ) {
       throw new GlutenException(
-        s"Must set '${GlutenConfig.SPARK_OFFHEAP_ENABLED}' to true " +
-          s"and set '${GlutenConfig.SPARK_OFFHEAP_SIZE_KEY}' to be greater 
than $minOffHeapSize")
+        s"Must set '$SPARK_OFFHEAP_ENABLED' to true " +
+          s"and set '$SPARK_OFFHEAP_SIZE_KEY' to be greater than 
$minOffHeapSize")
     }
 
     // Session's local time zone must be set. If not explicitly set by user, 
its default
@@ -162,71 +158,59 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
 
     // Task slots.
     val taskSlots = SparkResourceUtil.getTaskSlots(conf)
-    conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, 
taskSlots.toString)
-
-    val onHeapSize: Long =
-      if (conf.contains(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)) {
-        conf.getSizeAsBytes(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)
-      } else {
-        // 1GB default
-        1024 * 1024 * 1024
-      }
+    conf.set(GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, taskSlots.toString)
+
+    val onHeapSize: Long = conf.getSizeAsBytes(SPARK_ONHEAP_SIZE_KEY, 1024 * 
1024 * 1024)
 
     // If dynamic off-heap sizing is enabled, the off-heap size is calculated 
based on the on-heap
     // size. Otherwise, the off-heap size is set to the value specified by the 
user (if any).
     // Note that this means that we will IGNORE the off-heap size specified by 
the user if the
     // dynamic off-heap feature is enabled.
-    val offHeapSize: Long =
-      if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, 
false)) {
-        // Since when dynamic off-heap sizing is enabled, we commingle on-heap
-        // and off-heap memory, we set the off-heap size to the usable on-heap 
size. We will
-        // size it with a memory fraction, which can be aggressively set, but 
the default
-        // is using the same way that Spark sizes on-heap memory:
-        //
-        // spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction *
-        //    (spark.executor.memory - 300MB).
-        //
-        // We will be careful to use the same configuration settings as Spark 
to ensure
-        // that we are sizing the off-heap memory in the same way as Spark 
sizes on-heap memory.
-        // The 300MB value, unfortunately, is hard-coded in Spark code.
-        ((onHeapSize - (300 * 1024 * 1024)) *
-          
conf.getDouble(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION, 
0.6d)).toLong
-      } else if (conf.contains(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)) {
-        // Optimistic off-heap sizes, assuming all storage memory can be 
borrowed into execution
-        // memory pool, regardless of Spark option 
spark.memory.storageFraction.
-        conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)
-      } else {
-        // Default Spark Value.
-        0L
-      }
+    val offHeapSize: Long = if 
(conf.getBoolean(GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) {
+      // Since when dynamic off-heap sizing is enabled, we commingle on-heap
+      // and off-heap memory, we set the off-heap size to the usable on-heap 
size. We will
+      // size it with a memory fraction, which can be aggressively set, but 
the default
+      // is using the same way that Spark sizes on-heap memory:
+      //
+      // spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction *
+      //    (spark.executor.memory - 300MB).
+      //
+      // We will be careful to use the same configuration settings as Spark to 
ensure
+      // that we are sizing the off-heap memory in the same way as Spark sizes 
on-heap memory.
+      // The 300MB value, unfortunately, is hard-coded in Spark code.
+      ((onHeapSize - (300 * 1024 * 1024)) *
+        conf.getDouble(GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION, 
0.6d)).toLong
+    } else {
+      // Optimistic off-heap sizes, assuming all storage memory can be 
borrowed into execution
+      // memory pool, regardless of Spark option spark.memory.storageFraction.
+      conf.getSizeAsBytes(SPARK_OFFHEAP_SIZE_KEY, 0L)
+    }
 
-    conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY, 
offHeapSize.toString)
-    conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, offHeapSize.toString)
+    conf.set(GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapSize.toString)
+    conf.set(SPARK_OFFHEAP_SIZE_KEY, offHeapSize.toString)
 
     val offHeapPerTask = offHeapSize / taskSlots
-    conf.set(GlutenConfig.GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, 
offHeapPerTask.toString)
+    conf.set(GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapPerTask.toString)
 
     // If we are using dynamic off-heap sizing, we should also enable off-heap 
memory
     // officially.
-    if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, 
false)) {
-      conf.set(GlutenConfig.SPARK_OFFHEAP_ENABLED, "true")
+    if (conf.getBoolean(GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) {
+      conf.set(SPARK_OFFHEAP_ENABLED, "true")
 
       // We already sized the off-heap per task in a conservative manner, so 
we can just
       // use it.
-      conf.set(
-        GlutenConfig.GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
-        offHeapPerTask.toString)
+      conf.set(GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, 
offHeapPerTask.toString)
     } else {
       // Let's make sure this is set to false explicitly if it is not on as it
       // is looked up when throwing OOF exceptions.
-      conf.set(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, "false")
+      conf.set(GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, "false")
 
       // Pessimistic off-heap sizes, with the assumption that all 
non-borrowable storage memory
       // determined by spark.memory.storageFraction was used.
       val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction", 
0.5d)
       val conservativeOffHeapPerTask = (offHeapSize * fraction).toLong / 
taskSlots
       conf.set(
-        GlutenConfig.GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
+        GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
         conservativeOffHeapPerTask.toString)
     }
 
@@ -235,8 +219,8 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
     //  https://github.com/apache/incubator-gluten/pull/1931 was merged?
     if (
       !conf.getBoolean(
-        GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.key,
-        GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.defaultValue.get)
+        VANILLA_VECTORIZED_READERS_ENABLED.key,
+        VANILLA_VECTORIZED_READERS_ENABLED.defaultValue.get)
     ) {
       // FIXME Hongze 22/12/06
       //  BatchScan.scala in shim was not always loaded by class loader.
@@ -244,19 +228,19 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
       //  spark.sql.<format>.enableVectorizedReader=true should be fixed in 
another way.
       //  Before the issue is fixed we force the use of vanilla row reader by 
using
       //  the following statement.
-      conf.set("spark.sql.parquet.enableVectorizedReader", "false")
-      conf.set("spark.sql.orc.enableVectorizedReader", "false")
-      conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", 
"false")
+      conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "false")
+      conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, "false")
+      conf.set(SQLConf.CACHE_VECTORIZED_READER_ENABLED.key, "false")
     }
     // When the Velox cache is enabled, the Velox file handle cache should 
also be enabled.
     // Otherwise, a 'reference id not found' error may occur.
     if (
-      conf.getBoolean(GlutenConfig.COLUMNAR_VELOX_CACHE_ENABLED.key, false) && 
!conf.getBoolean(
-        GlutenConfig.COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key,
-        false)
+      conf.getBoolean(COLUMNAR_VELOX_CACHE_ENABLED.key, false) &&
+      !conf.getBoolean(COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key, false)
     ) {
-      throw new 
IllegalArgumentException(s"${GlutenConfig.COLUMNAR_VELOX_CACHE_ENABLED.key} and 
" +
-        s"${GlutenConfig.COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key} should 
be enabled together.")
+      throw new IllegalArgumentException(
+        s"${COLUMNAR_VELOX_CACHE_ENABLED.key} and " +
+          s"${COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key} should be enabled 
together.")
     }
   }
 }
@@ -266,9 +250,7 @@ private[gluten] class GlutenExecutorPlugin extends ExecutorPlugin {
 
   /** Initialize the executor plugin. */
   override def init(ctx: PluginContext, extraConf: util.Map[String, String]): 
Unit = {
-    val conf = ctx.conf()
-
-    // Initialize Backends API.
+    // Initialize Backend.
     Backend.get().onExecutorStart(ctx)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to