This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new e8583c31d [GLUTEN-6067][CH][MINOR][UT] Pass backends-clickhouse ut in Spark 3.5 (#6623)
e8583c31d is described below
commit e8583c31d5abd2a02858868482686cd099907716
Author: Chang chen <[email protected]>
AuthorDate: Wed Jul 31 13:48:13 2024 +0800
[GLUTEN-6067][CH][MINOR][UT] Pass backends-clickhouse ut in Spark 3.5 (#6623)
* 1. Move the TPC-H parquet UTs to the tpch package
  2. Move the TPC-DS UTs to the tpcds package
  3. Pass the UTs in Spark 3.5
* Spark 3.4 is not supported, so version checks only need:
  1. isSparkVersionGE("3.5")
  2. isSparkVersionLE("3.3")
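Since 3.4 is skipped entirely, these two helpers split the suites cleanly between "3.3 and below" and "3.5 and up". A minimal sketch of the version-gating idiom, assuming "major.minor" version strings like Spark's SPARK_VERSION_SHORT (illustrative only, not Gluten's actual implementation):

    object VersionGateSketch {
      // Stand-in for org.apache.spark.SPARK_VERSION_SHORT, e.g. "3.5".
      val sparkVersionShort: String = "3.5"

      private def parse(v: String): (Int, Int) = {
        val Array(major, minor) = v.split('.').map(_.toInt)
        (major, minor)
      }

      // True when the running Spark is >= v.
      def isSparkVersionGE(v: String): Boolean = {
        val (cur, bound) = (parse(sparkVersionShort), parse(v))
        cur._1 > bound._1 || (cur._1 == bound._1 && cur._2 >= bound._2)
      }

      // True when the running Spark is <= v.
      def isSparkVersionLE(v: String): Boolean = {
        val (cur, bound) = (parse(sparkVersionShort), parse(v))
        cur._1 < bound._1 || (cur._1 == bound._1 && cur._2 <= bound._2)
      }
    }

With 3.4 out of the support matrix, isSparkVersionGE("3.5") and !isSparkVersionLE("3.3") select the same builds, which is why the patch can use whichever reads better at each call site.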
---
.../execution/GlutenClickHouseDecimalSuite.scala | 25 ++++----
.../execution/GlutenClickHouseHiveTableSuite.scala | 8 ++-
.../GlutenClickHouseNativeWriteTableSuite.scala | 4 +-
.../GlutenClickHouseTPCDSAbstractSuite.scala | 55 +++++++++---------
.../GlutenClickHouseTPCHBucketSuite.scala | 34 +++++------
...lutenClickHouseWholeStageTransformerSuite.scala | 21 +++++--
.../GlutenClickhouseCountDistinctSuite.scala | 4 +-
.../metrics/GlutenClickHouseTPCHMetricsSuite.scala | 22 ++++----
.../parquet/GlutenParquetFilterSuite.scala | 44 +++++++++------
.../GlutenClickHouseTPCDSParquetAQESuite.scala | 30 +++++-----
...kHouseTPCDSParquetColumnarShuffleAQESuite.scala | 28 ++++-----
...lickHouseTPCDSParquetColumnarShuffleSuite.scala | 26 +++++----
...nClickHouseTPCDSParquetGraceHashJoinSuite.scala | 14 +++--
.../GlutenClickHouseTPCDSParquetRFSuite.scala | 4 +-
...nClickHouseTPCDSParquetSortMergeJoinSuite.scala | 21 +++----
.../GlutenClickHouseTPCDSParquetSuite.scala | 52 +++++++++--------
.../GlutenClickHouseDatetimeExpressionSuite.scala | 4 +-
...ckHouseTPCHColumnarShuffleParquetAQESuite.scala | 19 ++++---
...enClickHouseTPCHParquetAQEConcurrentSuite.scala | 4 +-
.../GlutenClickHouseTPCHParquetAQESuite.scala | 13 ++---
.../GlutenClickHouseTPCHParquetBucketSuite.scala | 54 +++++++++---------
.../GlutenClickHouseTPCHParquetRFSuite.scala | 9 ++-
.../GlutenClickHouseTPCHSaltNullParquetSuite.scala | 66 ++++++++++------------
.../apache/spark/gluten/NativeWriteChecker.scala | 2 +-
24 files changed, 298 insertions(+), 265 deletions(-)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala
index 7320b7c05..cf1bdd296 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDecimalSuite.scala
@@ -66,10 +66,10 @@ class GlutenClickHouseDecimalSuite
private val decimalTable: String = "decimal_table"
private val decimalTPCHTables: Seq[(DecimalType, Seq[Int])] = Seq.apply(
(DecimalType.apply(9, 4), Seq()),
- // 1: ch decimal avg is float
(DecimalType.apply(18, 8), Seq()),
- // 1: ch decimal avg is float, 3/10: all value is null and compare with limit
- (DecimalType.apply(38, 19), Seq(3, 10))
+ // 3/10: all value is null and compare with limit
+ // 1: Spark 3.5
+ (DecimalType.apply(38, 19), if (isSparkVersionLE("3.3")) Seq(3, 10) else Seq(1, 3, 10))
)
private def createDecimalTables(dataType: DecimalType): Unit = {
@@ -343,19 +343,14 @@ class GlutenClickHouseDecimalSuite
decimalTPCHTables.foreach {
dt =>
{
+ val fallBack = (sql_num == 16 || sql_num == 21)
+ val compareResult = !dt._2.contains(sql_num)
+ val native = if (fallBack) "fallback" else "native"
+ val compare = if (compareResult) "compare" else "noCompare"
+ val PrecisionLoss = s"allowPrecisionLoss=$allowPrecisionLoss"
val decimalType = dt._1
test(s"""TPCH Decimal(${decimalType.precision},${decimalType.scale})
- | Q$sql_num[allowPrecisionLoss=$allowPrecisionLoss]""".stripMargin) {
- var noFallBack = true
- var compareResult = true
- if (sql_num == 16 || sql_num == 21) {
- noFallBack = false
- }
-
- if (dt._2.contains(sql_num)) {
- compareResult = false
- }
-
+ | Q$sql_num[$PrecisionLoss,$native,$compare]""".stripMargin) {
spark.sql(s"use decimal_${decimalType.precision}_${decimalType.scale}")
withSQLConf(
(SQLConf.DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key, allowPrecisionLoss)) {
@@ -363,7 +358,7 @@ class GlutenClickHouseDecimalSuite
sql_num,
tpchQueries,
compareResult = compareResult,
- noFallBack = noFallBack) { _ => {} }
+ noFallBack = !fallBack) { _ => {} }
}
spark.sql(s"use default")
}
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
index 4e190c087..8599b3002 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
@@ -1051,8 +1051,12 @@ class GlutenClickHouseHiveTableSuite
spark.sql(
s"CREATE FUNCTION my_add as " +
s"'org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd2' USING
JAR '$jarUrl'")
- runQueryAndCompare("select MY_ADD(id, id+1) from range(10)")(
- checkGlutenOperatorMatch[ProjectExecTransformer])
+ if (isSparkVersionLE("3.3")) {
+ runQueryAndCompare("select MY_ADD(id, id+1) from range(10)")(
+ checkGlutenOperatorMatch[ProjectExecTransformer])
+ } else {
+ runQueryAndCompare("select MY_ADD(id, id+1) from range(10)", noFallBack = false)(_ => {})
+ }
}
test("GLUTEN-4333: fix CSE in aggregate operator") {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
index 578c43292..0f642dfa8 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
@@ -603,7 +603,7 @@ class GlutenClickHouseNativeWriteTableSuite
("timestamp_field", "timestamp")
)
def excludeTimeFieldForORC(format: String): Seq[String] = {
- if (format.equals("orc") && isSparkVersionGE("3.4")) {
+ if (format.equals("orc") && isSparkVersionGE("3.5")) {
// FIXME:https://github.com/apache/incubator-gluten/pull/6507
fields.keys.filterNot(_.equals("timestamp_field")).toSeq
} else {
@@ -913,7 +913,7 @@ class GlutenClickHouseNativeWriteTableSuite
(table_name, create_sql, insert_sql)
},
(table_name, _) =>
- if (isSparkVersionGE("3.4")) {
+ if (isSparkVersionGE("3.5")) {
compareResultsAgainstVanillaSpark(
s"select * from $table_name",
compareResult = true,
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala
index 9787182ed..6ca587beb 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.execution
import org.apache.gluten.GlutenConfig
import org.apache.gluten.benchmarks.GenTPCDSTableScripts
-import org.apache.gluten.utils.UTSystemParameters
+import org.apache.gluten.utils.{Arm, UTSystemParameters}
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
@@ -46,8 +46,8 @@ abstract class GlutenClickHouseTPCDSAbstractSuite
rootPath +
"../../../../gluten-core/src/test/resources/tpcds-queries/tpcds.queries.original"
protected val queriesResults: String = rootPath + "tpcds-decimal-queries-output"
- /** Return values: (sql num, is fall back, skip fall back assert) */
- def tpcdsAllQueries(isAqe: Boolean): Seq[(String, Boolean, Boolean)] =
+ /** Return values: (sql num, is fall back) */
+ def tpcdsAllQueries(isAqe: Boolean): Seq[(String, Boolean)] =
Range
.inclusive(1, 99)
.flatMap(
@@ -57,25 +57,24 @@ abstract class GlutenClickHouseTPCDSAbstractSuite
} else {
Seq("q" + "%d".format(queryNum))
}
- val noFallBack = queryNum match {
- case i if !isAqe && (i == 10 || i == 16 || i == 35 || i == 94) =>
- // q10 smj + existence join
- // q16 smj + left semi + not condition
- // q35 smj + existence join
- // Q94 BroadcastHashJoin, LeftSemi, NOT condition
- (false, false)
- case i if isAqe && (i == 16 || i == 94) =>
- (false, false)
- case other => (true, false)
- }
- sqlNums.map((_, noFallBack._1, noFallBack._2))
+ val native = !fallbackSets(isAqe).contains(queryNum)
+ sqlNums.map((_, native))
})
- // FIXME "q17", stddev_samp inconsistent results, CH return NaN, Spark return null
+ protected def fallbackSets(isAqe: Boolean): Set[Int] = {
+ val more = if (isSparkVersionGE("3.5")) Set(44, 67, 70) else Set.empty[Int]
+
+ // q16 smj + left semi + not condition
+ // Q94 BroadcastHashJoin, LeftSemi, NOT condition
+ if (isAqe) {
+ Set(16, 94) | more
+ } else {
+ // q10, q35 smj + existence join
+ Set(10, 16, 35, 94) | more
+ }
+ }
protected def excludedTpcdsQueries: Set[String] = Set(
- "q61", // inconsistent results
- "q66", // inconsistent results
- "q67" // inconsistent results
+ "q66" // inconsistent results
)
def executeTPCDSTest(isAqe: Boolean): Unit = {
@@ -83,11 +82,12 @@ abstract class GlutenClickHouseTPCDSAbstractSuite
s =>
if (excludedTpcdsQueries.contains(s._1)) {
ignore(s"TPCDS ${s._1.toUpperCase()}") {
- runTPCDSQuery(s._1, noFallBack = s._2, skipFallBackAssert = s._3) { df => }
+ runTPCDSQuery(s._1, noFallBack = s._2) { df => }
}
} else {
- test(s"TPCDS ${s._1.toUpperCase()}") {
- runTPCDSQuery(s._1, noFallBack = s._2, skipFallBackAssert = s._3) { df => }
+ val tag = if (s._2) "Native" else "Fallback"
+ test(s"TPCDS[$tag] ${s._1.toUpperCase()}") {
+ runTPCDSQuery(s._1, noFallBack = s._2) { df => }
}
})
}
@@ -152,7 +152,7 @@ abstract class GlutenClickHouseTPCDSAbstractSuite
}
override protected def afterAll(): Unit = {
- ClickhouseSnapshot.clearAllFileStatusCache
+ ClickhouseSnapshot.clearAllFileStatusCache()
DeltaLog.clearCache()
try {
@@ -183,11 +183,10 @@ abstract class GlutenClickHouseTPCDSAbstractSuite
tpcdsQueries: String = tpcdsQueries,
queriesResults: String = queriesResults,
compareResult: Boolean = true,
- noFallBack: Boolean = true,
- skipFallBackAssert: Boolean = false)(customCheck: DataFrame => Unit): Unit = {
+ noFallBack: Boolean = true)(customCheck: DataFrame => Unit): Unit = {
val sqlFile = tpcdsQueries + "/" + queryNum + ".sql"
- val sql = Source.fromFile(new File(sqlFile), "UTF-8").mkString
+ val sql = Arm.withResource(Source.fromFile(new File(sqlFile), "UTF-8"))(_.mkString)
val df = spark.sql(sql)
if (compareResult) {
@@ -212,13 +211,13 @@ abstract class GlutenClickHouseTPCDSAbstractSuite
// using WARN to guarantee printed
log.warn(s"query: $queryNum, finish comparing with saved result")
} else {
- val start = System.currentTimeMillis();
+ val start = System.currentTimeMillis()
val ret = df.collect()
// using WARN to guarantee printed
log.warn(s"query: $queryNum skipped comparing, time cost to collect:
${System
.currentTimeMillis() - start} ms, ret size: ${ret.length}")
}
- WholeStageTransformerSuite.checkFallBack(df, noFallBack, skipFallBackAssert)
+ WholeStageTransformerSuite.checkFallBack(df, noFallBack)
customCheck(df)
}
}
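Two mechanical points in this file are worth calling out. First, fallbackSets centralizes the per-version fallback expectations: on Spark 3.5, q44, q67 and q70 additionally fall back, so an extra set is unioned in conditionally. Second, the Source.fromFile change closes the file handle deterministically instead of leaking it. A loan-pattern sketch of what an Arm.withResource-style helper does (the real Arm lives in org.apache.gluten.utils; this version is an illustration, not its actual code):

    import java.io.File
    import scala.io.Source

    object ArmSketch {
      // Loan pattern: hand the resource to f and always close it afterwards,
      // even if f throws.
      def withResource[T <: AutoCloseable, R](resource: T)(f: T => R): R =
        try f(resource)
        finally resource.close()
    }

    // Usage mirroring the patch: read one TPC-DS query file without leaking
    // the underlying reader (the file name here is a placeholder).
    // val sql = ArmSketch.withResource(Source.fromFile(new File("q1.sql"), "UTF-8"))(_.mkString)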
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
index 59912e722..e05cf7274 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
@@ -234,10 +234,10 @@ class GlutenClickHouseTPCHBucketSuite
val plans = collect(df.queryExecution.executedPlan) {
case scanExec: BasicScanExecTransformer => scanExec
}
- assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
- assert(plans(0).metrics("numFiles").value === 2)
- assert(plans(0).metrics("pruningTime").value === -1)
- assert(plans(0).metrics("numOutputRows").value === 591673)
+ assert(!plans.head.asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
+ assert(plans.head.metrics("numFiles").value === 2)
+ assert(plans.head.metrics("pruningTime").value === pruningTimeValueSpark)
+ assert(plans.head.metrics("numOutputRows").value === 591673)
})
}
@@ -291,7 +291,7 @@ class GlutenClickHouseTPCHBucketSuite
}
if (sparkVersion.equals("3.2")) {
- assert(!(plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
+ assert(!plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
} else {
assert(plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
}
@@ -327,14 +327,14 @@ class GlutenClickHouseTPCHBucketSuite
.isInstanceOf[InputIteratorTransformer])
if (sparkVersion.equals("3.2")) {
- assert(!(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
+ assert(!plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
} else {
assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
}
assert(plans(2).metrics("numFiles").value === 2)
assert(plans(2).metrics("numOutputRows").value === 3111)
- assert(!(plans(3).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
+ assert(!plans(3).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
assert(plans(3).metrics("numFiles").value === 2)
assert(plans(3).metrics("numOutputRows").value === 72678)
})
@@ -366,12 +366,12 @@ class GlutenClickHouseTPCHBucketSuite
}
// bucket join
assert(
- plans(0)
+ plans.head
.asInstanceOf[HashJoinLikeExecTransformer]
.left
.isInstanceOf[ProjectExecTransformer])
assert(
- plans(0)
+ plans.head
.asInstanceOf[HashJoinLikeExecTransformer]
.right
.isInstanceOf[ProjectExecTransformer])
@@ -409,10 +409,10 @@ class GlutenClickHouseTPCHBucketSuite
val plans = collect(df.queryExecution.executedPlan) {
case scanExec: BasicScanExecTransformer => scanExec
}
- assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
- assert(plans(0).metrics("numFiles").value === 2)
- assert(plans(0).metrics("pruningTime").value === -1)
- assert(plans(0).metrics("numOutputRows").value === 11618)
+ assert(!plans.head.asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
+ assert(plans.head.metrics("numFiles").value === 2)
+ assert(plans.head.metrics("pruningTime").value === pruningTimeValueSpark)
+ assert(plans.head.metrics("numOutputRows").value === 11618)
})
}
@@ -425,12 +425,12 @@ class GlutenClickHouseTPCHBucketSuite
}
// bucket join
assert(
- plans(0)
+ plans.head
.asInstanceOf[HashJoinLikeExecTransformer]
.left
.isInstanceOf[FilterExecTransformerBase])
assert(
- plans(0)
+ plans.head
.asInstanceOf[HashJoinLikeExecTransformer]
.right
.isInstanceOf[ProjectExecTransformer])
@@ -585,7 +585,7 @@ class GlutenClickHouseTPCHBucketSuite
def checkResult(df: DataFrame, exceptedResult: Seq[Row]): Unit = {
// check the result
val result = df.collect()
- assert(result.size == exceptedResult.size)
+ assert(result.length == exceptedResult.size)
val sortedRes = result.map {
s =>
Row.fromSeq(s.toSeq.map {
@@ -786,7 +786,7 @@ class GlutenClickHouseTPCHBucketSuite
|order by l_orderkey, l_returnflag, t
|limit 10
|""".stripMargin
- runSql(SQL7, false)(
+ runSql(SQL7, noFallBack = false)(
df => {
checkResult(
df,
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index 9412326ae..497286115 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
import org.apache.commons.io.FileUtils
+import org.scalatest.Tag
import java.io.File
@@ -177,13 +178,23 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu
super.beforeAll()
}
- protected val rootPath = this.getClass.getResource("/").getPath
- protected val basePath = rootPath + "tests-working-home"
- protected val warehouse = basePath + "/spark-warehouse"
- protected val metaStorePathAbsolute = basePath + "/meta"
- protected val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
+ protected val rootPath: String = this.getClass.getResource("/").getPath
+ protected val basePath: String = rootPath + "tests-working-home"
+ protected val warehouse: String = basePath + "/spark-warehouse"
+ protected val metaStorePathAbsolute: String = basePath + "/meta"
+ protected val hiveMetaStoreDB: String = metaStorePathAbsolute + "/metastore_db"
final override protected val resourcePath: String = "" // ch not need this
override protected val fileFormat: String = "parquet"
+
+ protected def testSparkVersionLE33(testName: String, testTag: Tag*)(testFun: => Any): Unit = {
+ if (isSparkVersionLE("3.3")) {
+ test(testName, testTag: _*)(testFun)
+ } else {
+ ignore(s"[$SPARK_VERSION_SHORT]-$testName", testTag: _*)(testFun)
+ }
+ }
+
+ lazy val pruningTimeValueSpark: Int = if (isSparkVersionLE("3.3")) -1 else 0
}
// scalastyle:off line.size.limit
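The new testSparkVersionLE33 wrapper keeps version-specific cases visible in the report: on Spark 3.2/3.3 the test runs normally, while on 3.5 it is registered as ignored with the running version baked into the name. A hedged usage sketch (the suite name and assertion here are made up for illustration):

    // Hypothetical suite using the helper defined above.
    class ExampleVersionGatedSuite extends GlutenClickHouseWholeStageTransformerSuite {
      testSparkVersionLE33("legacy-only behaviour") {
        // Body only executes on Spark <= 3.3; on 3.5 the case shows up as
        // ignore("[3.5]-legacy-only behaviour") instead of silently vanishing.
        assert(isSparkVersionLE("3.3"))
      }
    }

pruningTimeValueSpark captures the other recurring difference: per its definition above, the expected pruningTime metric is -1 on Spark 3.3 and earlier and 0 on 3.5, so the metric assertions compare against this constant instead of a literal -1.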
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
index 5887050d0..28ff5874f 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
@@ -105,9 +105,9 @@ class GlutenClickhouseCountDistinctSuite extends GlutenClickHouseWholeStageTrans
val sql = s"""
select count(distinct(a,b)) , try_add(c,b) from
values (0, null,1), (0,null,2), (1, 1,4) as data(a,b,c) group by try_add(c,b)
- """;
+ """
val df = spark.sql(sql)
- WholeStageTransformerSuite.checkFallBack(df, noFallback = false)
+ WholeStageTransformerSuite.checkFallBack(df, noFallback = isSparkVersionGE("3.5"))
}
test("check count distinct with filter") {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
index 4b5a5b328..509967125 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.TaskResources
import scala.collection.JavaConverters._
class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite {
- private val parquetMaxBlockSize = 4096;
+ private val parquetMaxBlockSize = 4096
override protected val needCopyParquetToTablePath = true
override protected val tablesPath: String = basePath + "/tpch-data"
@@ -71,15 +71,15 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
assert(plans.size == 3)
assert(plans(2).metrics("numFiles").value === 1)
- assert(plans(2).metrics("pruningTime").value === -1)
+ assert(plans(2).metrics("pruningTime").value === pruningTimeValueSpark)
assert(plans(2).metrics("filesSize").value === 19230111)
assert(plans(1).metrics("numOutputRows").value === 4)
assert(plans(1).metrics("outputVectors").value === 1)
// Execute Sort operator, it will read the data twice.
- assert(plans(0).metrics("numOutputRows").value === 4)
- assert(plans(0).metrics("outputVectors").value === 1)
+ assert(plans.head.metrics("numOutputRows").value === 4)
+ assert(plans.head.metrics("outputVectors").value === 1)
}
}
@@ -139,15 +139,15 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
assert(plans.size == 3)
assert(plans(2).metrics("numFiles").value === 1)
- assert(plans(2).metrics("pruningTime").value === -1)
+ assert(plans(2).metrics("pruningTime").value === pruningTimeValueSpark)
assert(plans(2).metrics("filesSize").value === 19230111)
assert(plans(1).metrics("numOutputRows").value === 4)
assert(plans(1).metrics("outputVectors").value === 1)
// Execute Sort operator, it will read the data twice.
- assert(plans(0).metrics("numOutputRows").value === 4)
- assert(plans(0).metrics("outputVectors").value === 1)
+ assert(plans.head.metrics("numOutputRows").value === 4)
+ assert(plans.head.metrics("outputVectors").value === 1)
}
}
}
@@ -165,7 +165,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
)
assert(nativeMetricsList.size == 1)
- val nativeMetricsData = nativeMetricsList(0)
+ val nativeMetricsData = nativeMetricsList.head
assert(nativeMetricsData.metricsDataList.size() == 3)
assert(nativeMetricsData.metricsDataList.get(0).getName.equals("kRead"))
@@ -287,7 +287,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
assert(joinPlan.metrics("inputBytes").value == 1920000)
}
- val wholeStageTransformer2 = allWholeStageTransformers(0)
+ val wholeStageTransformer2 = allWholeStageTransformers.head
GlutenClickHouseMetricsUTUtils.executeMetricsUpdater(
wholeStageTransformer2,
@@ -325,7 +325,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
)
assert(nativeMetricsList.size == 1)
- val nativeMetricsData = nativeMetricsList(0)
+ val nativeMetricsData = nativeMetricsList.head
assert(nativeMetricsData.metricsDataList.size() == 5)
assert(nativeMetricsData.metricsDataList.get(0).getName.equals("kRead"))
@@ -399,7 +399,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
)
assert(nativeMetricsListFinal.size == 1)
- val nativeMetricsDataFinal = nativeMetricsListFinal(0)
+ val nativeMetricsDataFinal = nativeMetricsListFinal.head
assert(nativeMetricsDataFinal.metricsDataList.size() == 3)
assert(nativeMetricsDataFinal.metricsDataList.get(0).getName.equals("kRead"))
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala
index a1b5801da..b4e4cea91 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala
@@ -460,26 +460,34 @@ class GlutenParquetFilterSuite
"orders1" -> Nil)
)
+ def runTest(i: Int): Unit = withDataFrame(tpchSQL(i + 1, tpchQueriesResourceFolder)) {
+ df =>
+ val scans = df.queryExecution.executedPlan
+ .collect { case scan: FileSourceScanExecTransformer => scan }
+ assertResult(result(i).size)(scans.size)
+ scans.zipWithIndex
+ .foreach {
+ case (scan, fileIndex) =>
+ val tableName = scan.tableIdentifier
+ .map(_.table)
+ .getOrElse(scan.relation.options("path").split("/").last)
+ val predicates = scan.filterExprs()
+ val expected = result(i)(s"$tableName$fileIndex")
+ assertResult(expected.size)(predicates.size)
+ if (expected.isEmpty) assert(predicates.isEmpty)
+ else compareExpressions(expected.reduceLeft(And), predicates.reduceLeft(And))
+ }
+ }
+
tpchQueries.zipWithIndex.foreach {
case (q, i) =>
- test(q) {
- withDataFrame(tpchSQL(i + 1, tpchQueriesResourceFolder)) {
- df =>
- val scans = df.queryExecution.executedPlan
- .collect { case scan: FileSourceScanExecTransformer => scan }
- assertResult(result(i).size)(scans.size)
- scans.zipWithIndex
- .foreach {
- case (scan, fileIndex) =>
- val tableName = scan.tableIdentifier
- .map(_.table)
- .getOrElse(scan.relation.options("path").split("/").last)
- val predicates = scan.filterExprs()
- val expected = result(i)(s"$tableName$fileIndex")
- assertResult(expected.size)(predicates.size)
- if (expected.isEmpty) assert(predicates.isEmpty)
- else compareExpressions(expected.reduceLeft(And), predicates.reduceLeft(And))
- }
+ if (q == "q2" || q == "q9") {
+ testSparkVersionLE33(q) {
+ runTest(i)
+ }
+ } else {
+ test(q) {
+ runTest(i)
}
}
}
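Extracting runTest(i) lets the same assertion body be registered under either test or testSparkVersionLE33 (q2 and q9 are gated to Spark 3.3 and below). The predicate check normalizes both sides by folding them into a single conjunction; a tiny sketch of the reduceLeft(And) idiom on Catalyst expressions (the wrapper object is hypothetical):

    import org.apache.spark.sql.catalyst.expressions.{And, Expression}

    object PredicateFoldSketch {
      // Fold a non-empty list of pushed-down filter predicates into one
      // conjunctive Expression, the shape handed to compareExpressions above.
      def conjunction(preds: Seq[Expression]): Expression = preds.reduceLeft(And)
    }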
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetAQESuite.scala
similarity index 90%
rename from backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetAQESuite.scala
rename to backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetAQESuite.scala
index 1960e3002..389d617f1 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetAQESuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetAQESuite.scala
@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.tpcds
+
+import org.apache.gluten.execution._
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
@@ -46,7 +48,7 @@ class GlutenClickHouseTPCDSParquetAQESuite
val result = runSql("""
|select count(c_customer_sk) from customer
|""".stripMargin) { _ => }
- assert(result(0).getLong(0) == 100000L)
+ assertResult(100000L)(result.head.getLong(0))
}
test("test reading from partitioned table") {
@@ -55,7 +57,7 @@ class GlutenClickHouseTPCDSParquetAQESuite
| from store_sales
| where ss_quantity between 1 and 20
|""".stripMargin) { _ => }
- assert(result(0).getLong(0) == 550458L)
+ assertResult(550458L)(result.head.getLong(0))
}
test("test reading from partitioned table with partition column filter") {
@@ -66,7 +68,7 @@ class GlutenClickHouseTPCDSParquetAQESuite
| where ss_quantity between 1 and 20
| and ss_sold_date_sk = 2452635
|""".stripMargin,
- true,
+ compareResult = true,
_ => {}
)
}
@@ -76,8 +78,8 @@ class GlutenClickHouseTPCDSParquetAQESuite
|select avg(cs_item_sk), avg(cs_order_number)
| from catalog_sales
|""".stripMargin) { _ => }
- assert(result(0).getDouble(0) == 8998.463336886734)
- assert(result(0).getDouble(1) == 80037.12727449503)
+ assertResult(8998.463336886734)(result.head.getDouble(0))
+ assertResult(80037.12727449503)(result.head.getDouble(1))
}
test("Gluten-1235: Fix missing reading from the broadcasted value when
executing DPP") {
@@ -96,7 +98,7 @@ class GlutenClickHouseTPCDSParquetAQESuite
|""".stripMargin
compareResultsAgainstVanillaSpark(
testSql,
- true,
+ compareResult = true,
df => {
val foundDynamicPruningExpr = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
@@ -107,11 +109,11 @@ class GlutenClickHouseTPCDSParquetAQESuite
.asInstanceOf[FileSourceScanExecTransformer]
.partitionFilters
.exists(_.isInstanceOf[DynamicPruningExpression]))
- assert(
+ assertResult(1823)(
foundDynamicPruningExpr(1)
.asInstanceOf[FileSourceScanExecTransformer]
.selectedPartitions
- .size == 1823)
+ .length)
}
)
}
@@ -126,7 +128,7 @@ class GlutenClickHouseTPCDSParquetAQESuite
}
// On Spark 3.2, there are 15 AdaptiveSparkPlanExec,
// and on Spark 3.3, there are 5 AdaptiveSparkPlanExec and 10 ReusedSubqueryExec
- assert(subqueryAdaptiveSparkPlan.filter(_ == true).size == 15)
+ assertResult(15)(subqueryAdaptiveSparkPlan.count(_ == true))
}
}
@@ -141,12 +143,12 @@ class GlutenClickHouseTPCDSParquetAQESuite
} =>
f
}
- assert(foundDynamicPruningExpr.nonEmpty == true)
+ assert(foundDynamicPruningExpr.nonEmpty)
val reusedExchangeExec =
collectWithSubqueries(df.queryExecution.executedPlan) {
case r: ReusedExchangeExec => r
}
- assert(reusedExchangeExec.nonEmpty == true)
+ assert(reusedExchangeExec.nonEmpty)
}
}
@@ -164,7 +166,7 @@ class GlutenClickHouseTPCDSParquetAQESuite
} =>
f
}
- assert(foundDynamicPruningExpr.nonEmpty == true)
+ assert(foundDynamicPruningExpr.nonEmpty)
val reusedExchangeExec =
collectWithSubqueries(df.queryExecution.executedPlan) {
case r: ReusedExchangeExec => r
@@ -194,6 +196,6 @@ class GlutenClickHouseTPCDSParquetAQESuite
|ORDER BY channel
| LIMIT 100 ;
|""".stripMargin
- compareResultsAgainstVanillaSpark(testSql, true, df => {})
+ compareResultsAgainstVanillaSpark(testSql, compareResult = true, df => {})
}
}
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala
similarity index 93%
rename from backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala
rename to backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala
index 66f1adfb6..1fd8983f5 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala
@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.tpcds
+
+import org.apache.gluten.execution._
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
@@ -48,7 +50,7 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
| from store_sales
| where ss_quantity between 1 and 20
|""".stripMargin) { _ => }
- assert(result(0).getLong(0) == 550458L)
+ assertResult(550458L)(result.head.getLong(0))
}
test("test reading from partitioned table with partition column filter") {
@@ -59,7 +61,7 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
| where ss_quantity between 1 and 20
| and ss_sold_date_sk = 2452635
|""".stripMargin,
- true,
+ compareResult = true,
_ => {}
)
}
@@ -69,8 +71,8 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
|select avg(cs_item_sk), avg(cs_order_number)
| from catalog_sales
|""".stripMargin) { _ => }
- assert(result(0).getDouble(0) == 8998.463336886734)
- assert(result(0).getDouble(1) == 80037.12727449503)
+ assertResult(8998.463336886734)(result.head.getDouble(0))
+ assertResult(80037.12727449503)(result.head.getDouble(1))
}
test("Gluten-1235: Fix missing reading from the broadcasted value when
executing DPP") {
@@ -89,7 +91,7 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
|""".stripMargin
compareResultsAgainstVanillaSpark(
testSql,
- true,
+ compareResult = true,
df => {
val foundDynamicPruningExpr = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
@@ -100,11 +102,11 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
.asInstanceOf[FileSourceScanExecTransformer]
.partitionFilters
.exists(_.isInstanceOf[DynamicPruningExpression]))
- assert(
+ assertResult(1823)(
foundDynamicPruningExpr(1)
.asInstanceOf[FileSourceScanExecTransformer]
.selectedPartitions
- .size == 1823)
+ .length)
}
)
}
@@ -119,7 +121,7 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
}
// On Spark 3.2, there are 15 AdaptiveSparkPlanExec,
// and on Spark 3.3, there are 5 AdaptiveSparkPlanExec and 10 ReusedSubqueryExec
- assert(subqueryAdaptiveSparkPlan.filter(_ == true).size == 15)
+ assertResult(15)(subqueryAdaptiveSparkPlan.count(_ == true))
}
}
@@ -145,12 +147,12 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
} =>
f
}
- assert(foundDynamicPruningExpr.nonEmpty == true)
+ assert(foundDynamicPruningExpr.nonEmpty)
val reusedExchangeExec =
collectWithSubqueries(df.queryExecution.executedPlan) {
case r: ReusedExchangeExec => r
}
- assert(reusedExchangeExec.nonEmpty == true)
+ assert(reusedExchangeExec.nonEmpty)
}
}
@@ -168,7 +170,7 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
} =>
f
}
- assert(foundDynamicPruningExpr.nonEmpty == true)
+ assert(foundDynamicPruningExpr.nonEmpty)
val reusedExchangeExec =
collectWithSubqueries(df.queryExecution.executedPlan) {
case r: ReusedExchangeExec => r
@@ -198,7 +200,7 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
|ORDER BY channel
| LIMIT 100 ;
|""".stripMargin
- compareResultsAgainstVanillaSpark(testSql, true, df => {})
+ compareResultsAgainstVanillaSpark(testSql, compareResult = true, df => {})
}
test("GLUTEN-1620: fix 'attribute binding failed.' when executing hash agg
with aqe") {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetColumnarShuffleSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleSuite.scala
similarity index 91%
rename from backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetColumnarShuffleSuite.scala
rename to backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleSuite.scala
index ca3db0772..4675de249 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetColumnarShuffleSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleSuite.scala
@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.tpcds
+
+import org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenClickHouseTPCDSAbstractSuite}
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
@@ -45,7 +47,7 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleSuite extends GlutenClickHouseT
| from store_sales
| where ss_quantity between 1 and 20
|""".stripMargin) { _ => }
- assert(result(0).getLong(0) == 550458L)
+ assertResult(550458L)(result.head.getLong(0))
}
test("test reading from partitioned table with partition column filter") {
@@ -56,7 +58,7 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleSuite extends GlutenClickHouseT
| where ss_quantity between 1 and 20
| and ss_sold_date_sk = 2452635
|""".stripMargin,
- true,
+ compareResult = true,
_ => {}
)
}
@@ -66,8 +68,8 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleSuite extends GlutenClickHouseT
|select avg(cs_item_sk), avg(cs_order_number)
| from catalog_sales
|""".stripMargin) { _ => }
- assert(result(0).getDouble(0) == 8998.463336886734)
- assert(result(0).getDouble(1) == 80037.12727449503)
+ assertResult(8998.463336886734)(result.head.getDouble(0))
+ assertResult(80037.12727449503)(result.head.getDouble(1))
}
test("Gluten-1235: Fix missing reading from the broadcasted value when
executing DPP") {
@@ -86,7 +88,7 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleSuite
extends GlutenClickHouseT
|""".stripMargin
compareResultsAgainstVanillaSpark(
testSql,
- true,
+ compareResult = true,
df => {
val foundDynamicPruningExpr = df.queryExecution.executedPlan.collect {
case f: FileSourceScanExecTransformer => f
@@ -97,11 +99,11 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleSuite extends GlutenClickHouseT
.asInstanceOf[FileSourceScanExecTransformer]
.partitionFilters
.exists(_.isInstanceOf[DynamicPruningExpression]))
- assert(
+ assertResult(1823)(
foundDynamicPruningExpr(1)
.asInstanceOf[FileSourceScanExecTransformer]
.selectedPartitions
- .size == 1823)
+ .length)
}
)
}
@@ -144,13 +146,13 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleSuite extends GlutenClickHouseT
}
case _ => false
}
- assert(foundDynamicPruningExpr.nonEmpty == true)
+ assert(foundDynamicPruningExpr.nonEmpty)
val reuseExchange = df.queryExecution.executedPlan.find {
case r: ReusedExchangeExec => true
case _ => false
}
- assert(reuseExchange.nonEmpty == true)
+ assert(reuseExchange.nonEmpty)
}
}
@@ -168,7 +170,7 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleSuite extends GlutenClickHouseT
}
case _ => false
}
- assert(foundDynamicPruningExpr.nonEmpty == true)
+ assert(foundDynamicPruningExpr.nonEmpty)
val reuseExchange = df.queryExecution.executedPlan.find {
case r: ReusedExchangeExec => true
@@ -199,6 +201,6 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleSuite extends GlutenClickHouseT
|ORDER BY channel
| LIMIT 100 ;
|""".stripMargin
- compareResultsAgainstVanillaSpark(testSql, true, df => {})
+ compareResultsAgainstVanillaSpark(testSql, compareResult = true, df => {})
}
}
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetGraceHashJoinSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetGraceHashJoinSuite.scala
similarity index 93%
rename from backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetGraceHashJoinSuite.scala
rename to backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetGraceHashJoinSuite.scala
index a7b3518cc..716ea5761 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetGraceHashJoinSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetGraceHashJoinSuite.scala
@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.tpcds
+
+import org.apache.gluten.execution._
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
@@ -34,7 +36,7 @@ class GlutenClickHouseTPCDSParquetGraceHashJoinSuite extends GlutenClickHouseTPC
.set("spark.gluten.sql.columnar.backend.ch.runtime_settings.max_bytes_in_join",
"314572800")
}
- executeTPCDSTest(false);
+ executeTPCDSTest(false)
test("Gluten-1235: Fix missing reading from the broadcasted value when
executing DPP") {
val testSql =
@@ -52,7 +54,7 @@ class GlutenClickHouseTPCDSParquetGraceHashJoinSuite extends GlutenClickHouseTPC
|""".stripMargin
compareResultsAgainstVanillaSpark(
testSql,
- true,
+ compareResult = true,
df => {
val foundDynamicPruningExpr = df.queryExecution.executedPlan.collect {
case f: FileSourceScanExecTransformer => f
@@ -63,11 +65,11 @@ class GlutenClickHouseTPCDSParquetGraceHashJoinSuite extends GlutenClickHouseTPC
.asInstanceOf[FileSourceScanExecTransformer]
.partitionFilters
.exists(_.isInstanceOf[DynamicPruningExpression]))
- assert(
+ assertResult(1823)(
foundDynamicPruningExpr(1)
.asInstanceOf[FileSourceScanExecTransformer]
.selectedPartitions
- .size == 1823)
+ .length)
}
)
}
@@ -86,7 +88,7 @@ class GlutenClickHouseTPCDSParquetGraceHashJoinSuite extends GlutenClickHouseTPC
}
case _ => false
}
- assert(foundDynamicPruningExpr.nonEmpty == true)
+ assert(foundDynamicPruningExpr.nonEmpty)
val reuseExchange = df.queryExecution.executedPlan.find {
case r: ReusedExchangeExec => true
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetRFSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetRFSuite.scala
similarity index 93%
rename from backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetRFSuite.scala
rename to backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetRFSuite.scala
index 27137c6d9..657a6e321 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetRFSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetRFSuite.scala
@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.tpcds
+
+import org.apache.gluten.execution.GlutenClickHouseTPCDSAbstractSuite
import org.apache.spark.SparkConf
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
similarity index 93%
rename from backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
rename to backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
index 3ec4e31a4..7e480361b 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
@@ -14,8 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.tpcds
+import org.apache.gluten.execution.{CHSortMergeJoinExecTransformer, GlutenClickHouseTPCDSAbstractSuite}
import org.apache.gluten.test.FallbackUtil
import org.apache.spark.SparkConf
@@ -64,7 +65,7 @@ class GlutenClickHouseTPCDSParquetSortMergeJoinSuite extends GlutenClickHouseTPC
|i.i_current_price > 1.0 """.stripMargin
compareResultsAgainstVanillaSpark(
testSql,
- true,
+ compareResult = true,
df => {
val smjTransformers = df.queryExecution.executedPlan.collect {
case f: CHSortMergeJoinExecTransformer => f
@@ -83,7 +84,7 @@ class GlutenClickHouseTPCDSParquetSortMergeJoinSuite extends GlutenClickHouseTPC
""".stripMargin
compareResultsAgainstVanillaSpark(
testSql,
- true,
+ compareResult = true,
df => {
val smjTransformers = df.queryExecution.executedPlan.collect {
case f: CHSortMergeJoinExecTransformer => f
@@ -102,7 +103,7 @@ class GlutenClickHouseTPCDSParquetSortMergeJoinSuite extends GlutenClickHouseTPC
""".stripMargin
compareResultsAgainstVanillaSpark(
testSql,
- true,
+ compareResult = true,
df => {
val smjTransformers = df.queryExecution.executedPlan.collect {
case f: CHSortMergeJoinExecTransformer => f
@@ -124,7 +125,7 @@ class GlutenClickHouseTPCDSParquetSortMergeJoinSuite extends GlutenClickHouseTPC
val smjTransformers = df.queryExecution.executedPlan.collect {
case f: CHSortMergeJoinExecTransformer => f
}
- assert(smjTransformers.size == 0)
+ assert(smjTransformers.isEmpty)
assert(FallbackUtil.hasFallback(df.queryExecution.executedPlan))
}
}
@@ -140,18 +141,18 @@ class GlutenClickHouseTPCDSParquetSortMergeJoinSuite extends GlutenClickHouseTPC
val smjTransformers = df.queryExecution.executedPlan.collect {
case f: CHSortMergeJoinExecTransformer => f
}
- assert(smjTransformers.size == 0)
+ assert(smjTransformers.isEmpty)
assert(FallbackUtil.hasFallback(df.queryExecution.executedPlan))
}
}
- val createItem =
+ val createItem: String =
"""CREATE TABLE myitem (
| i_current_price DECIMAL(7,2),
| i_category STRING)
|USING parquet""".stripMargin
- val insertItem =
+ val insertItem: String =
"""insert into myitem values
|(null,null),
|(null,null),
@@ -174,7 +175,7 @@ class GlutenClickHouseTPCDSParquetSortMergeJoinSuite extends GlutenClickHouseTPC
""".stripMargin
compareResultsAgainstVanillaSpark(
testSql,
- true,
+ compareResult = true,
df => {
val smjTransformers = df.queryExecution.executedPlan.collect {
case f: CHSortMergeJoinExecTransformer => f
@@ -206,7 +207,7 @@ class GlutenClickHouseTPCDSParquetSortMergeJoinSuite extends GlutenClickHouseTPC
spark.sql(testSql).show()
compareResultsAgainstVanillaSpark(
testSql,
- true,
+ compareResult = true,
df => {
val smjTransformers = df.queryExecution.executedPlan.collect {
case f: CHSortMergeJoinExecTransformer => f
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
similarity index 88%
rename from backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSuite.scala
rename to backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
index e20ea35e5..d0b270d2a 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.tpcds
+
+import org.apache.gluten.execution._
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
@@ -47,7 +49,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
val result = runSql("""
|select count(c_customer_sk) from customer
|""".stripMargin) { _ => }
- assert(result(0).getLong(0) == 100000L)
+ assertResult(100000L)(result.head.getLong(0))
}
test("test reading from partitioned table") {
@@ -56,7 +58,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
| from store_sales
| where ss_quantity between 1 and 20
|""".stripMargin) { _ => }
- assert(result(0).getLong(0) == 550458L)
+ assertResult(550458L)(result.head.getLong(0))
}
test("test reading from partitioned table with partition column filter") {
@@ -67,7 +69,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
| where ss_quantity between 1 and 20
| and ss_sold_date_sk = 2452635
|""".stripMargin,
- true,
+ compareResult = true,
_ => {}
)
}
@@ -77,8 +79,8 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
|select avg(cs_item_sk), avg(cs_order_number)
| from catalog_sales
|""".stripMargin) { _ => }
- assert(result(0).getDouble(0) == 8998.463336886734)
- assert(result(0).getDouble(1) == 80037.12727449503)
+ assertResult(8998.463336886734)(result.head.getDouble(0))
+ assertResult(80037.12727449503)(result.head.getDouble(1))
}
test("test union all operator with two tables") {
@@ -89,7 +91,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
| select ws_sold_date_sk as date_sk from web_sales
|)
|""".stripMargin) { _ => }
- assert(result(0).getLong(0) == 791809)
+ assertResult(791809)(result.head.getLong(0))
}
test("test union all operator with three tables") {
@@ -103,7 +105,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
| )
|)
|""".stripMargin) { _ => }
- assert(result(0).getLong(0) == 791909)
+ assertResult(791909)(result.head.getLong(0))
}
test("test union operator with two tables") {
@@ -114,7 +116,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
| select ws_sold_date_sk as date_sk from web_sales
|)
|""".stripMargin) { _ => }
- assert(result(0).getLong(0) == 73049)
+ assertResult(73049)(result.head.getLong(0))
}
test("Test join with mixed condition 1") {
@@ -134,7 +136,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
| ORDER BY ext_price DESC, i_brand, i_brand_id, i_manufact_id, i_manufact
| LIMIT 100;
|""".stripMargin
- compareResultsAgainstVanillaSpark(testSql, true, _ => {})
+ compareResultsAgainstVanillaSpark(testSql, compareResult = true, _ => {})
}
test("Gluten-1235: Fix missing reading from the broadcasted value when
executing DPP") {
@@ -153,7 +155,7 @@ class GlutenClickHouseTPCDSParquetSuite extends
GlutenClickHouseTPCDSAbstractSui
|""".stripMargin
compareResultsAgainstVanillaSpark(
testSql,
- true,
+ compareResult = true,
df => {
val foundDynamicPruningExpr = df.queryExecution.executedPlan.collect {
case f: FileSourceScanExecTransformer => f
@@ -164,11 +166,11 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
.asInstanceOf[FileSourceScanExecTransformer]
.partitionFilters
.exists(_.isInstanceOf[DynamicPruningExpression]))
- assert(
+ assertResult(1823)(
foundDynamicPruningExpr(1)
.asInstanceOf[FileSourceScanExecTransformer]
.selectedPartitions
- .size == 1823)
+ .length)
}
)
}
@@ -200,13 +202,13 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
}
case _ => false
}
- assert(foundDynamicPruningExpr.nonEmpty == true)
+ assert(foundDynamicPruningExpr.nonEmpty)
val reuseExchange = df.queryExecution.executedPlan.find {
case r: ReusedExchangeExec => true
case _ => false
}
- assert(reuseExchange.nonEmpty == true)
+ assert(reuseExchange.nonEmpty)
}
}
@@ -224,7 +226,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
}
case _ => false
}
- assert(foundDynamicPruningExpr.nonEmpty == true)
+ assert(foundDynamicPruningExpr.nonEmpty)
val reuseExchange = df.queryExecution.executedPlan.find {
case r: ReusedExchangeExec => true
@@ -255,7 +257,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
|ORDER BY channel
| LIMIT 100 ;
|""".stripMargin
- compareResultsAgainstVanillaSpark(testSql, true, df => {})
+ compareResultsAgainstVanillaSpark(testSql, compareResult = true, df => {})
}
test("Bug-382 collec_list failure") {
@@ -264,7 +266,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
|select cc_call_center_id, collect_list(cc_call_center_sk) from call_center group by cc_call_center_id
|order by cc_call_center_id
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql, true, df => {})
+ compareResultsAgainstVanillaSpark(sql, compareResult = true, df => {})
}
test("collec_set") {
@@ -275,7 +277,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
|lateral view explode(set) as b
|order by a, b
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql, true, _ => {})
+ compareResultsAgainstVanillaSpark(sql, compareResult = true, _ => {})
}
test("GLUTEN-1626: test 'roundHalfup'") {
@@ -286,7 +288,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
|from store_sales
|group by a order by a
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql0, true, _ => {})
+ compareResultsAgainstVanillaSpark(sql0, compareResult = true, _ => {})
val sql1 =
"""
@@ -295,7 +297,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
|from store_sales
|group by a order by a
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql1, true, _ => {})
+ compareResultsAgainstVanillaSpark(sql1, compareResult = true, _ => {})
val sql2 =
"""
@@ -304,7 +306,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
|from catalog_sales
|group by a order by a
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql2, true, _ => {})
+ compareResultsAgainstVanillaSpark(sql2, compareResult = true, _ => {})
val sql3 =
"""
@@ -313,7 +315,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
|from catalog_sales
|group by a order by a
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql3, true, _ => {})
+ compareResultsAgainstVanillaSpark(sql3, compareResult = true, _ => {})
val sql4 =
"""
@@ -322,7 +324,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
|from web_sales
|group by a order by a
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql4, true, _ => {})
+ compareResultsAgainstVanillaSpark(sql4, compareResult = true, _ => {})
val sql5 =
"""
@@ -331,7 +333,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
|from web_sales
|group by a order by a
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql5, true, _ => {})
+ compareResultsAgainstVanillaSpark(sql5, compareResult = true, _ => {})
}
}
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDatetimeExpressionSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseDatetimeExpressionSuite.scala
similarity index 98%
rename from backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDatetimeExpressionSuite.scala
rename to backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseDatetimeExpressionSuite.scala
index a1749efb1..b3196286e 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDatetimeExpressionSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseDatetimeExpressionSuite.scala
@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.tpch
+
+import org.apache.gluten.execution.GlutenClickHouseTPCHAbstractSuite
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
similarity index 94%
rename from backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
rename to backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
index 6caac9918..c2e2f9f55 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
@@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.tpch
import org.apache.gluten.GlutenConfig
+import org.apache.gluten.execution._
import org.apache.gluten.extension.GlutenPlan
import org.apache.spark.SparkConf
@@ -65,7 +66,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
assert(plans.size == 5)
assert(plans(4).metrics("numFiles").value === 1)
- assert(plans(4).metrics("pruningTime").value === -1)
+ assert(plans(4).metrics("pruningTime").value === pruningTimeValueSpark)
assert(plans(4).metrics("filesSize").value === 19230111)
assert(plans(4).metrics("numOutputRows").value === 600572)
@@ -80,8 +81,8 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
assert(plans(1).metrics("numOutputRows").value === 8)
assert(plans(1).metrics("outputVectors").value === 2)
- assert(plans(0).metrics("numInputRows").value === 4)
- assert(plans(0).metrics("numOutputRows").value === 4)
+ assert(plans.head.metrics("numInputRows").value === 4)
+ assert(plans.head.metrics("numOutputRows").value === 4)
}
}
@@ -97,7 +98,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
assert(plans.size == 3)
assert(plans(2).metrics("numFiles").value === 1)
- assert(plans(2).metrics("pruningTime").value === -1)
+ assert(plans(2).metrics("pruningTime").value === pruningTimeValueSpark)
assert(plans(2).metrics("filesSize").value === 19230111)
assert(plans(1).metrics("numInputRows").value === 591673)
@@ -105,8 +106,8 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
assert(plans(1).metrics("outputVectors").value === 1)
// Execute Sort operator, it will read the data twice.
- assert(plans(0).metrics("numOutputRows").value === 8)
- assert(plans(0).metrics("outputVectors").value === 2)
+ assert(plans.head.metrics("numOutputRows").value === 8)
+ assert(plans.head.metrics("outputVectors").value === 2)
}
}
}
@@ -147,8 +148,8 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
assert(inputIteratorTransformers(1).metrics("numInputRows").value === 3111)
assert(inputIteratorTransformers(1).metrics("numOutputRows").value === 3111)
- assert(inputIteratorTransformers(0).metrics("numInputRows").value === 15224)
- assert(inputIteratorTransformers(0).metrics("numOutputRows").value === 15224)
+ assert(inputIteratorTransformers.head.metrics("numInputRows").value === 15224)
+ assert(inputIteratorTransformers.head.metrics("numOutputRows").value === 15224)
}
}
}
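Context for the metric changes above: Spark 3.5 no longer reports -1 for the pruningTime scan metric, so the suites now assert against a shared pruningTimeValueSpark helper instead of a literal. A minimal sketch of such a helper, assuming it sits in the common base suite (GlutenClickHouseWholeStageTransformerSuite is touched by this commit); the value returned for 3.5 below is an assumption, not taken from the patch:

    // Version-dependent expected value of the "pruningTime" scan metric.
    // -1 matches the pre-3.5 assertions above; 0 for Spark 3.5 is assumed here.
    protected def pruningTimeValueSpark: Int =
      if (isSparkVersionLE("3.3")) -1 else 0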
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetAQEConcurrentSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQEConcurrentSuite.scala
similarity index 96%
rename from
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetAQEConcurrentSuite.scala
rename to
backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQEConcurrentSuite.scala
index 9f4befbb0..8c706f683 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetAQEConcurrentSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQEConcurrentSuite.scala
@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.tpch
+
+import org.apache.gluten.execution.GlutenClickHouseTPCHAbstractSuite
import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetAQESuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala
similarity index 98%
rename from
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetAQESuite.scala
rename to
backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala
index c3e64a941..1d8389b48 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetAQESuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala
@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.tpch
+
+import org.apache.gluten.execution._
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.optimizer.BuildLeft
@@ -345,9 +347,7 @@ class GlutenClickHouseTPCHParquetAQESuite
|order by t1.l_orderkey, t2.o_orderkey, t2.o_year, t1.l_cnt, t2.o_cnt
|limit 100
|
- |""".stripMargin,
- true,
- true
+ |""".stripMargin
)(df => {})
runQueryAndCompare(
@@ -366,10 +366,7 @@ class GlutenClickHouseTPCHParquetAQESuite
|order by t1.l_orderkey, t2.o_orderkey, t2.o_year
|limit 100
|
- |""".stripMargin,
- true,
- true
- )(df => {})
+ |""".stripMargin)(df => {})
}
}
}
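The two call sites above drop the trailing positional true, true arguments and rely on the helper's defaults. Reconstructed from the call sites in this patch, the helper presumably has a shape along these lines (parameter names and defaults are assumptions, except noFallBack, which appears verbatim later in the diff):

    import org.apache.spark.sql.DataFrame

    trait QueryCompareSupport {
      // Simple call sites pass only the SQL string and the check function;
      // compareResult and noFallBack default to true.
      def runQueryAndCompare(
          sqlStr: String,
          compareResult: Boolean = true,
          noFallBack: Boolean = true)(customCheck: DataFrame => Unit): Unit
    }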
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetBucketSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetBucketSuite.scala
similarity index 95%
rename from
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetBucketSuite.scala
rename to
backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetBucketSuite.scala
index c164fae70..614e0124b 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetBucketSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetBucketSuite.scala
@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.tpch
+
+import org.apache.gluten.execution._
import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame
@@ -259,10 +261,10 @@ class GlutenClickHouseTPCHParquetBucketSuite
val plans = collect(df.queryExecution.executedPlan) {
case scanExec: BasicScanExecTransformer => scanExec
}
- assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
- assert(plans(0).metrics("numFiles").value === 4)
- assert(plans(0).metrics("pruningTime").value === -1)
- assert(plans(0).metrics("numOutputRows").value === 600572)
+ assert(!plans.head.asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
+ assert(plans.head.metrics("numFiles").value === 4)
+ assert(plans.head.metrics("pruningTime").value === pruningTimeValueSpark)
+ assert(plans.head.metrics("numOutputRows").value === 600572)
}
)
}
@@ -319,7 +321,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
}
if (sparkVersion.equals("3.2")) {
- assert(!(plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
+ assert(!plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
} else {
assert(plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
}
@@ -359,14 +361,14 @@ class GlutenClickHouseTPCHParquetBucketSuite
.isInstanceOf[InputIteratorTransformer])
if (sparkVersion.equals("3.2")) {
- assert(!(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
+ assert(!plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
} else {
assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
}
assert(plans(2).metrics("numFiles").value === 4)
assert(plans(2).metrics("numOutputRows").value === 15000)
- assert(!(plans(3).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
+ assert(!plans(3).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
assert(plans(3).metrics("numFiles").value === 4)
assert(plans(3).metrics("numOutputRows").value === 150000)
}
@@ -404,12 +406,12 @@ class GlutenClickHouseTPCHParquetBucketSuite
}
// bucket join
assert(
- plans(0)
+ plans.head
.asInstanceOf[HashJoinLikeExecTransformer]
.left
.isInstanceOf[ProjectExecTransformer])
assert(
- plans(0)
+ plans.head
.asInstanceOf[HashJoinLikeExecTransformer]
.right
.isInstanceOf[ProjectExecTransformer])
@@ -453,10 +455,10 @@ class GlutenClickHouseTPCHParquetBucketSuite
val plans = collect(df.queryExecution.executedPlan) {
case scanExec: BasicScanExecTransformer => scanExec
}
- assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
- assert(plans(0).metrics("numFiles").value === 4)
- assert(plans(0).metrics("pruningTime").value === -1)
- assert(plans(0).metrics("numOutputRows").value === 600572)
+ assert(!plans.head.asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
+ assert(plans.head.metrics("numFiles").value === 4)
+ assert(plans.head.metrics("pruningTime").value === pruningTimeValueSpark)
+ assert(plans.head.metrics("numOutputRows").value === 600572)
}
)
}
@@ -472,12 +474,12 @@ class GlutenClickHouseTPCHParquetBucketSuite
}
// bucket join
assert(
- plans(0)
+ plans.head
.asInstanceOf[HashJoinLikeExecTransformer]
.left
.isInstanceOf[FilterExecTransformerBase])
assert(
- plans(0)
+ plans.head
.asInstanceOf[HashJoinLikeExecTransformer]
.right
.isInstanceOf[ProjectExecTransformer])
@@ -654,7 +656,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
|""".stripMargin
compareResultsAgainstVanillaSpark(
SQL,
- true,
+ compareResult = true,
df => {}
)
}
@@ -675,7 +677,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
|""".stripMargin
compareResultsAgainstVanillaSpark(
SQL,
- true,
+ compareResult = true,
df => { checkHashAggregateCount(df, 1) }
)
@@ -690,7 +692,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
|""".stripMargin
compareResultsAgainstVanillaSpark(
SQL1,
- true,
+ compareResult = true,
df => { checkHashAggregateCount(df, 1) }
)
@@ -702,7 +704,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
|""".stripMargin
compareResultsAgainstVanillaSpark(
SQL2,
- true,
+ compareResult = true,
df => { checkHashAggregateCount(df, 1) }
)
@@ -716,7 +718,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
|""".stripMargin
compareResultsAgainstVanillaSpark(
SQL3,
- true,
+ compareResult = true,
df => { checkHashAggregateCount(df, 2) }
)
@@ -731,7 +733,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
|""".stripMargin
compareResultsAgainstVanillaSpark(
SQL4,
- true,
+ compareResult = true,
df => { checkHashAggregateCount(df, 4) }
)
@@ -745,7 +747,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
|""".stripMargin
compareResultsAgainstVanillaSpark(
SQL5,
- true,
+ compareResult = true,
df => { checkHashAggregateCount(df, 4) }
)
@@ -755,7 +757,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
|""".stripMargin
compareResultsAgainstVanillaSpark(
SQL6,
- true,
+ compareResult = true,
df => {
// there is a shuffle between two phase hash aggregate.
checkHashAggregateCount(df, 2)
@@ -773,7 +775,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
|""".stripMargin
compareResultsAgainstVanillaSpark(
SQL7,
- true,
+ compareResult = true,
df => {
checkHashAggregateCount(df, 1)
}
@@ -790,7 +792,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
|""".stripMargin
compareResultsAgainstVanillaSpark(
SQL,
- true,
+ compareResult = true,
df => {
checkHashAggregateCount(df, 0)
val plans = collect(df.queryExecution.executedPlan) { case agg: SortAggregateExec => agg }
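The recurring change from a bare true to compareResult = true names the boolean at each call site, the usual Scala remedy for opaque positional boolean literals. For reference, the comparison helper's presumed shape, reconstructed from the call sites in this file (names beyond compareResult and noFallBack are assumptions):

    import org.apache.spark.sql.DataFrame

    trait VanillaSparkCompareSupport {
      // compareResult toggles row-level comparison against vanilla Spark;
      // noFallBack additionally asserts the plan stayed fully offloaded.
      def compareResultsAgainstVanillaSpark(
          sqlStr: String,
          compareResult: Boolean = true,
          customCheck: DataFrame => Unit,
          noFallBack: Boolean = true): Unit
    }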
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetRFSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetRFSuite.scala
similarity index 91%
rename from
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetRFSuite.scala
rename to
backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetRFSuite.scala
index 83e847a70..eb4118689 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetRFSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetRFSuite.scala
@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.tpch
+
+import org.apache.gluten.execution._
import org.apache.spark.SparkConf
@@ -60,7 +62,10 @@ class GlutenClickHouseTPCHParquetRFSuite extends GlutenClickHouseTPCHSaltNullPar
}
assert(filterExecs.size == 4)
assert(
- filterExecs(0).asInstanceOf[FilterExecTransformer].toString.contains("might_contain"))
+ filterExecs.head
+ .asInstanceOf[FilterExecTransformer]
+ .toString
+ .contains("might_contain"))
}
}
)
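The assertion above checks that Spark's bloom-filter runtime filter survives offloading: from Spark 3.3 on, the optimizer can rewrite the probe side of a selective join with a might_contain(bloomFilter, xxhash64(key)) predicate, which should then appear inside the offloaded FilterExecTransformer. A hypothetical way to observe the same rewrite in vanilla Spark (table and column names invented for illustration; whether the rewrite fires depends on statistics and thresholds):

    // Spark 3.3+: enable runtime bloom filters and inspect the optimized plan.
    spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
    val plan = spark
      .sql("select * from fact join dim on fact.k = dim.k where dim.flag = 1")
      .queryExecution.optimizedPlan
    println(plan.toString.contains("might_contain"))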
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
similarity index 98%
rename from
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
rename to
backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index 694a9f253..d90330436 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.tpch
import org.apache.gluten.GlutenConfig
+import org.apache.gluten.execution._
import org.apache.gluten.extension.GlutenPlan
import org.apache.spark.{SparkConf, SparkException}
@@ -41,7 +42,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
override protected val queriesResults: String = rootPath + "queries-output"
protected val BACKEND_CONF_KEY = "spark.gluten.sql.columnar.backend.ch."
- protected val BACKEND_RUNTIME_CINF_KEY = BACKEND_CONF_KEY + "runtime_config."
+ protected val BACKEND_RUNTIME_CINF_KEY: String = BACKEND_CONF_KEY + "runtime_config."
override protected def sparkConf: SparkConf = {
super.sparkConf
@@ -205,7 +206,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
| show tables;
|""".stripMargin)
.collect()
- assert(result.size == 8)
+ assertResult(8)(result.length)
}
test("TPCH Q1") {
@@ -753,8 +754,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
runQueryAndCompare(query)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
- // see issue https://github.com/Kyligence/ClickHouse/issues/93
- ignore("TPCH Q22") {
+ test("TPCH Q22") {
runTPCHQuery(22) { df => }
}
@@ -1253,7 +1253,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
|select n_regionkey, collect_list(if(n_regionkey=0, n_name, null)) as t from nation group by n_regionkey
|order by n_regionkey
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql, true, df => {})
+ compareResultsAgainstVanillaSpark(sql, compareResult = true, df => {})
}
test("collect_set") {
@@ -1366,7 +1366,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
}
}
- test("test posexplode issue:
https://github.com/oap-project/gluten/issues/1767") {
+ testSparkVersionLE33("test posexplode issue:
https://github.com/oap-project/gluten/issues/1767") {
spark.sql("create table test_1767 (id bigint, data map<string, string>)
using parquet")
spark.sql("INSERT INTO test_1767 values(1, map('k', 'v'))")
@@ -1855,7 +1855,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
| ) t1
|) t2 where rank = 1
""".stripMargin
- compareResultsAgainstVanillaSpark(sql, true, { _ => })
+ compareResultsAgainstVanillaSpark(sql, true, { _ => }, isSparkVersionLE("3.3"))
}
test("GLUTEN-1874 not null in both streams") {
@@ -1873,7 +1873,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
| ) t1
|) t2 where rank = 1
""".stripMargin
- compareResultsAgainstVanillaSpark(sql, true, { _ => })
+ compareResultsAgainstVanillaSpark(sql, true, { _ => }, isSparkVersionLE("3.3"))
}
test("GLUTEN-2095: test cast(string as binary)") {
@@ -2158,12 +2158,12 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
}
test("GLUTEN-3149/GLUTEN-5580: Fix convert float to int") {
- val tbl_create_sql = "create table test_tbl_3149(a int, b bigint) using
parquet";
+ val tbl_create_sql = "create table test_tbl_3149(a int, b bigint) using
parquet"
val tbl_insert_sql = "insert into test_tbl_3149 values(1, 0), (2,
171396196666200)"
val select_sql_1 = "select cast(a * 1.0f/b as int) as x from test_tbl_3149
where a = 1"
val select_sql_2 = "select cast(b/100 as int) from test_tbl_3149 where a =
2"
spark.sql(tbl_create_sql)
- spark.sql(tbl_insert_sql);
+ spark.sql(tbl_insert_sql)
compareResultsAgainstVanillaSpark(select_sql_1, true, { _ => })
compareResultsAgainstVanillaSpark(select_sql_2, true, { _ => })
spark.sql("drop table test_tbl_3149")
@@ -2223,12 +2223,12 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
test("GLUTEN-3134: Bug fix left join not match") {
withSQLConf(("spark.sql.autoBroadcastJoinThreshold", "1B")) {
val left_tbl_create_sql =
- "create table test_tbl_left_3134(id bigint, name string) using
parquet";
+ "create table test_tbl_left_3134(id bigint, name string) using parquet"
val right_tbl_create_sql =
- "create table test_tbl_right_3134(id string, name string) using
parquet";
+ "create table test_tbl_right_3134(id string, name string) using
parquet"
val left_data_insert_sql =
- "insert into test_tbl_left_3134 values(2, 'a'), (3, 'b'), (673, 'c')";
- val right_data_insert_sql = "insert into test_tbl_right_3134
values('673', 'c')";
+ "insert into test_tbl_left_3134 values(2, 'a'), (3, 'b'), (673, 'c')"
+ val right_data_insert_sql = "insert into test_tbl_right_3134
values('673', 'c')"
val join_select_sql_1 = "select a.id, b.cnt from " +
"(select id from test_tbl_left_3134) as a " +
"left join (select id, 12 as cnt from test_tbl_right_3134 group by id)
as b on a.id = b.id"
@@ -2254,9 +2254,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
}
}
- // Please see the issue: https://github.com/oap-project/gluten/issues/3731
- ignore(
- "GLUTEN-3534: Fix incorrect logic of judging whether supports pre-project
for the shuffle") {
+ test("GLUTEN-3534: Fix incorrect logic of judging whether supports
pre-project for the shuffle") {
withSQLConf(("spark.sql.autoBroadcastJoinThreshold", "-1")) {
runQueryAndCompare(
s"""
@@ -2275,9 +2273,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
|order by t1.l_orderkey, t2.o_orderkey, t2.o_year, t1.l_cnt, t2.o_cnt
|limit 100
|
- |""".stripMargin,
- true,
- true
+ |""".stripMargin
)(df => {})
runQueryAndCompare(
@@ -2296,9 +2292,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
|order by t1.l_orderkey, t2.o_orderkey, t2.o_year
|limit 100
|
- |""".stripMargin,
- true,
- true
+ |""".stripMargin
)(df => {})
}
}
@@ -2405,8 +2399,8 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
}
test("GLUTEN-3521: Bug fix substring index start from 1") {
- val tbl_create_sql = "create table test_tbl_3521(id bigint, name string)
using parquet";
- val data_insert_sql = "insert into test_tbl_3521 values(1, 'abcdefghijk'),
(2, '2023-10-32')";
+ val tbl_create_sql = "create table test_tbl_3521(id bigint, name string)
using parquet"
+ val data_insert_sql = "insert into test_tbl_3521 values(1, 'abcdefghijk'),
(2, '2023-10-32')"
val select_sql =
"select id, substring(name, 0), substring(name, 0, 3), substring(name
from 0), substring(name from 0 for 100) from test_tbl_3521"
spark.sql(tbl_create_sql)
@@ -2452,7 +2446,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
| ) t1
|) t2 where rank = 1 order by p_partkey limit 100
|""".stripMargin
- runQueryAndCompare(sql)({ _ => })
+ runQueryAndCompare(sql, noFallBack = isSparkVersionLE("3.3"))({ _ => })
}
test("GLUTEN-4190: crush on flattening a const null column") {
@@ -2485,9 +2479,9 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
}
test("GLUTEN-4085: Fix unix_timestamp") {
- val tbl_create_sql = "create table test_tbl_4085(id bigint, data string)
using parquet";
+ val tbl_create_sql = "create table test_tbl_4085(id bigint, data string)
using parquet"
val data_insert_sql =
- "insert into test_tbl_4085 values(1, '2023-12-18'),(2, '2023-12-19'),
(3, '2023-12-20')";
+ "insert into test_tbl_4085 values(1, '2023-12-18'),(2, '2023-12-19'),
(3, '2023-12-20')"
val select_sql =
"select id, unix_timestamp(to_date(data), 'yyyy-MM-dd') from
test_tbl_4085"
spark.sql(tbl_create_sql)
@@ -2497,8 +2491,8 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
}
test("GLUTEN-3951: Bug fix floor") {
- val tbl_create_sql = "create table test_tbl_3951(d double) using parquet";
- val data_insert_sql = "insert into test_tbl_3951 values(1.0), (2.0),
(2.5)";
+ val tbl_create_sql = "create table test_tbl_3951(d double) using parquet"
+ val data_insert_sql = "insert into test_tbl_3951 values(1.0), (2.0), (2.5)"
val select_sql =
"select floor(d), floor(log10(d-1)), floor(log10(d-2)) from
test_tbl_3951"
spark.sql(tbl_create_sql)
@@ -2559,7 +2553,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
}
test("GLUTEN-4279: Bug fix hour diff") {
- val tbl_create_sql = "create table test_tbl_4279(id bigint, data string)
using parquet";
+ val tbl_create_sql = "create table test_tbl_4279(id bigint, data string)
using parquet"
val tbl_insert_sql = "insert into test_tbl_4279 values(1, '2024-01-04
11:22:33'), " +
"(2, '2024-01-04 11:22:33.456+08'), (3, '2024'), (4, '2024-01'), (5,
'2024-01-04'), " +
"(6, '2024-01-04 12'), (7, '2024-01-04 12:12'), (8, '11:22:33'), (9,
'22:33')," +
@@ -2636,10 +2630,10 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
test("Inequal join support") {
withSQLConf(("spark.sql.autoBroadcastJoinThreshold", "-1")) {
- spark.sql("create table ineq_join_t1 (key bigint, value bigint) using
parquet");
- spark.sql("create table ineq_join_t2 (key bigint, value bigint) using
parquet");
- spark.sql("insert into ineq_join_t1 values(1, 1), (2, 2), (3, 3), (4,
4), (5, 5)");
- spark.sql("insert into ineq_join_t2 values(2, 2), (2, 1), (3, 3), (4,
6), (5, 3)");
+ spark.sql("create table ineq_join_t1 (key bigint, value bigint) using
parquet")
+ spark.sql("create table ineq_join_t2 (key bigint, value bigint) using
parquet")
+ spark.sql("insert into ineq_join_t1 values(1, 1), (2, 2), (3, 3), (4,
4), (5, 5)")
+ spark.sql("insert into ineq_join_t2 values(2, 2), (2, 1), (3, 3), (4,
6), (5, 3)")
val sql1 =
"""
| select t1.key, t1.value, t2.key, t2.value from ineq_join_t1 as t1
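Several tests in this file are now version-gated: testSparkVersionLE33 replaces test where a case is known not to pass on 3.5 yet, and noFallBack = isSparkVersionLE("3.3") relaxes the no-fallback assertion where 3.5 still falls back. Assuming the gate lives next to isSparkVersionLE/isSparkVersionGE in the shared suite base, a plausible sketch (the body is an assumption; the real helper may differ):

    // Run only on Spark <= 3.3; register as ignored elsewhere so the case
    // stays visible in the suite report.
    def testSparkVersionLE33(testName: String)(testFun: => Any): Unit =
      if (isSparkVersionLE("3.3")) test(testName)(testFun)
      else ignore(testName + " (not yet supported on Spark 3.5)")(testFun)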
diff --git
a/backends-clickhouse/src/test/scala/org/apache/spark/gluten/NativeWriteChecker.scala
b/backends-clickhouse/src/test/scala/org/apache/spark/gluten/NativeWriteChecker.scala
index 590d221f0..fc30d151b 100644
---
a/backends-clickhouse/src/test/scala/org/apache/spark/gluten/NativeWriteChecker.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/spark/gluten/NativeWriteChecker.scala
@@ -40,7 +40,7 @@ trait NativeWriteChecker
override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
if (!nativeUsed) {
val executedPlan = stripAQEPlan(qe.executedPlan)
- nativeUsed = if (isSparkVersionGE("3.4")) {
+ nativeUsed = if (isSparkVersionGE("3.5")) {
executedPlan.find(_.isInstanceOf[ColumnarWriteFilesExec]).isDefined
} else {
executedPlan.find(_.isInstanceOf[FakeRowAdaptor]).isDefined
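The guard now keys on Spark 3.5 rather than 3.4: on 3.5 a native write surfaces as ColumnarWriteFilesExec, while the earlier supported versions (3.2/3.3) wrap it in FakeRowAdaptor. An equivalent formulation of the updated check (a sketch; if TreeNode.exists is unavailable on an older Spark, find(...).isDefined as above does the same):

    // Detect a native write by the plan node each Spark version wraps it in.
    nativeUsed = if (isSparkVersionGE("3.5")) {
      executedPlan.exists(_.isInstanceOf[ColumnarWriteFilesExec])
    } else {
      executedPlan.exists(_.isInstanceOf[FakeRowAdaptor])
    }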