This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 1a77a51b5f [VL] Various fixes for gluten-it (#11642)
1a77a51b5f is described below
commit 1a77a51b5f39eeb92eabed7070498b6e4cb48fce
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Feb 24 09:42:08 2026 +0000
[VL] Various fixes for gluten-it (#11642)
---
.../org/apache/gluten/integration/BaseMixin.java | 30 ++++----
.../org/apache/gluten/integration/Constants.scala | 23 +-----
.../org/apache/gluten/integration/Suite.scala | 52 ++++++++-----
.../integration/clickbench/ClickBenchDataGen.scala | 86 ++++++++++++++++------
.../integration/clickbench/ClickBenchSuite.scala | 2 +
.../apache/gluten/integration/ds/TpcdsSuite.scala | 3 +-
.../apache/gluten/integration/h/TpchSuite.scala | 2 +
.../apache/spark/sql/SparkSessionSwitcher.scala | 7 +-
8 files changed, 120 insertions(+), 85 deletions(-)
diff --git a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
index b7ccf01f04..9ff11d8fa4 100644
--- a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
+++ b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/BaseMixin.java
@@ -54,11 +54,17 @@ public class BaseMixin {
defaultValue = "vanilla")
private String baselinePreset;
+ @CommandLine.Option(
+ names = {"--app-name"},
+ description = "The name of Spark application started by the benchmark",
+ defaultValue = "Gluten Integration Test")
+ private String appName;
+
@CommandLine.Option(
names = {"--log-level"},
- description = "Set log level: 0 for DEBUG, 1 for INFO, 2 for WARN",
- defaultValue = "2")
- private int logLevel;
+ description = "Set log level: DEBUG, INFO, WARN, etc.",
+ defaultValue = "WARN")
+ private String logLevel;
@CommandLine.Option(
names = {"--error-on-memleak"},
@@ -183,20 +189,7 @@ public class BaseMixin {
}
public Integer runActions(Action[] actions) {
- final Level level;
- switch (logLevel) {
- case 0:
- level = Level.DEBUG;
- break;
- case 1:
- level = Level.INFO;
- break;
- case 2:
- level = Level.WARN;
- break;
- default:
- throw new IllegalArgumentException("Log level not found: " + logLevel);
- }
+ final Level level = Level.toLevel(logLevel);
System.setProperty(org.slf4j.impl.SimpleLogger.DEFAULT_LOG_LEVEL_KEY,
level.toString());
LogManager.getRootLogger().setLevel(level);
@@ -215,6 +208,7 @@ public class BaseMixin {
case "h":
suite =
new TpchSuite(
+ appName,
runModeEnumeration.getSparkMasterUrl(),
actions,
testConf,
@@ -244,6 +238,7 @@ public class BaseMixin {
case "ds":
suite =
new TpcdsSuite(
+ appName,
runModeEnumeration.getSparkMasterUrl(),
actions,
testConf,
@@ -273,6 +268,7 @@ public class BaseMixin {
case "clickbench":
suite =
new ClickBenchSuite(
+ appName,
runModeEnumeration.getSparkMasterUrl(),
actions,
testConf,
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala
index a37cf71617..19867b6476 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Constants.scala
@@ -30,41 +30,23 @@ object Constants {
val VANILLA_CONF: SparkConf = new SparkConf(false)
val VELOX_CONF: SparkConf = new SparkConf(false)
- .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true")
- .set("spark.sql.parquet.enableVectorizedReader", "true")
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
.set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
- .set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
-
.set("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold",
"0")
- .set(
- "spark.gluten.sql.columnar.physicalJoinOptimizeEnable",
- "false"
- ) // q72 slow if false, q64 fails if true
val VELOX_WITH_CELEBORN_CONF: SparkConf = new SparkConf(false)
- .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true")
- .set("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled",
"false")
- .set("spark.sql.parquet.enableVectorizedReader", "true")
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
.set("spark.shuffle.manager",
"org.apache.spark.shuffle.gluten.celeborn.CelebornShuffleManager")
+ .set("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled",
"false")
.set("spark.celeborn.shuffle.writer", "hash")
.set("spark.celeborn.push.replicate.enabled", "false")
.set("spark.celeborn.client.shuffle.compression.codec", "none")
.set("spark.shuffle.service.enabled", "false")
.set("spark.sql.adaptive.localShuffleReader.enabled", "false")
.set("spark.dynamicAllocation.enabled", "false")
- .set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
-
.set("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold",
"0")
- .set(
- "spark.gluten.sql.columnar.physicalJoinOptimizeEnable",
- "false"
- ) // q72 slow if false, q64 fails if true
.set("spark.celeborn.push.data.timeout", "600s")
.set("spark.celeborn.push.limit.inFlight.timeout", "1200s")
val VELOX_WITH_UNIFFLE_CONF: SparkConf = new SparkConf(false)
- .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true")
- .set("spark.sql.parquet.enableVectorizedReader", "true")
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
.set("spark.shuffle.manager",
"org.apache.spark.shuffle.gluten.uniffle.UniffleShuffleManager")
.set("spark.rss.coordinator.quorum", "localhost:19999")
@@ -74,9 +56,6 @@ object Constants {
.set("spark.shuffle.service.enabled", "false")
.set("spark.sql.adaptive.localShuffleReader.enabled", "false")
.set("spark.dynamicAllocation.enabled", "false")
- .set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
-
.set("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold",
"0")
- .set("spark.gluten.sql.columnar.physicalJoinOptimizeEnable", "false")
val VANILLA_METRIC_MAPPER: MetricMapper = SimpleMetricMapper(
Seq(MetricTag.IsSelfTime),
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
index 5503889d19..112321b8d2 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
@@ -29,14 +29,16 @@ import org.apache.spark.sql.SparkSessionSwitcher
import org.apache.commons.io.output.{NullOutputStream, TeeOutputStream}
import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.fs.Path
import org.apache.log4j.{Level, LogManager}
-import java.io.{BufferedOutputStream, File, FileNotFoundException,
FileOutputStream, OutputStream, PrintStream}
+import java.io.{BufferedOutputStream, File, OutputStream, PrintStream}
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter
import java.util.Scanner
abstract class Suite(
+ private val appName: String,
private val masterUrl: String,
private val actions: Array[Action],
private val testConf: SparkConf,
@@ -64,7 +66,7 @@ abstract class Suite(
private var hsUiBoundPort: Int = -1
private[integration] val sessionSwitcher: SparkSessionSwitcher =
- new SparkSessionSwitcher(masterUrl, logLevel.toString)
+ new SparkSessionSwitcher(appName, masterUrl, logLevel.toString)
// define initial configs
sessionSwitcher.addDefaultConf("spark.sql.sources.useV1SourceList", "")
@@ -166,17 +168,25 @@ abstract class Suite(
reporter.addMetadata("Arguments", Cli.args().mkString(" "))
// Construct the output streams for writing test reports.
- var fileOut: OutputStream = null
- if (!StringUtils.isBlank(reportPath)) try {
- val file = new File(reportPath)
- if (file.isDirectory) throw new FileNotFoundException("Is a directory: "
+ reportPath)
- println("Test report will be written to " + file.getAbsolutePath)
- fileOut = new BufferedOutputStream(new FileOutputStream(file))
- } catch {
- case e: FileNotFoundException =>
- throw new RuntimeException(e)
- }
- else fileOut = NullOutputStream.NULL_OUTPUT_STREAM
+ val fileOut: OutputStream =
+ if (!StringUtils.isBlank(reportPath)) {
+ try {
+ sessionSwitcher.useSession("baseline", "Suite Initialization")
+ val conf = sessionSwitcher.spark().sessionState.newHadoopConf()
+ val path = new Path(reportPath)
+ val fs = path.getFileSystem(conf)
+ if (fs.exists(path) && fs.getFileStatus(path).isDirectory) {
+ throw new java.io.FileNotFoundException("Is a directory: " +
reportPath)
+ }
+ println(s"Test report will be written to ${path.toString}")
+ new BufferedOutputStream(fs.create(path, true)) // overwrite = true
+ } catch {
+ case e: java.io.FileNotFoundException =>
+ throw new RuntimeException(e)
+ }
+ } else {
+ NullOutputStream.NULL_OUTPUT_STREAM
+ }
val combinedOut = new PrintStream(new TeeOutputStream(System.out,
fileOut), true)
val combinedErr = new PrintStream(new TeeOutputStream(System.err,
fileOut), true)
@@ -189,12 +199,18 @@ abstract class Suite(
t.printStackTrace(reporter.rootAppender.err)
false
}
- if (succeeded) {
- reporter.write(combinedOut)
- } else {
- reporter.write(combinedErr)
+ try {
+ if (succeeded) {
+ reporter.write(combinedOut)
+ combinedOut.flush()
+ } else {
+ reporter.write(combinedErr)
+ combinedErr.flush()
+ }
+ succeeded
+ } finally {
+ fileOut.close()
}
- succeeded
}
private def runActions(): Boolean = {
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchDataGen.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchDataGen.scala
index 83ffb270f1..c17f68eb49 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchDataGen.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchDataGen.scala
@@ -20,45 +20,83 @@ import org.apache.gluten.integration.DataGen
import org.apache.spark.sql.{functions, SparkSession}
-import org.apache.commons.io.FileUtils
-
-import java.io.File
-
-import scala.language.postfixOps
-import scala.sys.process._
+import org.apache.hadoop.fs.{FileSystem, Path}
class ClickBenchDataGen(dir: String) extends DataGen {
+
import ClickBenchDataGen._
+
override def gen(spark: SparkSession): Unit = {
- println(s"Start to download ClickBench Parquet dataset from URL:
$DATA_URL... ")
- // Directly download from official URL.
- val tempFile = new File(dir + File.separator + TMP_FILE_NAME)
- FileUtils.forceMkdirParent(tempFile)
- val cmd =
- s"wget --no-verbose --show-progress --progress=bar:force:noscroll -O
$tempFile $DATA_URL"
- println(s"Executing command: $cmd")
- val code = Process(cmd) !;
- if (code != 0) {
- throw new RuntimeException("Download failed")
+ println(s"Start to download ClickBench Parquet dataset from URL:
$DATA_URL...")
+
+ val conf = spark.sessionState.newHadoopConf()
+ val fs = FileSystem.get(conf)
+
+ val tmpPath = new Path(dir, TMP_FILE_NAME)
+ val outputPath = new Path(dir, FILE_NAME)
+
+ if (!fs.exists(tmpPath.getParent)) {
+ fs.mkdirs(tmpPath.getParent)
}
- println(s"ClickBench Parquet dataset successfully downloaded to
$tempFile.")
- val sparkDataFile = new File(dir + File.separator + FILE_NAME)
- println(s"Starting to write a data file $sparkDataFile that is compatible
with Spark... ")
+ downloadWithProgress(DATA_URL, tmpPath, fs)
+
+ println(s"ClickBench Parquet dataset successfully downloaded to $tmpPath.")
+ println(s"Starting to write a data file $outputPath that is compatible
with Spark...")
+
spark.read
- .parquet(tempFile.getAbsolutePath)
+ .parquet(tmpPath.toString)
.withColumn("eventtime", functions.col("eventtime").cast("timestamp"))
.withColumn("clienteventtime",
functions.col("clienteventtime").cast("timestamp"))
.withColumn("localeventtime",
functions.col("localeventtime").cast("timestamp"))
.write
- .parquet(sparkDataFile.getAbsolutePath)
- println(
- s"ClickBench Parquet dataset (Spark compatible) successfully created at
$sparkDataFile.")
+ .mode("overwrite")
+ .parquet(outputPath.toString)
+
+ println(s"ClickBench Parquet dataset (Spark compatible) successfully
created at $outputPath.")
+ }
+
+ private def downloadWithProgress(urlStr: String, target: Path, fs:
FileSystem): Unit = {
+
+ import java.net.URL
+
+ val connection = new URL(urlStr).openConnection()
+ val contentLength = connection.getContentLengthLong
+
+ val in = connection.getInputStream
+ val out = fs.create(target, true)
+
+ val buffer = new Array[Byte](1024 * 1024) // 1MB
+ var bytesRead = 0
+ var totalRead = 0L
+ var nextPrint = 1
+
+ try {
+ while ({
+ bytesRead = in.read(buffer)
+ bytesRead != -1
+ }) {
+ out.write(buffer, 0, bytesRead)
+ totalRead += bytesRead
+
+ if (contentLength > 0) {
+ val percent = ((totalRead * 100) / contentLength).toInt
+ if (percent >= nextPrint) {
+ println(s"Download progress: $percent%")
+ nextPrint += 1
+ }
+ }
+ }
+ } finally {
+ in.close()
+ out.close()
+ }
}
}
object ClickBenchDataGen {
- private val DATA_URL =
"https://datasets.clickhouse.com/hits_compatible/hits.parquet"
+ private val DATA_URL =
+ "https://datasets.clickhouse.com/hits_compatible/hits.parquet"
private val TMP_FILE_NAME = "hits.parquet.tmp"
private[clickbench] val FILE_NAME = "hits.parquet"
}
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
index 200bf01e06..1eb4c3bd07 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/clickbench/ClickBenchSuite.scala
@@ -33,6 +33,7 @@ import java.io.File
* See the project: https://github.com/ClickHouse/ClickBench Site:
https://benchmark.clickhouse.com/
*/
class ClickBenchSuite(
+ val appName: String,
val masterUrl: String,
val actions: Array[Action],
val testConf: SparkConf,
@@ -58,6 +59,7 @@ class ClickBenchSuite(
val testMetricMapper: MetricMapper,
val reportPath: String)
extends Suite(
+ appName,
masterUrl,
actions,
testConf,
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
index 0de9ad1c36..427c088be2 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsSuite.scala
@@ -19,7 +19,6 @@ package org.apache.gluten.integration.ds
import org.apache.gluten.integration.{DataGen, QuerySet, Suite, TableAnalyzer,
TableCreator}
import org.apache.gluten.integration.action.Action
import org.apache.gluten.integration.metrics.MetricMapper
-import org.apache.gluten.integration.report.TestReporter
import org.apache.spark.SparkConf
@@ -27,6 +26,7 @@ import org.apache.hadoop.fs.Path
import org.apache.log4j.Level
class TpcdsSuite(
+ val appName: String,
val masterUrl: String,
val actions: Array[Action],
val testConf: SparkConf,
@@ -53,6 +53,7 @@ class TpcdsSuite(
val testMetricMapper: MetricMapper,
val reportPath: String)
extends Suite(
+ appName,
masterUrl,
actions,
testConf,
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
index 6437b301ba..142eb4e4a4 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchSuite.scala
@@ -30,6 +30,7 @@ import org.apache.log4j.Level
import java.io.File
class TpchSuite(
+ val appName: String,
val masterUrl: String,
val actions: Array[Action],
val testConf: SparkConf,
@@ -56,6 +57,7 @@ class TpchSuite(
val testMetricMapper: MetricMapper,
val reportPath: String)
extends Suite(
+ appName,
masterUrl,
actions,
testConf,
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
index 61645ad1f5..6ea0fa5ef7 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkSessionSwitcher.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.hadoop.fs.LocalFileSystem
-class SparkSessionSwitcher(val masterUrl: String, val logLevel: String)
extends AutoCloseable {
+class SparkSessionSwitcher(val appName: String, val masterUrl: String, val
logLevel: String)
+ extends AutoCloseable {
private val testDefaults = new SparkConf(false)
.setWarningOnOverriding("spark.hadoop.fs.file.impl",
classOf[LocalFileSystem].getName)
@@ -68,8 +69,8 @@ class SparkSessionSwitcher(val masterUrl: String, val logLevel: String) extends
return token
}
- def useSession(token: String, appName: String = "gluten-app"): Unit =
synchronized {
- useSession(SessionDesc(SessionToken(token), appName))
+ def useSession(token: String, description: String): Unit = synchronized {
+ useSession(SessionDesc(SessionToken(token), s"$appName / $description"))
}
def renewSession(): Unit = synchronized {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]