This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 44c202b956 [VL] Gluten-it: Minor refactor on configuration priorities (#9489)
44c202b956 is described below
commit 44c202b956964c80d6fa2ffe5c7da8329fd1b59b
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri May 2 10:49:11 2025 +0100
[VL] Gluten-it: Minor refactor on configuration priorities (#9489)
---
.../org/apache/gluten/integration/Suite.scala | 68 ++++++++--------------
.../apache/spark/sql/SparkSessionSwitcher.scala | 23 +++++---
2 files changed, 38 insertions(+), 53 deletions(-)
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
index b240855ed8..4879e31db9 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
@@ -57,33 +57,19 @@ abstract class Suite(
new SparkSessionSwitcher(masterUrl, logLevel.toString)
// define initial configs
- sessionSwitcher.defaultConf().setWarningOnOverriding("spark.sql.sources.useV1SourceList", "")
- sessionSwitcher
- .defaultConf()
- .setWarningOnOverriding("spark.sql.shuffle.partitions", s"$shufflePartitions")
- sessionSwitcher
- .defaultConf()
- .setWarningOnOverriding("spark.storage.blockManagerSlaveTimeoutMs", "3600000")
- sessionSwitcher
- .defaultConf()
- .setWarningOnOverriding("spark.executor.heartbeatInterval", "10s")
- sessionSwitcher
- .defaultConf()
- .setWarningOnOverriding("spark.worker.timeout", "3600")
- sessionSwitcher
- .defaultConf()
- .setWarningOnOverriding("spark.executor.metrics.pollingInterval", "0")
- sessionSwitcher.defaultConf().setWarningOnOverriding("spark.network.timeout", "3601s")
- sessionSwitcher.defaultConf().setWarningOnOverriding("spark.sql.broadcastTimeout", "1800")
- sessionSwitcher
- .defaultConf()
- .setWarningOnOverriding("spark.network.io.preferDirectBufs", "false")
- sessionSwitcher
- .defaultConf()
- .setWarningOnOverriding("spark.unsafe.exceptionOnMemoryLeak", s"$errorOnMemLeak")
+ sessionSwitcher.addDefaultConf("spark.sql.sources.useV1SourceList", "")
+ sessionSwitcher.addDefaultConf("spark.sql.shuffle.partitions", s"$shufflePartitions")
+ sessionSwitcher.addDefaultConf("spark.storage.blockManagerSlaveTimeoutMs", "3600000")
+ sessionSwitcher.addDefaultConf("spark.executor.heartbeatInterval", "10s")
+ sessionSwitcher.addDefaultConf("spark.worker.timeout", "3600")
+ sessionSwitcher.addDefaultConf("spark.executor.metrics.pollingInterval", "0")
+ sessionSwitcher.addDefaultConf("spark.network.timeout", "3601s")
+ sessionSwitcher.addDefaultConf("spark.sql.broadcastTimeout", "1800")
+ sessionSwitcher.addDefaultConf("spark.network.io.preferDirectBufs", "false")
+ sessionSwitcher.addDefaultConf("spark.unsafe.exceptionOnMemoryLeak", s"$errorOnMemLeak")
if (!enableUi) {
- sessionSwitcher.defaultConf().setWarningOnOverriding("spark.ui.enabled", "false")
+ sessionSwitcher.addDefaultConf("spark.ui.enabled", "false")
}
if (enableHsUi) {
@@ -92,45 +78,37 @@ abstract class Suite(
"Unable to create history directory: " +
historyWritePath())
}
- sessionSwitcher.defaultConf().setWarningOnOverriding("spark.eventLog.enabled", "true")
- sessionSwitcher.defaultConf().setWarningOnOverriding("spark.eventLog.dir", historyWritePath())
+ sessionSwitcher.addDefaultConf("spark.eventLog.enabled", "true")
+ sessionSwitcher.addDefaultConf("spark.eventLog.dir", historyWritePath())
}
if (disableAqe) {
- sessionSwitcher.defaultConf().setWarningOnOverriding("spark.sql.adaptive.enabled", "false")
+ sessionSwitcher.addDefaultConf("spark.sql.adaptive.enabled", "false")
}
if (disableBhj) {
- sessionSwitcher
- .defaultConf()
- .setWarningOnOverriding("spark.sql.autoBroadcastJoinThreshold", "-1")
+ sessionSwitcher.addDefaultConf("spark.sql.autoBroadcastJoinThreshold", "-1")
}
if (disableWscg) {
- sessionSwitcher.defaultConf().setWarningOnOverriding("spark.sql.codegen.wholeStage", "false")
+ sessionSwitcher.addDefaultConf("spark.sql.codegen.wholeStage", "false")
}
if (scanPartitions != -1) {
// Scan partition number.
- sessionSwitcher
- .defaultConf()
- .setWarningOnOverriding("spark.sql.files.maxPartitionBytes", s"${ByteUnit.PiB.toBytes(1L)}")
- sessionSwitcher
- .defaultConf()
- .setWarningOnOverriding("spark.sql.files.openCostInBytes", "0")
- sessionSwitcher
- .defaultConf()
- .setWarningOnOverriding("spark.sql.files.minPartitionNum", s"${(scanPartitions - 1).max(1)}")
- }
-
- extraSparkConf.toStream.foreach {
- kv => sessionSwitcher.defaultConf().setWarningOnOverriding(kv._1, kv._2)
+ sessionSwitcher.addDefaultConf("spark.sql.files.maxPartitionBytes", s"${ByteUnit.PiB.toBytes(1L)}")
+ sessionSwitcher.addDefaultConf("spark.sql.files.openCostInBytes", "0")
+ sessionSwitcher.addDefaultConf("spark.sql.files.minPartitionNum", s"${(scanPartitions - 1).max(1)}")
}
// register sessions
sessionSwitcher.registerSession("test", testConf)
sessionSwitcher.registerSession("baseline", baselineConf)
+ extraSparkConf.toStream.foreach {
+ kv => sessionSwitcher.addExtraConf(kv._1, kv._2)
+ }
+
private def startHistoryServer(): Int = {
val hsConf = new SparkConf(false)
hsConf.setWarningOnOverriding("spark.history.ui.port", s"$hsUiPort")
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
index f6c8a177a6..e1f14ba7ba 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
@@ -26,9 +26,6 @@ import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.hadoop.fs.LocalFileSystem
class SparkSessionSwitcher(val masterUrl: String, val logLevel: String)
extends AutoCloseable {
- private val sessionMap: java.util.Map[SessionToken, SparkConf] =
- new java.util.HashMap[SessionToken, SparkConf]
-
private val testDefaults = new SparkConf(false)
.setWarningOnOverriding("spark.hadoop.fs.file.impl", classOf[LocalFileSystem].getName)
.setWarningOnOverriding(SQLConf.CODEGEN_FALLBACK.key, "false")
@@ -45,19 +42,28 @@ class SparkSessionSwitcher(val masterUrl: String, val logLevel: String) extends
StaticSQLConf.WAREHOUSE_PATH.key,
testDefaults.get(StaticSQLConf.WAREHOUSE_PATH) + "/" +
getClass.getCanonicalName)
+ private val sessionMap: java.util.Map[SessionToken, SparkConf] =
+ new java.util.HashMap[SessionToken, SparkConf]
+
+ private val extraConf = new SparkConf(false)
+
private var _spark: SparkSession = _
private var _activeSessionDesc: SessionDesc = SparkSessionSwitcher.NONE
- def defaultConf(): SparkConf = {
- testDefaults
+ def addDefaultConf(key: String, value: String): Unit = {
+ testDefaults.setWarningOnOverriding(key, value)
+ }
+
+ def addExtraConf(key: String, value: String): Unit = {
+ extraConf.setWarningOnOverriding(key, value)
}
- def registerSession(name: String, conf: SparkConf): SessionToken = synchronized {
+ def registerSession(name: String, sessionConf: SparkConf): SessionToken = synchronized {
val token = SessionToken(name)
if (sessionMap.containsKey(token)) {
throw new IllegalArgumentException(s"Session name already registered:
$name")
}
- sessionMap.put(token, conf)
+ sessionMap.put(token, new SparkConf(false).setAllWarningOnOverriding(sessionConf.getAll))
return token
}
@@ -85,8 +91,9 @@ class SparkSessionSwitcher(val masterUrl: String, val logLevel: String) extends
println(s"Switching to $desc session... ")
stopActiveSession()
val conf = new SparkConf(false)
- .setAllWarningOnOverriding(sessionMap.get(desc.sessionToken).getAll)
.setAllWarningOnOverriding(testDefaults.getAll)
+ .setAllWarningOnOverriding(sessionMap.get(desc.sessionToken).getAll)
+ .setAllWarningOnOverriding(extraConf.getAll)
activateSession(conf, desc.appName)
_activeSessionDesc = desc
println(s"Successfully switched to $desc session. ")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]