This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8547046e634 [SPARK-41413][FOLLOWUP][SQL][TESTS] More test coverage in 
KeyGroupedPartitioningSuite
8547046e634 is described below

commit 8547046e63430c6bab747efd1c33888c18e97d86
Author: Chao Sun <[email protected]>
AuthorDate: Mon Jan 23 23:51:03 2023 -0800

    [SPARK-41413][FOLLOWUP][SQL][TESTS] More test coverage in 
KeyGroupedPartitioningSuite
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up of https://github.com/apache/spark/pull/38950, to make 
sure we run all tests in the suite with 
`spark.sql.sources.v2.bucketing.pushPartValues.enabled` on and off.
    
    ### Why are the changes needed?
    
    To increase test coverage. The expected results of several tests depend on 
whether `spark.sql.sources.v2.bucketing.pushPartValues.enabled` is on or off 
(for example, whether a shuffle is added to the join), so the suite should 
cover both cases.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is just a test-related change.
    
    ### How was this patch tested?
    
    N/A
    
    Closes #39708 from sunchao/SPARK-41413-follow-up.
    
    Authored-by: Chao Sun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../connector/KeyGroupedPartitioningSuite.scala    | 183 +++++++++++++--------
 1 file changed, 117 insertions(+), 66 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index b75574a4e77..6cb2313f487 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -45,15 +45,12 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase {
     UnboundTruncateFunction)
 
   private var originalV2BucketingEnabled: Boolean = false
-  private var originalV2BucketingPushPartKeysEnabled: Boolean = false
   private var originalAutoBroadcastJoinThreshold: Long = -1
 
   override def beforeAll(): Unit = {
     super.beforeAll()
     originalV2BucketingEnabled = conf.getConf(V2_BUCKETING_ENABLED)
-    originalV2BucketingPushPartKeysEnabled = 
conf.getConf(V2_BUCKETING_PUSH_PART_VALUES_ENABLED)
     conf.setConf(V2_BUCKETING_ENABLED, true)
-    conf.setConf(V2_BUCKETING_PUSH_PART_VALUES_ENABLED, true)
     originalAutoBroadcastJoinThreshold = 
conf.getConf(AUTO_BROADCASTJOIN_THRESHOLD)
     conf.setConf(AUTO_BROADCASTJOIN_THRESHOLD, -1L)
   }
@@ -63,7 +60,6 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase {
       super.afterAll()
     } finally {
       conf.setConf(V2_BUCKETING_ENABLED, originalV2BucketingEnabled)
-      conf.setConf(V2_BUCKETING_PUSH_PART_VALUES_ENABLED, 
originalV2BucketingPushPartKeysEnabled)
       conf.setConf(AUTO_BROADCASTJOIN_THRESHOLD, 
originalAutoBroadcastJoinThreshold)
     }
   }
@@ -346,16 +342,21 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase {
         s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
         s"(3, 19.5, cast('2020-02-01' as timestamp))")
 
