This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 44c202b956 [VL] Gluten-it: Minor refactor on configuration priorities (#9489)
44c202b956 is described below

commit 44c202b956964c80d6fa2ffe5c7da8329fd1b59b
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri May 2 10:49:11 2025 +0100

    [VL] Gluten-it: Minor refactor on configuration priorities (#9489)
---
 .../org/apache/gluten/integration/Suite.scala      | 68 ++++++++--------------
 .../apache/spark/sql/SparkSessionSwitcher.scala    | 23 +++++---
 2 files changed, 38 insertions(+), 53 deletions(-)

diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
index b240855ed8..4879e31db9 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
@@ -57,33 +57,19 @@ abstract class Suite(
     new SparkSessionSwitcher(masterUrl, logLevel.toString)
 
   // define initial configs
-  
sessionSwitcher.defaultConf().setWarningOnOverriding("spark.sql.sources.useV1SourceList",
 "")
-  sessionSwitcher
-    .defaultConf()
-    .setWarningOnOverriding("spark.sql.shuffle.partitions", 
s"$shufflePartitions")
-  sessionSwitcher
-    .defaultConf()
-    .setWarningOnOverriding("spark.storage.blockManagerSlaveTimeoutMs", 
"3600000")
-  sessionSwitcher
-    .defaultConf()
-    .setWarningOnOverriding("spark.executor.heartbeatInterval", "10s")
-  sessionSwitcher
-    .defaultConf()
-    .setWarningOnOverriding("spark.worker.timeout", "3600")
-  sessionSwitcher
-    .defaultConf()
-    .setWarningOnOverriding("spark.executor.metrics.pollingInterval", "0")
-  
sessionSwitcher.defaultConf().setWarningOnOverriding("spark.network.timeout", 
"3601s")
-  
sessionSwitcher.defaultConf().setWarningOnOverriding("spark.sql.broadcastTimeout",
 "1800")
-  sessionSwitcher
-    .defaultConf()
-    .setWarningOnOverriding("spark.network.io.preferDirectBufs", "false")
-  sessionSwitcher
-    .defaultConf()
-    .setWarningOnOverriding("spark.unsafe.exceptionOnMemoryLeak", 
s"$errorOnMemLeak")
+  sessionSwitcher.addDefaultConf("spark.sql.sources.useV1SourceList", "")
+  sessionSwitcher.addDefaultConf("spark.sql.shuffle.partitions", 
s"$shufflePartitions")
+  sessionSwitcher.addDefaultConf("spark.storage.blockManagerSlaveTimeoutMs", 
"3600000")
+  sessionSwitcher.addDefaultConf("spark.executor.heartbeatInterval", "10s")
+  sessionSwitcher.addDefaultConf("spark.worker.timeout", "3600")
+  sessionSwitcher.addDefaultConf("spark.executor.metrics.pollingInterval", "0")
+  sessionSwitcher.addDefaultConf("spark.network.timeout", "3601s")
+  sessionSwitcher.addDefaultConf("spark.sql.broadcastTimeout", "1800")
+  sessionSwitcher.addDefaultConf("spark.network.io.preferDirectBufs", "false")
+  sessionSwitcher.addDefaultConf("spark.unsafe.exceptionOnMemoryLeak", 
s"$errorOnMemLeak")
 
   if (!enableUi) {
-    sessionSwitcher.defaultConf().setWarningOnOverriding("spark.ui.enabled", 
"false")
+    sessionSwitcher.addDefaultConf("spark.ui.enabled", "false")
   }
 
   if (enableHsUi) {
@@ -92,45 +78,37 @@ abstract class Suite(
         "Unable to create history directory: " +
           historyWritePath())
     }
-    
sessionSwitcher.defaultConf().setWarningOnOverriding("spark.eventLog.enabled", 
"true")
-    sessionSwitcher.defaultConf().setWarningOnOverriding("spark.eventLog.dir", 
historyWritePath())
+    sessionSwitcher.addDefaultConf("spark.eventLog.enabled", "true")
+    sessionSwitcher.addDefaultConf("spark.eventLog.dir", historyWritePath())
   }
 
   if (disableAqe) {
-    
sessionSwitcher.defaultConf().setWarningOnOverriding("spark.sql.adaptive.enabled",
 "false")
+    sessionSwitcher.addDefaultConf("spark.sql.adaptive.enabled", "false")
   }
 
   if (disableBhj) {
-    sessionSwitcher
-      .defaultConf()
-      .setWarningOnOverriding("spark.sql.autoBroadcastJoinThreshold", "-1")
+    sessionSwitcher.addDefaultConf("spark.sql.autoBroadcastJoinThreshold", 
"-1")
   }
 
   if (disableWscg) {
-    
sessionSwitcher.defaultConf().setWarningOnOverriding("spark.sql.codegen.wholeStage",
 "false")
+    sessionSwitcher.addDefaultConf("spark.sql.codegen.wholeStage", "false")
   }
 
   if (scanPartitions != -1) {
     // Scan partition number.
-    sessionSwitcher
-      .defaultConf()
-      .setWarningOnOverriding("spark.sql.files.maxPartitionBytes", 
s"${ByteUnit.PiB.toBytes(1L)}")
-    sessionSwitcher
-      .defaultConf()
-      .setWarningOnOverriding("spark.sql.files.openCostInBytes", "0")
-    sessionSwitcher
-      .defaultConf()
-      .setWarningOnOverriding("spark.sql.files.minPartitionNum", 
s"${(scanPartitions - 1).max(1)}")
-  }
-
-  extraSparkConf.toStream.foreach {
-    kv => sessionSwitcher.defaultConf().setWarningOnOverriding(kv._1, kv._2)
+    sessionSwitcher.addDefaultConf("spark.sql.files.maxPartitionBytes", 
s"${ByteUnit.PiB.toBytes(1L)}")
+    sessionSwitcher.addDefaultConf("spark.sql.files.openCostInBytes", "0")
+    sessionSwitcher.addDefaultConf("spark.sql.files.minPartitionNum", 
s"${(scanPartitions - 1).max(1)}")
   }
 
   // register sessions
   sessionSwitcher.registerSession("test", testConf)
   sessionSwitcher.registerSession("baseline", baselineConf)
 
+  extraSparkConf.toStream.foreach {
+    kv => sessionSwitcher.addExtraConf(kv._1, kv._2)
+  }
+
   private def startHistoryServer(): Int = {
     val hsConf = new SparkConf(false)
     hsConf.setWarningOnOverriding("spark.history.ui.port", s"$hsUiPort")
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
index f6c8a177a6..e1f14ba7ba 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
@@ -26,9 +26,6 @@ import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.hadoop.fs.LocalFileSystem
 
 class SparkSessionSwitcher(val masterUrl: String, val logLevel: String) 
extends AutoCloseable {
-  private val sessionMap: java.util.Map[SessionToken, SparkConf] =
-    new java.util.HashMap[SessionToken, SparkConf]
-
   private val testDefaults = new SparkConf(false)
     .setWarningOnOverriding("spark.hadoop.fs.file.impl", 
classOf[LocalFileSystem].getName)
     .setWarningOnOverriding(SQLConf.CODEGEN_FALLBACK.key, "false")
@@ -45,19 +42,28 @@ class SparkSessionSwitcher(val masterUrl: String, val logLevel: String) extends
     StaticSQLConf.WAREHOUSE_PATH.key,
     testDefaults.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + 
getClass.getCanonicalName)
 
+  private val sessionMap: java.util.Map[SessionToken, SparkConf] =
+    new java.util.HashMap[SessionToken, SparkConf]
+
+  private val extraConf = new SparkConf(false)
+
   private var _spark: SparkSession = _
   private var _activeSessionDesc: SessionDesc = SparkSessionSwitcher.NONE
 
-  def defaultConf(): SparkConf = {
-    testDefaults
+  def addDefaultConf(key: String, value: String): Unit = {
+    testDefaults.setWarningOnOverriding(key, value)
+  }
+
+  def addExtraConf(key: String, value: String): Unit = {
+    extraConf.setWarningOnOverriding(key, value)
   }
 
-  def registerSession(name: String, conf: SparkConf): SessionToken = 
synchronized {
+  def registerSession(name: String, sessionConf: SparkConf): SessionToken = 
synchronized {
     val token = SessionToken(name)
     if (sessionMap.containsKey(token)) {
       throw new IllegalArgumentException(s"Session name already registered: 
$name")
     }
-    sessionMap.put(token, conf)
+    sessionMap.put(token, new 
SparkConf(false).setAllWarningOnOverriding(sessionConf.getAll))
     return token
   }
 
@@ -85,8 +91,9 @@ class SparkSessionSwitcher(val masterUrl: String, val logLevel: String) extends
     println(s"Switching to $desc session... ")
     stopActiveSession()
     val conf = new SparkConf(false)
-      .setAllWarningOnOverriding(sessionMap.get(desc.sessionToken).getAll)
       .setAllWarningOnOverriding(testDefaults.getAll)
+      .setAllWarningOnOverriding(sessionMap.get(desc.sessionToken).getAll)
+      .setAllWarningOnOverriding(extraConf.getAll)
     activateSession(conf, desc.appName)
     _activeSessionDesc = desc
     println(s"Successfully switched to $desc session. ")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to