This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new fb88650 test: Reduce test time spent in `CometShuffleSuite` (#40)
fb88650 is described below
commit fb88650aecc0c8593c0d640c3c69234444f00861
Author: Chao Sun <[email protected]>
AuthorDate: Mon Feb 19 21:32:38 2024 -0800
test: Reduce test time spent in `CometShuffleSuite` (#40)
---
.../org/apache/comet/exec/CometShuffleSuite.scala | 68 +++++++++++-----------
1 file changed, 33 insertions(+), 35 deletions(-)
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala
index db78bc2..dc47482 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala
@@ -41,14 +41,11 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
protected val numElementsForceSpillThreshold: Int = 10
- protected val encryptionEnabled: Boolean = false
-
override protected def sparkConf: SparkConf = {
val conf = super.sparkConf
conf
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveExecutionEnabled.toString)
.set("spark.shuffle.unsafe.fastMergeEnabled", fastMergeEnabled.toString)
- .set("spark.io.encryption.enabled", encryptionEnabled.toString)
}
protected val asyncShuffleEnable: Boolean
@@ -780,40 +777,41 @@ class DisableAQECometAsyncShuffleSuite extends CometShuffleSuiteBase {
protected val adaptiveExecutionEnabled: Boolean = false
}
-/**
- * This suite tests the Comet shuffle encryption. Because the encryption configuration can only be
- * set in SparkConf at the beginning, we need to create a separate suite for encryption.
- */
-class CometShuffleEncryptionSuite extends CometShuffleSuiteBase {
- override protected val adaptiveExecutionEnabled: Boolean = true
-
- override protected val asyncShuffleEnable: Boolean = false
-
- override protected val encryptionEnabled: Boolean = true
-}
-
-class CometAsyncShuffleEncryptionSuite extends CometShuffleSuiteBase {
- override protected val adaptiveExecutionEnabled: Boolean = true
-
- override protected val asyncShuffleEnable: Boolean = true
-
- override protected val encryptionEnabled: Boolean = true
-}
-
-class DisableAQECometShuffleEncryptionSuite extends CometShuffleSuiteBase {
- override protected val adaptiveExecutionEnabled: Boolean = false
-
- override protected val asyncShuffleEnable: Boolean = false
-
- override protected val encryptionEnabled: Boolean = true
-}
-
-class DisableAQECometAsyncShuffleEncryptionSuite extends CometShuffleSuiteBase {
- override protected val adaptiveExecutionEnabled: Boolean = false
+class CometShuffleEncryptionSuite extends CometTestBase {
+ import testImplicits._
- override protected val asyncShuffleEnable: Boolean = true
+ override protected def sparkConf: SparkConf = {
+ val conf = super.sparkConf
+ conf.set("spark.io.encryption.enabled", "true")
+ }
- override protected val encryptionEnabled: Boolean = true
+ test("comet columnar shuffle with encryption") {
+ Seq(10, 201).foreach { numPartitions =>
+ Seq(true, false).foreach { dictionaryEnabled =>
+ Seq(true, false).foreach { asyncEnabled =>
+ withTempDir { dir =>
+ val path = new Path(dir.toURI.toString, "test.parquet")
+ makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 1000)
+
+ (1 until 10).map(i => $"_$i").foreach { col =>
+ withSQLConf(
+ CometConf.COMET_EXEC_ENABLED.key -> "false",
+ CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
+ CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncEnabled.toString) {
+ readParquetFile(path.toString) { df =>
+ val shuffled = df
+ .select($"_1")
+ .repartition(numPartitions, col)
+ checkSparkAnswer(shuffled)
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
class CometShuffleManagerSuite extends CometTestBase {