-    val df = sql("SELECT id, name, i.price as purchase_price, p.price as 
sale_price " +
-        s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-        "ON i.id = p.item_id AND i.arrive_time = p.time ORDER BY id, 
purchase_price, sale_price")
-
-    val shuffles = collectShuffles(df.queryExecution.executedPlan)
-    assert(shuffles.isEmpty, "should not add shuffle for both sides of the 
join")
-    checkAnswer(df,
-      Seq(Row(1, "aa", 40.0, 42.0), Row(1, "aa", 41.0, 44.0), Row(1, "aa", 
41.0, 45.0),
-        Row(2, "bb", 10.0, 11.0), Row(2, "bb", 10.5, 11.0), Row(3, "cc", 15.5, 
19.5))
-    )
+    Seq(true, false).foreach { pushDownValues =>
+      withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> 
pushDownValues.toString) {
+        val df = sql("SELECT id, name, i.price as purchase_price, p.price as 
sale_price " +
+            s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
+            "ON i.id = p.item_id AND i.arrive_time = p.time " +
+            "ORDER BY id, purchase_price, sale_price")
+
+        val shuffles = collectShuffles(df.queryExecution.executedPlan)
+        assert(shuffles.isEmpty, "should not add shuffle for both sides of the 
join")
+        checkAnswer(df,
+          Seq(Row(1, "aa", 40.0, 42.0), Row(1, "aa", 41.0, 44.0), Row(1, "aa", 
41.0, 45.0),
+            Row(2, "bb", 10.0, 11.0), Row(2, "bb", 10.5, 11.0), Row(3, "cc", 
15.5, 19.5))
+        )
+      }
+    }
   }
 
   test("partitioned join: join with two partition keys and unsorted 
partitions") {
@@ -377,16 +378,21 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase {
         s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
         s"(3, 19.5, cast('2020-02-01' as timestamp))")
 
-    val df = sql("SELECT id, name, i.price as purchase_price, p.price as 
sale_price " +
-        s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-        "ON i.id = p.item_id AND i.arrive_time = p.time ORDER BY id, 
purchase_price, sale_price")
-
-    val shuffles = collectShuffles(df.queryExecution.executedPlan)
-    assert(shuffles.isEmpty, "should not add shuffle for both sides of the 
join")
-    checkAnswer(df,
-      Seq(Row(1, "aa", 40.0, 42.0), Row(1, "aa", 41.0, 44.0), Row(1, "aa", 
41.0, 45.0),
-        Row(2, "bb", 10.0, 11.0), Row(2, "bb", 10.5, 11.0), Row(3, "cc", 15.5, 
19.5))
-    )
+    Seq(true, false).foreach { pushDownValues =>
+      withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> 
pushDownValues.toString) {
+        val df = sql("SELECT id, name, i.price as purchase_price, p.price as 
sale_price " +
+            s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
+            "ON i.id = p.item_id AND i.arrive_time = p.time " +
+            "ORDER BY id, purchase_price, sale_price")
+
+        val shuffles = collectShuffles(df.queryExecution.executedPlan)
+        assert(shuffles.isEmpty, "should not add shuffle for both sides of the 
join")
+        checkAnswer(df,
+          Seq(Row(1, "aa", 40.0, 42.0), Row(1, "aa", 41.0, 44.0), Row(1, "aa", 
41.0, 45.0),
+            Row(2, "bb", 10.0, 11.0), Row(2, "bb", 10.5, 11.0), Row(3, "cc", 
15.5, 19.5))
+        )
+      }
+    }
   }
 
   test("partitioned join: join with two partition keys and different # of 
partition keys") {
@@ -404,15 +410,25 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase {
         s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
         s"(2, 11.0, cast('2020-01-01' as timestamp))")
 
-    val df = sql("SELECT id, name, i.price as purchase_price, p.price as 
sale_price " +
-        s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-        "ON i.id = p.item_id AND i.arrive_time = p.time ORDER BY id, 
purchase_price, sale_price")
-
-    checkAnswer(df,
-      Seq(Row(1, "aa", 40.0, 42.0), Row(2, "bb", 10.0, 11.0)))
-
-    val shuffles = collectShuffles(df.queryExecution.executedPlan)
-    assert(shuffles.isEmpty, "should not add shuffle when partition keys 
mismatch")
+    Seq(true, false).foreach { pushDownValues =>
+      withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> 
pushDownValues.toString) {
+        val df = sql("SELECT id, name, i.price as purchase_price, p.price as 
sale_price " +
+            s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
+            "ON i.id = p.item_id AND i.arrive_time = p.time " +
+            "ORDER BY id, purchase_price, sale_price")
+
+        val shuffles = collectShuffles(df.queryExecution.executedPlan)
+        if (pushDownValues) {
+          assert(shuffles.isEmpty, "should not add shuffle when partition 
values mismatch")
+        } else {
+          assert(shuffles.nonEmpty, "should add shuffle when partition values 
mismatch, and " +
+              "pushing down partition values is not enabled")
+        }
+
+        checkAnswer(df,
+          Seq(Row(1, "aa", 40.0, 42.0), Row(2, "bb", 10.0, 11.0)))
+      }
+    }
   }
 
   test("SPARK-41413: partitioned join: partition values from one side are 
subset of those from " +
@@ -432,13 +448,23 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase {
         "(1, 42.0, cast('2020-01-01' as timestamp)), " +
         "(3, 19.5, cast('2020-02-01' as timestamp))")
 
-    val df = sql("SELECT id, name, i.price as purchase_price, p.price as 
sale_price " +
-        s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-        "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
-    val shuffles = collectShuffles(df.queryExecution.executedPlan)
-    assert(shuffles.isEmpty, "should not contain any shuffle")
-    checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5)))
+    Seq(true, false).foreach { pushDownValues =>
+      withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> 
pushDownValues.toString) {
+        val df = sql("SELECT id, name, i.price as purchase_price, p.price as 
sale_price " +
+            s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
+            "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
+
+        val shuffles = collectShuffles(df.queryExecution.executedPlan)
+        if (pushDownValues) {
+          assert(shuffles.isEmpty, "should not add shuffle when partition 
values mismatch")
+        } else {
+          assert(shuffles.nonEmpty, "should add shuffle when partition values 
mismatch, and " +
+              "pushing down partition values is not enabled")
+        }
+
+        checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 
19.5)))
+      }
+    }
   }
 
   test("SPARK-41413: partitioned join: partition values from both sides 
overlaps") {
@@ -457,13 +483,23 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase {
         "(2, 19.5, cast('2020-02-01' as timestamp)), " +
         "(4, 30.0, cast('2020-02-01' as timestamp))")
 
-    val df = sql("SELECT id, name, i.price as purchase_price, p.price as 
sale_price " +
-        s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-        "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
-    val shuffles = collectShuffles(df.queryExecution.executedPlan)
-    assert(shuffles.isEmpty, "should not contain any shuffle")
-    checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(2, "bb", 10.0, 19.5)))
+    Seq(true, false).foreach { pushDownValues =>
+      withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> 
pushDownValues.toString) {
+        val df = sql("SELECT id, name, i.price as purchase_price, p.price as 
sale_price " +
+            s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
+            "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
+
+        val shuffles = collectShuffles(df.queryExecution.executedPlan)
+        if (pushDownValues) {
+          assert(shuffles.isEmpty, "should not add shuffle when partition 
values mismatch")
+        } else {
+          assert(shuffles.nonEmpty, "should add shuffle when partition values 
mismatch, and " +
+              "pushing down partition values is not enabled")
+        }
+
+        checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(2, "bb", 10.0, 
19.5)))
+      }
+    }
   }
 
   test("SPARK-41413: partitioned join: non-overlapping partition values from 
both sides") {
@@ -481,13 +517,23 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase {
         "(5, 19.5, cast('2020-02-01' as timestamp)), " +
         "(6, 30.0, cast('2020-02-01' as timestamp))")
 
-    val df = sql("SELECT id, name, i.price as purchase_price, p.price as 
sale_price " +
-        s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-        "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
-    val shuffles = collectShuffles(df.queryExecution.executedPlan)
-    assert(shuffles.isEmpty, "should not contain any shuffle")
-    checkAnswer(df, Seq.empty)
+    Seq(true, false).foreach { pushDownValues =>
+      withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> 
pushDownValues.toString) {
+        val df = sql("SELECT id, name, i.price as purchase_price, p.price as 
sale_price " +
+            s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
+            "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
+
+        val shuffles = collectShuffles(df.queryExecution.executedPlan)
+        if (pushDownValues) {
+          assert(shuffles.isEmpty, "should not add shuffle when partition 
values mismatch")
+        } else {
+          assert(shuffles.nonEmpty, "should add shuffle when partition values 
mismatch, and " +
+              "pushing down partition values is not enabled")
+        }
+
+        checkAnswer(df, Seq.empty)
+      }
+    }
   }
 
   test("data source partitioning + dynamic partition filtering") {
@@ -515,18 +561,23 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase {
           s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
           s"(3, 19.5, cast('2020-02-01' as timestamp))")
 
-      // number of unique partitions changed after dynamic filtering - the gap 
should be filled
-      // with empty partitions and the job should still succeed
-      var df = sql(s"SELECT sum(p.price) from testcat.ns.$items i, 
testcat.ns.$purchases p WHERE " +
-          s"i.id = p.item_id AND i.price > 40.0")
-      checkAnswer(df, Seq(Row(131)))
-
-      // dynamic filtering doesn't change partitioning so storage-partitioned 
join should kick in
-      df = sql(s"SELECT sum(p.price) from testcat.ns.$items i, 
testcat.ns.$purchases p WHERE " +
-          s"i.id = p.item_id AND i.price >= 10.0")
-      val shuffles = collectShuffles(df.queryExecution.executedPlan)
-      assert(shuffles.isEmpty, "should not add shuffle for both sides of the 
join")
-      checkAnswer(df, Seq(Row(303.5)))
+      Seq(true, false).foreach { pushDownValues =>
+        withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> 
pushDownValues.toString) {
+          // number of unique partitions changed after dynamic filtering - the 
gap should be filled
+          // with empty partitions and the job should still succeed
+          var df = sql(s"SELECT sum(p.price) from testcat.ns.$items i, 
testcat.ns.$purchases p " +
+              "WHERE i.id = p.item_id AND i.price > 40.0")
+          checkAnswer(df, Seq(Row(131)))
+
+          // dynamic filtering doesn't change partitioning so 
storage-partitioned join should kick
+          // in
+          df = sql(s"SELECT sum(p.price) from testcat.ns.$items i, 
testcat.ns.$purchases p " +
+              "WHERE i.id = p.item_id AND i.price >= 10.0")
+          val shuffles = collectShuffles(df.queryExecution.executedPlan)
+          assert(shuffles.isEmpty, "should not add shuffle for both sides of 
the join")
+          checkAnswer(df, Seq(Row(303.5)))
+        }
+      }
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to