This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new fb88650  test: Reduce test time spent in `CometShuffleSuite` (#40)
fb88650 is described below

commit fb88650aecc0c8593c0d640c3c69234444f00861
Author: Chao Sun <[email protected]>
AuthorDate: Mon Feb 19 21:32:38 2024 -0800

    test: Reduce test time spent in `CometShuffleSuite` (#40)
---
 .../org/apache/comet/exec/CometShuffleSuite.scala  | 68 +++++++++++-----------
 1 file changed, 33 insertions(+), 35 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala
index db78bc2..dc47482 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala
@@ -41,14 +41,11 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
 
   protected val numElementsForceSpillThreshold: Int = 10
 
-  protected val encryptionEnabled: Boolean = false
-
   override protected def sparkConf: SparkConf = {
     val conf = super.sparkConf
     conf
       .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveExecutionEnabled.toString)
       .set("spark.shuffle.unsafe.fastMergeEnabled", fastMergeEnabled.toString)
-      .set("spark.io.encryption.enabled", encryptionEnabled.toString)
   }
 
   protected val asyncShuffleEnable: Boolean
@@ -780,40 +777,41 @@ class DisableAQECometAsyncShuffleSuite extends CometShuffleSuiteBase {
   protected val adaptiveExecutionEnabled: Boolean = false
 }
 
-/**
- * This suite tests the Comet shuffle encryption. Because the encryption configuration can only be
- * set in SparkConf at the beginning, we need to create a separate suite for encryption.
- */
-class CometShuffleEncryptionSuite extends CometShuffleSuiteBase {
-  override protected val adaptiveExecutionEnabled: Boolean = true
-
-  override protected val asyncShuffleEnable: Boolean = false
-
-  override protected val encryptionEnabled: Boolean = true
-}
-
-class CometAsyncShuffleEncryptionSuite extends CometShuffleSuiteBase {
-  override protected val adaptiveExecutionEnabled: Boolean = true
-
-  override protected val asyncShuffleEnable: Boolean = true
-
-  override protected val encryptionEnabled: Boolean = true
-}
-
-class DisableAQECometShuffleEncryptionSuite extends CometShuffleSuiteBase {
-  override protected val adaptiveExecutionEnabled: Boolean = false
-
-  override protected val asyncShuffleEnable: Boolean = false
-
-  override protected val encryptionEnabled: Boolean = true
-}
-
-class DisableAQECometAsyncShuffleEncryptionSuite extends CometShuffleSuiteBase {
-  override protected val adaptiveExecutionEnabled: Boolean = false
+class CometShuffleEncryptionSuite extends CometTestBase {
+  import testImplicits._
 
-  override protected val asyncShuffleEnable: Boolean = true
+  override protected def sparkConf: SparkConf = {
+    val conf = super.sparkConf
+    conf.set("spark.io.encryption.enabled", "true")
+  }
 
-  override protected val encryptionEnabled: Boolean = true
+  test("comet columnar shuffle with encryption") {
+    Seq(10, 201).foreach { numPartitions =>
+      Seq(true, false).foreach { dictionaryEnabled =>
+        Seq(true, false).foreach { asyncEnabled =>
+          withTempDir { dir =>
+            val path = new Path(dir.toURI.toString, "test.parquet")
+            makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 1000)
+
+            (1 until 10).map(i => $"_$i").foreach { col =>
+              withSQLConf(
+                CometConf.COMET_EXEC_ENABLED.key -> "false",
+                CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+                CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
+                CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncEnabled.toString) {
+                readParquetFile(path.toString) { df =>
+                  val shuffled = df
+                    .select($"_1")
+                    .repartition(numPartitions, col)
+                  checkSparkAnswer(shuffled)
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
 }
 
 class CometShuffleManagerSuite extends CometTestBase {

Reply via email to