This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a77a51b5f [VL] Various fixes for gluten-it (#11642)
1a77a51b5f is described below

commit 1a77a51b5f39eeb92eabed7070498b6e4cb48fce
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Feb 24 09:42:08 2026 +0000

    [VL] Various fixes for gluten-it (#11642)
---
 .../org/apache/gluten/integration/BaseMixin.java   | 30 ++++----
 .../org/apache/gluten/integration/Constants.scala  | 23 +-----
 .../org/apache/gluten/integration/Suite.scala      | 52 ++++++++-----
 .../integration/clickbench/ClickBenchDataGen.scala | 86 ++++++++++++++++------
 .../integration/clickbench/ClickBenchSuite.scala   |  2 +
 .../apache/gluten/integration/ds/TpcdsSuite.scala  |  3 +-
 .../apache/gluten/integration/h/TpchSuite.scala    |  2 +
 .../apache/spark/sql/SparkSessionSwitcher.scala    |  7 +-
 8 files changed, 120 insertions(+), 85 deletions(-)

diff --git a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
index b7ccf01f04..9ff11d8fa4 100644
--- a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
+++ b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
@@ -54,11 +54,17 @@ public class BaseMixin {
       defaultValue = "vanilla")
   private String baselinePreset;
 
+  @CommandLine.Option(
+      names = {"--app-name"},
+      description = "The name of Spark application started by the benchmark",
+      defaultValue = "Gluten Integration Test")
+  private String appName;
+
   @CommandLine.Option(
       names = {"--log-level"},
-      description = "Set log level: 0 for DEBUG, 1 for INFO, 2 for WARN",
-      defaultValue = "2")
-  private int logLevel;
+      description = "Set log level: DEBUG, INFO, WARN, etc.",
+      defaultValue = "WARN")
+  private String logLevel;
 
   @CommandLine.Option(
       names = {"--error-on-memleak"},
@@ -183,20 +189,7 @@ public class BaseMixin {
   }
 
   public Integer runActions(Action[] actions) {
-    final Level level;
-    switch (logLevel) {
-      case 0:
-        level = Level.DEBUG;
-        break;
-      case 1:
-        level = Level.INFO;
-        break;
-      case 2:
-        level = Level.WARN;
-        break;
-      default:
-        throw new IllegalArgumentException("Log level not found: " + logLevel);
-    }
+    final Level level = Level.toLevel(logLevel);
     System.setProperty(org.slf4j.impl.SimpleLogger.DEFAULT_LOG_LEVEL_KEY, level.toString());
     LogManager.getRootLogger().setLevel(level);
 
@@ -215,6 +208,7 @@ public class BaseMixin {
       case "h":
         suite =
             new TpchSuite(
+                appName,
                 runModeEnumeration.getSparkMasterUrl(),
                 actions,
                 testConf,
@@ -244,6 +238,7 @@ public class BaseMixin {
       case "ds":
         suite =
             new TpcdsSuite(
+                appName,
                 runModeEnumeration.getSparkMasterUrl(),
                 actions,
                 testConf,
@@ -273,6 +268,7 @@ public class BaseMixin {
       case "clickbench":
         suite =
             new ClickBenchSuite(
+                appName,
                 runModeEnumeration.getSparkMasterUrl(),
                 actions,
                 testConf,
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala
index a37cf71617..19867b6476 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala
@@ -30,41 +30,23 @@ object Constants {
   val VANILLA_CONF: SparkConf = new SparkConf(false)
 
   val VELOX_CONF: SparkConf = new SparkConf(false)
-    .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true")
-    .set("spark.sql.parquet.enableVectorizedReader", "true")
     .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
     .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
-    .set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
-    .set("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold", "0")
-    .set(
-      "spark.gluten.sql.columnar.physicalJoinOptimizeEnable",
-      "false"
-    ) // q72 slow if false, q64 fails if true
 
   val VELOX_WITH_CELEBORN_CONF: SparkConf = new SparkConf(false)
-    .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true")
-    .set("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled", "false")
-    .set("spark.sql.parquet.enableVectorizedReader", "true")
     .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
     .set("spark.shuffle.manager", "org.apache.spark.shuffle.gluten.celeborn.CelebornShuffleManager")
+    .set("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled", "false")
     .set("spark.celeborn.shuffle.writer", "hash")
     .set("spark.celeborn.push.replicate.enabled", "false")
     .set("spark.celeborn.client.shuffle.compression.codec", "none")
     .set("spark.shuffle.service.enabled", "false")
     .set("spark.sql.adaptive.localShuffleReader.enabled", "false")
     .set("spark.dynamicAllocation.enabled", "false")
-    .set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
-    .set("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold", "0")
-    .set(
-      "spark.gluten.sql.columnar.physicalJoinOptimizeEnable",
-      "false"
-    ) // q72 slow if false, q64 fails if true
     .set("spark.celeborn.push.data.timeout", "600s")
     .set("spark.celeborn.push.limit.inFlight.timeout", "1200s")
 
   val VELOX_WITH_UNIFFLE_CONF: SparkConf = new SparkConf(false)
-    .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true")
-    .set("spark.sql.parquet.enableVectorizedReader", "true")
     .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
     .set("spark.shuffle.manager", "org.apache.spark.shuffle.gluten.uniffle.UniffleShuffleManager")
     .set("spark.rss.coordinator.quorum", "localhost:19999")
@@ -74,9 +56,6 @@ object Constants {
     .set("spark.shuffle.service.enabled", "false")
     .set("spark.sql.adaptive.localShuffleReader.enabled", "false")
     .set("spark.dynamicAllocation.enabled", "false")
-    .set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
-    .set("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold", "0")
-    .set("spark.gluten.sql.columnar.physicalJoinOptimizeEnable", "false")
 
   val VANILLA_METRIC_MAPPER: MetricMapper = SimpleMetricMapper(
     Seq(MetricTag.IsSelfTime),
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
index 5503889d19..112321b8d2 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
@@ -29,14 +29,16 @@ import org.apache.spark.sql.SparkSessionSwitcher
 
 import org.apache.commons.io.output.{NullOutputStream, TeeOutputStream}
 import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.fs.Path
 import org.apache.log4j.{Level, LogManager}
 
-import java.io.{BufferedOutputStream, File, FileNotFoundException, FileOutputStream, OutputStream, PrintStream}
+import java.io.{BufferedOutputStream, File, OutputStream, PrintStream}
 import java.time.{Instant, ZoneId}
 import java.time.format.DateTimeFormatter
 import java.util.Scanner
 
 abstract class Suite(
+    private val appName: String,
     private val masterUrl: String,
     private val actions: Array[Action],
     private val testConf: SparkConf,
@@ -64,7 +66,7 @@ abstract class Suite(
   private var hsUiBoundPort: Int = -1
 
   private[integration] val sessionSwitcher: SparkSessionSwitcher =
-    new SparkSessionSwitcher(masterUrl, logLevel.toString)
+    new SparkSessionSwitcher(appName, masterUrl, logLevel.toString)
 
   // define initial configs
   sessionSwitcher.addDefaultConf("spark.sql.sources.useV1SourceList", "")
@@ -166,17 +168,25 @@ abstract class Suite(
     reporter.addMetadata("Arguments", Cli.args().mkString(" "))
 
     // Construct the output streams for writing test reports.
-    var fileOut: OutputStream = null
-    if (!StringUtils.isBlank(reportPath)) try {
-      val file = new File(reportPath)
-      if (file.isDirectory) throw new FileNotFoundException("Is a directory: " + reportPath)
-      println("Test report will be written to " + file.getAbsolutePath)
-      fileOut = new BufferedOutputStream(new FileOutputStream(file))
-    } catch {
-      case e: FileNotFoundException =>
-        throw new RuntimeException(e)
-    }
-    else fileOut = NullOutputStream.NULL_OUTPUT_STREAM
+    val fileOut: OutputStream =
+      if (!StringUtils.isBlank(reportPath)) {
+        try {
+          sessionSwitcher.useSession("baseline", "Suite Initialization")
+          val conf = sessionSwitcher.spark().sessionState.newHadoopConf()
+          val path = new Path(reportPath)
+          val fs = path.getFileSystem(conf)
+          if (fs.exists(path) && fs.getFileStatus(path).isDirectory) {
+            throw new java.io.FileNotFoundException("Is a directory: " + reportPath)
+          }
+          println(s"Test report will be written to ${path.toString}")
+          new BufferedOutputStream(fs.create(path, true)) // overwrite = true
+        } catch {
+          case e: java.io.FileNotFoundException =>
+            throw new RuntimeException(e)
+        }
+      } else {
+        NullOutputStream.NULL_OUTPUT_STREAM
+      }
     val combinedOut = new PrintStream(new TeeOutputStream(System.out, fileOut), true)
     val combinedErr = new PrintStream(new TeeOutputStream(System.err, fileOut), true)
 
@@ -189,12 +199,18 @@ abstract class Suite(
           t.printStackTrace(reporter.rootAppender.err)
           false
       }
-    if (succeeded) {
-      reporter.write(combinedOut)
-    } else {
-      reporter.write(combinedErr)
+    try {
+      if (succeeded) {
+        reporter.write(combinedOut)
+        combinedOut.flush()
+      } else {
+        reporter.write(combinedErr)
+        combinedErr.flush()
+      }
+      succeeded
+    } finally {
+      fileOut.close()
     }
-    succeeded
   }
 
   private def runActions(): Boolean = {
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchDataGen.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchDataGen.scala
index 83ffb270f1..c17f68eb49 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchDataGen.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchDataGen.scala
@@ -20,45 +20,83 @@ import org.apache.gluten.integration.DataGen
 
 import org.apache.spark.sql.{functions, SparkSession}
 
-import org.apache.commons.io.FileUtils
-
-import java.io.File
-
-import scala.language.postfixOps
-import scala.sys.process._
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 class ClickBenchDataGen(dir: String) extends DataGen {
+
   import ClickBenchDataGen._
+
   override def gen(spark: SparkSession): Unit = {
-    println(s"Start to download ClickBench Parquet dataset from URL: $DATA_URL... ")
-    // Directly download from official URL.
-    val tempFile = new File(dir + File.separator + TMP_FILE_NAME)
-    FileUtils.forceMkdirParent(tempFile)
-    val cmd =
-      s"wget --no-verbose --show-progress --progress=bar:force:noscroll -O $tempFile $DATA_URL"
-    println(s"Executing command: $cmd")
-    val code = Process(cmd) !;
-    if (code != 0) {
-      throw new RuntimeException("Download failed")
+    println(s"Start to download ClickBench Parquet dataset from URL: $DATA_URL...")
+
+    val conf = spark.sessionState.newHadoopConf()
+    val fs = FileSystem.get(conf)
+
+    val tmpPath = new Path(dir, TMP_FILE_NAME)
+    val outputPath = new Path(dir, FILE_NAME)
+
+    if (!fs.exists(tmpPath.getParent)) {
+      fs.mkdirs(tmpPath.getParent)
     }
-    println(s"ClickBench Parquet dataset successfully downloaded to $tempFile.")
 
-    val sparkDataFile = new File(dir + File.separator + FILE_NAME)
-    println(s"Starting to write a data file $sparkDataFile that is compatible with Spark... ")
+    downloadWithProgress(DATA_URL, tmpPath, fs)
+
+    println(s"ClickBench Parquet dataset successfully downloaded to $tmpPath.")
+    println(s"Starting to write a data file $outputPath that is compatible with Spark...")
+
     spark.read
-      .parquet(tempFile.getAbsolutePath)
+      .parquet(tmpPath.toString)
       .withColumn("eventtime", functions.col("eventtime").cast("timestamp"))
       .withColumn("clienteventtime", functions.col("clienteventtime").cast("timestamp"))
       .withColumn("localeventtime", functions.col("localeventtime").cast("timestamp"))
       .write
-      .parquet(sparkDataFile.getAbsolutePath)
-    println(
-      s"ClickBench Parquet dataset (Spark compatible) successfully created at $sparkDataFile.")
+      .mode("overwrite")
+      .parquet(outputPath.toString)
+
+    println(s"ClickBench Parquet dataset (Spark compatible) successfully created at $outputPath.")
+  }
+
+  private def downloadWithProgress(urlStr: String, target: Path, fs: FileSystem): Unit = {
+
+    import java.net.URL
+
+    val connection = new URL(urlStr).openConnection()
+    val contentLength = connection.getContentLengthLong
+
+    val in = connection.getInputStream
+    val out = fs.create(target, true)
+
+    val buffer = new Array[Byte](1024 * 1024) // 1MB
+    var bytesRead = 0
+    var totalRead = 0L
+    var nextPrint = 1
+
+    try {
+      while ({
+        bytesRead = in.read(buffer)
+        bytesRead != -1
+      }) {
+        out.write(buffer, 0, bytesRead)
+        totalRead += bytesRead
+
+        if (contentLength > 0) {
+          val percent = ((totalRead * 100) / contentLength).toInt
+          if (percent >= nextPrint) {
+            println(s"Download progress: $percent%")
+            nextPrint += 1
+          }
+        }
+      }
+    } finally {
+      in.close()
+      out.close()
+    }
   }
 }
 
 object ClickBenchDataGen {
-  private val DATA_URL = "https://datasets.clickhouse.com/hits_compatible/hits.parquet";
+  private val DATA_URL =
+    "https://datasets.clickhouse.com/hits_compatible/hits.parquet";
   private val TMP_FILE_NAME = "hits.parquet.tmp"
   private[clickbench] val FILE_NAME = "hits.parquet"
 }
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
index 200bf01e06..1eb4c3bd07 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
@@ -33,6 +33,7 @@ import java.io.File
  * See the project: https://github.com/ClickHouse/ClickBench Site: https://benchmark.clickhouse.com/
  */
 class ClickBenchSuite(
+    val appName: String,
     val masterUrl: String,
     val actions: Array[Action],
     val testConf: SparkConf,
@@ -58,6 +59,7 @@ class ClickBenchSuite(
     val testMetricMapper: MetricMapper,
     val reportPath: String)
   extends Suite(
+    appName,
     masterUrl,
     actions,
     testConf,
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
index 0de9ad1c36..427c088be2 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
@@ -19,7 +19,6 @@ package org.apache.gluten.integration.ds
 import org.apache.gluten.integration.{DataGen, QuerySet, Suite, TableAnalyzer, TableCreator}
 import org.apache.gluten.integration.action.Action
 import org.apache.gluten.integration.metrics.MetricMapper
-import org.apache.gluten.integration.report.TestReporter
 
 import org.apache.spark.SparkConf
 
@@ -27,6 +26,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.log4j.Level
 
 class TpcdsSuite(
+    val appName: String,
     val masterUrl: String,
     val actions: Array[Action],
     val testConf: SparkConf,
@@ -53,6 +53,7 @@ class TpcdsSuite(
     val testMetricMapper: MetricMapper,
     val reportPath: String)
   extends Suite(
+    appName,
     masterUrl,
     actions,
     testConf,
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
index 6437b301ba..142eb4e4a4 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
@@ -30,6 +30,7 @@ import org.apache.log4j.Level
 import java.io.File
 
 class TpchSuite(
+    val appName: String,
     val masterUrl: String,
     val actions: Array[Action],
     val testConf: SparkConf,
@@ -56,6 +57,7 @@ class TpchSuite(
     val testMetricMapper: MetricMapper,
     val reportPath: String)
   extends Suite(
+    appName,
     masterUrl,
     actions,
     testConf,
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
index 61645ad1f5..6ea0fa5ef7 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 
 import org.apache.hadoop.fs.LocalFileSystem
 
-class SparkSessionSwitcher(val masterUrl: String, val logLevel: String) extends AutoCloseable {
+class SparkSessionSwitcher(val appName: String, val masterUrl: String, val logLevel: String)
+  extends AutoCloseable {
 
   private val testDefaults = new SparkConf(false)
     .setWarningOnOverriding("spark.hadoop.fs.file.impl", classOf[LocalFileSystem].getName)
@@ -68,8 +69,8 @@ class SparkSessionSwitcher(val masterUrl: String, val logLevel: String) extends
     return token
   }
 
-  def useSession(token: String, appName: String = "gluten-app"): Unit = synchronized {
-    useSession(SessionDesc(SessionToken(token), appName))
+  def useSession(token: String, description: String): Unit = synchronized {
+    useSession(SessionDesc(SessionToken(token), s"$appName / $description"))
   }
 
   def renewSession(): Unit = synchronized {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to