This is an automated email from the ASF dual-hosted git repository. hongze pushed a commit to branch revert-5227-wip-it-log in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
commit 1d13ab8115b119a2ba35fa653675c7f417dad549 Author: Hongze Zhang <[email protected]> AuthorDate: Mon Apr 1 16:24:05 2024 +0800 Revert "[VL] gluten-it: Shorten table creation and query runner logs (#5227)" This reverts commit f78b7b9f8c570165ef734f2718b63f2d65a8c321. --- .../apache/gluten/integration/tpc/TpcRunner.scala | 19 +++++++-------- .../scala/org/apache/spark/sql/QueryRunner.scala | 28 ++++++++++------------ .../apache/spark/sql/SparkSessionSwitcher.scala | 19 +++++++-------- 3 files changed, 28 insertions(+), 38 deletions(-) diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/TpcRunner.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/TpcRunner.scala index c7ab7febd..f198a5a03 100644 --- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/TpcRunner.scala +++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/TpcRunner.scala @@ -16,7 +16,8 @@ */ package org.apache.gluten.integration.tpc -import org.apache.spark.sql.{AnalysisException, QueryRunner, RunResult, SparkSession} +import org.apache.spark.sql.{QueryRunner, RunResult, SparkSession} + import com.google.common.base.Preconditions import org.apache.commons.io.FileUtils @@ -46,25 +47,21 @@ class TpcRunner(val queryResourceFolder: String, val dataPath: String) { object TpcRunner { def createTables(spark: SparkSession, dataPath: String): Unit = { - print("Creating catalog tables: ") - try { - val files = new File(dataPath).listFiles() - files.foreach(file => { + val files = new File(dataPath).listFiles() + files.foreach( + file => { if (spark.catalog.tableExists(file.getName)) { - print(s"${file.getName}(exists), ") + println("Table exists: " + file.getName) } else { - print(s"${file.getName}, ") + println("Creating catalog table: " + file.getName) spark.catalog.createTable(file.getName, file.getAbsolutePath, "parquet") try { spark.catalog.recoverPartitions(file.getName) } catch { - case _: AnalysisException => + case _: Throwable => } } }) - } finally { - println("... Done.") - } } private def delete(path: String): Unit = { diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala index 20c8a0617..a4044c925 100644 --- a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala +++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala @@ -18,12 +18,7 @@ package org.apache.spark.sql import org.apache.spark.{SparkContext, Success, TaskKilled} import org.apache.spark.executor.ExecutorMetrics -import org.apache.spark.scheduler.{ - SparkListener, - SparkListenerExecutorMetricsUpdate, - SparkListenerTaskEnd, - SparkListenerTaskStart -} +import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerTaskEnd, SparkListenerTaskStart} import org.apache.spark.sql.KillTaskListener.INIT_WAIT_TIME_MS import com.google.common.base.Preconditions @@ -50,7 +45,8 @@ object QueryRunner { "ProcessTreePythonVMemory", "ProcessTreePythonRSSMemory", "ProcessTreeOtherVMemory", - "ProcessTreeOtherRSSMemory") + "ProcessTreeOtherRSSMemory" + ) def runTpcQuery( spark: SparkSession, @@ -80,7 +76,7 @@ object QueryRunner { } killTaskListener.foreach(sc.addSparkListener(_)) - print(s"Executing SQL query from resource path $queryPath... ") + println(s"Executing SQL query from resource path $queryPath...") try { val sql = resourceToString(queryPath) val prev = System.nanoTime() @@ -94,13 +90,13 @@ object QueryRunner { RunResult(rows, millis, collectedMetrics) } finally { sc.removeSparkListener(metricsListener) - killTaskListener.foreach(l => { - sc.removeSparkListener(l) - println(s"Successful kill rate ${"%.2f%%" - .format(100 * l.successfulKillRate())} during execution of app: ${sc.applicationId}") - }) + killTaskListener.foreach( + l => { + sc.removeSparkListener(l) + println(s"Successful kill rate ${"%.2f%%".format( + 100 * l.successfulKillRate())} during execution of app: ${sc.applicationId}") + }) sc.setJobDescription(null) - println("Done.") } } @@ -160,8 +156,8 @@ class KillTaskListener(val sc: SparkContext) extends SparkListener { sync.synchronized { val total = Math.min( stageKillMaxWaitTimeLookup.computeIfAbsent(taskStart.stageId, _ => Long.MaxValue), - stageKillWaitTimeLookup - .computeIfAbsent(taskStart.stageId, _ => INIT_WAIT_TIME_MS)) + stageKillWaitTimeLookup.computeIfAbsent(taskStart.stageId, _ => INIT_WAIT_TIME_MS) + ) val elapsed = System.currentTimeMillis() - startMs val remaining = total - elapsed if (remaining <= 0L) { diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala index 9d20b931c..17a50fd29 100644 --- a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala +++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala @@ -72,17 +72,14 @@ class SparkSessionSwitcher(val masterUrl: String, val logLevel: String) extends if (!sessionMap.containsKey(desc.sessionToken)) { throw new IllegalArgumentException(s"Session doesn't exist: $desc") } - print(s"Switching to $desc session... ") - try { - stopActiveSession() - val conf = new SparkConf(false) - .setAllWarningOnOverriding(testDefaults.getAll) - .setAllWarningOnOverriding(sessionMap.get(desc.sessionToken).getAll) - activateSession(conf, desc.appName) - _activeSessionDesc = desc - } finally { - println("Done. ") - } + println(s"Switching to $desc session... ") + stopActiveSession() + val conf = new SparkConf(false) + .setAllWarningOnOverriding(testDefaults.getAll) + .setAllWarningOnOverriding(sessionMap.get(desc.sessionToken).getAll) + activateSession(conf, desc.appName) + _activeSessionDesc = desc + println(s"Successfully switched to $desc session. ") } def spark(): SparkSession = { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
