This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c56d429b Revert "[VL] gluten-it: Shorten table creation and query runner logs" (#5237)
7c56d429b is described below

commit 7c56d429b93a0d570afbf8d48e4185756d8c6ade
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Apr 2 09:14:13 2024 +0800

    Revert "[VL] gluten-it: Shorten table creation and query runner logs" (#5237)
---
 .../apache/gluten/integration/tpc/TpcRunner.scala  | 15 +++++-------
 .../scala/org/apache/spark/sql/QueryRunner.scala   | 28 ++++++++++------------
 .../apache/spark/sql/SparkSessionSwitcher.scala    | 19 +++++++--------
 3 files changed, 26 insertions(+), 36 deletions(-)

diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/TpcRunner.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/TpcRunner.scala
index c7ab7febd..ab76dc68c 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/TpcRunner.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/TpcRunner.scala
@@ -17,6 +17,7 @@
 package org.apache.gluten.integration.tpc
 
 import org.apache.spark.sql.{AnalysisException, QueryRunner, RunResult, SparkSession}
+
 import com.google.common.base.Preconditions
 import org.apache.commons.io.FileUtils
 
@@ -46,14 +47,13 @@ class TpcRunner(val queryResourceFolder: String, val dataPath: String) {
 
 object TpcRunner {
   def createTables(spark: SparkSession, dataPath: String): Unit = {
-    print("Creating catalog tables: ")
-    try {
-      val files = new File(dataPath).listFiles()
-      files.foreach(file => {
+    val files = new File(dataPath).listFiles()
+    files.foreach(
+      file => {
         if (spark.catalog.tableExists(file.getName)) {
-          print(s"${file.getName}(exists), ")
+          println("Table exists: " + file.getName)
         } else {
-          print(s"${file.getName}, ")
+          println("Creating catalog table: " + file.getName)
           spark.catalog.createTable(file.getName, file.getAbsolutePath, "parquet")
           try {
             spark.catalog.recoverPartitions(file.getName)
@@ -62,9 +62,6 @@ object TpcRunner {
           }
         }
       })
-    } finally {
-      println("... Done.")
-    }
   }
 
   private def delete(path: String): Unit = {
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala
index 20c8a0617..a4044c925 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala
@@ -18,12 +18,7 @@ package org.apache.spark.sql
 
 import org.apache.spark.{SparkContext, Success, TaskKilled}
 import org.apache.spark.executor.ExecutorMetrics
-import org.apache.spark.scheduler.{
-  SparkListener,
-  SparkListenerExecutorMetricsUpdate,
-  SparkListenerTaskEnd,
-  SparkListenerTaskStart
-}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerTaskEnd, SparkListenerTaskStart}
 import org.apache.spark.sql.KillTaskListener.INIT_WAIT_TIME_MS
 
 import com.google.common.base.Preconditions
@@ -50,7 +45,8 @@ object QueryRunner {
     "ProcessTreePythonVMemory",
     "ProcessTreePythonRSSMemory",
     "ProcessTreeOtherVMemory",
-    "ProcessTreeOtherRSSMemory")
+    "ProcessTreeOtherRSSMemory"
+  )
 
   def runTpcQuery(
       spark: SparkSession,
@@ -80,7 +76,7 @@ object QueryRunner {
     }
     killTaskListener.foreach(sc.addSparkListener(_))
 
-    print(s"Executing SQL query from resource path $queryPath... ")
+    println(s"Executing SQL query from resource path $queryPath...")
     try {
       val sql = resourceToString(queryPath)
       val prev = System.nanoTime()
@@ -94,13 +90,13 @@ object QueryRunner {
       RunResult(rows, millis, collectedMetrics)
     } finally {
       sc.removeSparkListener(metricsListener)
-      killTaskListener.foreach(l => {
-        sc.removeSparkListener(l)
-        println(s"Successful kill rate ${"%.2f%%"
-          .format(100 * l.successfulKillRate())} during execution of app: ${sc.applicationId}")
-      })
+      killTaskListener.foreach(
+        l => {
+          sc.removeSparkListener(l)
+          println(s"Successful kill rate ${"%.2f%%".format(
+              100 * l.successfulKillRate())} during execution of app: ${sc.applicationId}")
+        })
       sc.setJobDescription(null)
-      println("Done.")
     }
   }
 
@@ -160,8 +156,8 @@ class KillTaskListener(val sc: SparkContext) extends SparkListener {
             sync.synchronized {
               val total = Math.min(
                 stageKillMaxWaitTimeLookup.computeIfAbsent(taskStart.stageId, _ => Long.MaxValue),
-                stageKillWaitTimeLookup
-                  .computeIfAbsent(taskStart.stageId, _ => INIT_WAIT_TIME_MS))
+                stageKillWaitTimeLookup.computeIfAbsent(taskStart.stageId, _ => INIT_WAIT_TIME_MS)
+              )
               val elapsed = System.currentTimeMillis() - startMs
               val remaining = total - elapsed
               if (remaining <= 0L) {
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
index 9d20b931c..17a50fd29 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
@@ -72,17 +72,14 @@ class SparkSessionSwitcher(val masterUrl: String, val logLevel: String) extends
     if (!sessionMap.containsKey(desc.sessionToken)) {
       throw new IllegalArgumentException(s"Session doesn't exist: $desc")
     }
-    print(s"Switching to $desc session... ")
-    try {
-      stopActiveSession()
-      val conf = new SparkConf(false)
-        .setAllWarningOnOverriding(testDefaults.getAll)
-        .setAllWarningOnOverriding(sessionMap.get(desc.sessionToken).getAll)
-      activateSession(conf, desc.appName)
-      _activeSessionDesc = desc
-    } finally {
-      println("Done. ")
-    }
+    println(s"Switching to $desc session... ")
+    stopActiveSession()
+    val conf = new SparkConf(false)
+      .setAllWarningOnOverriding(testDefaults.getAll)
+      .setAllWarningOnOverriding(sessionMap.get(desc.sessionToken).getAll)
+    activateSession(conf, desc.appName)
+    _activeSessionDesc = desc
+    println(s"Successfully switched to $desc session. ")
   }
 
   def spark(): SparkSession = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to