This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7c56d429b Revert "[VL] gluten-it: Shorten table creation and query runner logs" (#5237)
7c56d429b is described below
commit 7c56d429b93a0d570afbf8d48e4185756d8c6ade
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Apr 2 09:14:13 2024 +0800
Revert "[VL] gluten-it: Shorten table creation and query runner logs" (#5237)
---
.../apache/gluten/integration/tpc/TpcRunner.scala | 15 +++++-------
.../scala/org/apache/spark/sql/QueryRunner.scala | 28 ++++++++++------------
.../apache/spark/sql/SparkSessionSwitcher.scala | 19 +++++++--------
3 files changed, 26 insertions(+), 36 deletions(-)
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/TpcRunner.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/TpcRunner.scala
index c7ab7febd..ab76dc68c 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/TpcRunner.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/TpcRunner.scala
@@ -17,6 +17,7 @@
package org.apache.gluten.integration.tpc
import org.apache.spark.sql.{AnalysisException, QueryRunner, RunResult,
SparkSession}
+
import com.google.common.base.Preconditions
import org.apache.commons.io.FileUtils
@@ -46,14 +47,13 @@ class TpcRunner(val queryResourceFolder: String, val dataPath: String) {
object TpcRunner {
def createTables(spark: SparkSession, dataPath: String): Unit = {
- print("Creating catalog tables: ")
- try {
- val files = new File(dataPath).listFiles()
- files.foreach(file => {
+ val files = new File(dataPath).listFiles()
+ files.foreach(
+ file => {
if (spark.catalog.tableExists(file.getName)) {
- print(s"${file.getName}(exists), ")
+ println("Table exists: " + file.getName)
} else {
- print(s"${file.getName}, ")
+ println("Creating catalog table: " + file.getName)
spark.catalog.createTable(file.getName, file.getAbsolutePath,
"parquet")
try {
spark.catalog.recoverPartitions(file.getName)
@@ -62,9 +62,6 @@ object TpcRunner {
}
}
})
- } finally {
- println("... Done.")
- }
}
private def delete(path: String): Unit = {
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala
index 20c8a0617..a4044c925 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala
@@ -18,12 +18,7 @@ package org.apache.spark.sql
import org.apache.spark.{SparkContext, Success, TaskKilled}
import org.apache.spark.executor.ExecutorMetrics
-import org.apache.spark.scheduler.{
- SparkListener,
- SparkListenerExecutorMetricsUpdate,
- SparkListenerTaskEnd,
- SparkListenerTaskStart
-}
+import org.apache.spark.scheduler.{SparkListener,
SparkListenerExecutorMetricsUpdate, SparkListenerTaskEnd,
SparkListenerTaskStart}
import org.apache.spark.sql.KillTaskListener.INIT_WAIT_TIME_MS
import com.google.common.base.Preconditions
@@ -50,7 +45,8 @@ object QueryRunner {
"ProcessTreePythonVMemory",
"ProcessTreePythonRSSMemory",
"ProcessTreeOtherVMemory",
- "ProcessTreeOtherRSSMemory")
+ "ProcessTreeOtherRSSMemory"
+ )
def runTpcQuery(
spark: SparkSession,
@@ -80,7 +76,7 @@ object QueryRunner {
}
killTaskListener.foreach(sc.addSparkListener(_))
- print(s"Executing SQL query from resource path $queryPath... ")
+ println(s"Executing SQL query from resource path $queryPath...")
try {
val sql = resourceToString(queryPath)
val prev = System.nanoTime()
@@ -94,13 +90,13 @@ object QueryRunner {
RunResult(rows, millis, collectedMetrics)
} finally {
sc.removeSparkListener(metricsListener)
- killTaskListener.foreach(l => {
- sc.removeSparkListener(l)
- println(s"Successful kill rate ${"%.2f%%"
- .format(100 * l.successfulKillRate())} during execution of app:
${sc.applicationId}")
- })
+ killTaskListener.foreach(
+ l => {
+ sc.removeSparkListener(l)
+ println(s"Successful kill rate ${"%.2f%%".format(
+ 100 * l.successfulKillRate())} during execution of app:
${sc.applicationId}")
+ })
sc.setJobDescription(null)
- println("Done.")
}
}
@@ -160,8 +156,8 @@ class KillTaskListener(val sc: SparkContext) extends SparkListener {
sync.synchronized {
val total = Math.min(
stageKillMaxWaitTimeLookup.computeIfAbsent(taskStart.stageId,
_ => Long.MaxValue),
- stageKillWaitTimeLookup
- .computeIfAbsent(taskStart.stageId, _ => INIT_WAIT_TIME_MS))
+ stageKillWaitTimeLookup.computeIfAbsent(taskStart.stageId, _
=> INIT_WAIT_TIME_MS)
+ )
val elapsed = System.currentTimeMillis() - startMs
val remaining = total - elapsed
if (remaining <= 0L) {
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
index 9d20b931c..17a50fd29 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
@@ -72,17 +72,14 @@ class SparkSessionSwitcher(val masterUrl: String, val logLevel: String) extends
if (!sessionMap.containsKey(desc.sessionToken)) {
throw new IllegalArgumentException(s"Session doesn't exist: $desc")
}
- print(s"Switching to $desc session... ")
- try {
- stopActiveSession()
- val conf = new SparkConf(false)
- .setAllWarningOnOverriding(testDefaults.getAll)
- .setAllWarningOnOverriding(sessionMap.get(desc.sessionToken).getAll)
- activateSession(conf, desc.appName)
- _activeSessionDesc = desc
- } finally {
- println("Done. ")
- }
+ println(s"Switching to $desc session... ")
+ stopActiveSession()
+ val conf = new SparkConf(false)
+ .setAllWarningOnOverriding(testDefaults.getAll)
+ .setAllWarningOnOverriding(sessionMap.get(desc.sessionToken).getAll)
+ activateSession(conf, desc.appName)
+ _activeSessionDesc = desc
+ println(s"Successfully switched to $desc session. ")
}
def spark(): SparkSession = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]