This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8547046e634 [SPARK-41413][FOLLOWUP][SQL][TESTS] More test coverage in KeyGroupedPartitioningSuite
8547046e634 is described below
commit 8547046e63430c6bab747efd1c33888c18e97d86
Author: Chao Sun <[email protected]>
AuthorDate: Mon Jan 23 23:51:03 2023 -0800
    [SPARK-41413][FOLLOWUP][SQL][TESTS] More test coverage in KeyGroupedPartitioningSuite
### What changes were proposed in this pull request?
This is a follow-up of https://github.com/apache/spark/pull/38950. It makes sure all tests in the suite are run with `spark.sql.sources.v2.bucketing.pushPartValues.enabled` both on and off.
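Concretely, each affected test body is now wrapped in a loop over both settings of the flag, along the lines of the sketch below (`runTestBody` is a hypothetical stand-in for the existing body of each test; `withSQLConf` is the suite's standard helper for temporarily overriding a config):

    import org.apache.spark.sql.internal.SQLConf

    Seq(true, false).foreach { pushDownValues =>
      // withSQLConf sets the config for the duration of the block and
      // restores the previous value afterwards
      withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key ->
          pushDownValues.toString) {
        runTestBody(pushDownValues)  // hypothetical: the unchanged test logic
      }
    }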
### Why are the changes needed?
To increase test coverage. The results of these tests change depending on whether `spark.sql.sources.v2.bucketing.pushPartValues.enabled` is enabled, so the suite should cover both cases.
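In particular, for the tests where the partition values on the two sides don't line up, the expected plan flips with the flag, roughly as in this sketch (adapted from the diff below, using the suite's existing `collectShuffles` helper):

    val shuffles = collectShuffles(df.queryExecution.executedPlan)
    if (pushDownValues) {
      // missing partition values are pushed down and filled in, so the
      // storage-partitioned join still avoids a shuffle
      assert(shuffles.isEmpty)
    } else {
      // without push-down, mismatched partition values force a shuffle
      assert(shuffles.nonEmpty)
    }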
### Does this PR introduce _any_ user-facing change?
No, this is a test-only change.
### How was this patch tested?
N/A
Closes #39708 from sunchao/SPARK-41413-follow-up.
Authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../connector/KeyGroupedPartitioningSuite.scala | 183 +++++++++++++--------
1 file changed, 117 insertions(+), 66 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index b75574a4e77..6cb2313f487 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -45,15 +45,12 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     UnboundTruncateFunction)
 
   private var originalV2BucketingEnabled: Boolean = false
-  private var originalV2BucketingPushPartKeysEnabled: Boolean = false
   private var originalAutoBroadcastJoinThreshold: Long = -1
 
   override def beforeAll(): Unit = {
     super.beforeAll()
     originalV2BucketingEnabled = conf.getConf(V2_BUCKETING_ENABLED)
-    originalV2BucketingPushPartKeysEnabled = conf.getConf(V2_BUCKETING_PUSH_PART_VALUES_ENABLED)
     conf.setConf(V2_BUCKETING_ENABLED, true)
-    conf.setConf(V2_BUCKETING_PUSH_PART_VALUES_ENABLED, true)
     originalAutoBroadcastJoinThreshold = conf.getConf(AUTO_BROADCASTJOIN_THRESHOLD)
     conf.setConf(AUTO_BROADCASTJOIN_THRESHOLD, -1L)
   }
@@ -63,7 +60,6 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
       super.afterAll()
     } finally {
       conf.setConf(V2_BUCKETING_ENABLED, originalV2BucketingEnabled)
-      conf.setConf(V2_BUCKETING_PUSH_PART_VALUES_ENABLED, originalV2BucketingPushPartKeysEnabled)
       conf.setConf(AUTO_BROADCASTJOIN_THRESHOLD, originalAutoBroadcastJoinThreshold)
     }
   }
@@ -346,16 +342,21 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
       s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
       s"(3, 19.5, cast('2020-02-01' as timestamp))")
 
-    val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-      s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-      "ON i.id = p.item_id AND i.arrive_time = p.time ORDER BY id, purchase_price, sale_price")
-
-    val shuffles = collectShuffles(df.queryExecution.executedPlan)
-    assert(shuffles.isEmpty, "should not add shuffle for both sides of the join")
-    checkAnswer(df,
-      Seq(Row(1, "aa", 40.0, 42.0), Row(1, "aa", 41.0, 44.0), Row(1, "aa", 41.0, 45.0),
-        Row(2, "bb", 10.0, 11.0), Row(2, "bb", 10.5, 11.0), Row(3, "cc", 15.5, 19.5))
-    )
+    Seq(true, false).foreach { pushDownValues =>
+      withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
+        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
+          s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
+          "ON i.id = p.item_id AND i.arrive_time = p.time " +
+          "ORDER BY id, purchase_price, sale_price")
+
+        val shuffles = collectShuffles(df.queryExecution.executedPlan)
+        assert(shuffles.isEmpty, "should not add shuffle for both sides of the join")
+        checkAnswer(df,
+          Seq(Row(1, "aa", 40.0, 42.0), Row(1, "aa", 41.0, 44.0), Row(1, "aa", 41.0, 45.0),
+            Row(2, "bb", 10.0, 11.0), Row(2, "bb", 10.5, 11.0), Row(3, "cc", 15.5, 19.5))
+        )
+      }
+    }
   }
 
   test("partitioned join: join with two partition keys and unsorted partitions") {
@@ -377,16 +378,21 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
       s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
       s"(3, 19.5, cast('2020-02-01' as timestamp))")
 
-    val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-      s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-      "ON i.id = p.item_id AND i.arrive_time = p.time ORDER BY id, purchase_price, sale_price")
-
-    val shuffles = collectShuffles(df.queryExecution.executedPlan)
-    assert(shuffles.isEmpty, "should not add shuffle for both sides of the join")
-    checkAnswer(df,
-      Seq(Row(1, "aa", 40.0, 42.0), Row(1, "aa", 41.0, 44.0), Row(1, "aa", 41.0, 45.0),
-        Row(2, "bb", 10.0, 11.0), Row(2, "bb", 10.5, 11.0), Row(3, "cc", 15.5, 19.5))
-    )
+    Seq(true, false).foreach { pushDownValues =>
+      withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
+        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
+          s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
+          "ON i.id = p.item_id AND i.arrive_time = p.time " +
+          "ORDER BY id, purchase_price, sale_price")
+
+        val shuffles = collectShuffles(df.queryExecution.executedPlan)
+        assert(shuffles.isEmpty, "should not add shuffle for both sides of the join")
+        checkAnswer(df,
+          Seq(Row(1, "aa", 40.0, 42.0), Row(1, "aa", 41.0, 44.0), Row(1, "aa", 41.0, 45.0),
+            Row(2, "bb", 10.0, 11.0), Row(2, "bb", 10.5, 11.0), Row(3, "cc", 15.5, 19.5))
+        )
+      }
+    }
   }
 
   test("partitioned join: join with two partition keys and different # of partition keys") {
@@ -404,15 +410,25 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
       s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
       s"(2, 11.0, cast('2020-01-01' as timestamp))")
 
-    val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-      s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-      "ON i.id = p.item_id AND i.arrive_time = p.time ORDER BY id, purchase_price, sale_price")
-
-    checkAnswer(df,
-      Seq(Row(1, "aa", 40.0, 42.0), Row(2, "bb", 10.0, 11.0)))
-
-    val shuffles = collectShuffles(df.queryExecution.executedPlan)
-    assert(shuffles.isEmpty, "should not add shuffle when partition keys mismatch")
+    Seq(true, false).foreach { pushDownValues =>
+      withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
+        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
+          s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
+          "ON i.id = p.item_id AND i.arrive_time = p.time " +
+          "ORDER BY id, purchase_price, sale_price")
+
+        val shuffles = collectShuffles(df.queryExecution.executedPlan)
+        if (pushDownValues) {
+          assert(shuffles.isEmpty, "should not add shuffle when partition values mismatch")
+        } else {
+          assert(shuffles.nonEmpty, "should add shuffle when partition values mismatch, and " +
+            "pushing down partition values is not enabled")
+        }
+
+        checkAnswer(df,
+          Seq(Row(1, "aa", 40.0, 42.0), Row(2, "bb", 10.0, 11.0)))
+      }
+    }
   }
 
   test("SPARK-41413: partitioned join: partition values from one side are subset of those from " +
@@ -432,13 +448,23 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
       "(1, 42.0, cast('2020-01-01' as timestamp)), " +
       "(3, 19.5, cast('2020-02-01' as timestamp))")
 
-    val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-      s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-      "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
-    val shuffles = collectShuffles(df.queryExecution.executedPlan)
-    assert(shuffles.isEmpty, "should not contain any shuffle")
-    checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5)))
+    Seq(true, false).foreach { pushDownValues =>
+      withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
+        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
+          s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
+          "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
+
+        val shuffles = collectShuffles(df.queryExecution.executedPlan)
+        if (pushDownValues) {
+          assert(shuffles.isEmpty, "should not add shuffle when partition values mismatch")
+        } else {
+          assert(shuffles.nonEmpty, "should add shuffle when partition values mismatch, and " +
+            "pushing down partition values is not enabled")
+        }
+
+        checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5)))
+      }
+    }
   }
 
   test("SPARK-41413: partitioned join: partition values from both sides overlaps") {
@@ -457,13 +483,23 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
       "(2, 19.5, cast('2020-02-01' as timestamp)), " +
       "(4, 30.0, cast('2020-02-01' as timestamp))")
 
-    val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-      s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-      "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
-    val shuffles = collectShuffles(df.queryExecution.executedPlan)
-    assert(shuffles.isEmpty, "should not contain any shuffle")
-    checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(2, "bb", 10.0, 19.5)))
+    Seq(true, false).foreach { pushDownValues =>
+      withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
+        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
+          s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
+          "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
+
+        val shuffles = collectShuffles(df.queryExecution.executedPlan)
+        if (pushDownValues) {
+          assert(shuffles.isEmpty, "should not add shuffle when partition values mismatch")
+        } else {
+          assert(shuffles.nonEmpty, "should add shuffle when partition values mismatch, and " +
+            "pushing down partition values is not enabled")
+        }
+
+        checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(2, "bb", 10.0, 19.5)))
+      }
+    }
   }
 
   test("SPARK-41413: partitioned join: non-overlapping partition values from both sides") {
@@ -481,13 +517,23 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
       "(5, 19.5, cast('2020-02-01' as timestamp)), " +
       "(6, 30.0, cast('2020-02-01' as timestamp))")
 
-    val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-      s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-      "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
-    val shuffles = collectShuffles(df.queryExecution.executedPlan)
-    assert(shuffles.isEmpty, "should not contain any shuffle")
-    checkAnswer(df, Seq.empty)
+    Seq(true, false).foreach { pushDownValues =>
+      withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
+        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
+          s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
+          "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
+
+        val shuffles = collectShuffles(df.queryExecution.executedPlan)
+        if (pushDownValues) {
+          assert(shuffles.isEmpty, "should not add shuffle when partition values mismatch")
+        } else {
+          assert(shuffles.nonEmpty, "should add shuffle when partition values mismatch, and " +
+            "pushing down partition values is not enabled")
+        }
+
+        checkAnswer(df, Seq.empty)
+      }
+    }
   }
 
   test("data source partitioning + dynamic partition filtering") {
@@ -515,18 +561,23 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
         s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
         s"(3, 19.5, cast('2020-02-01' as timestamp))")
 
-      // number of unique partitions changed after dynamic filtering - the gap should be filled
-      // with empty partitions and the job should still succeed
-      var df = sql(s"SELECT sum(p.price) from testcat.ns.$items i, testcat.ns.$purchases p WHERE " +
-        s"i.id = p.item_id AND i.price > 40.0")
-      checkAnswer(df, Seq(Row(131)))
-
-      // dynamic filtering doesn't change partitioning so storage-partitioned join should kick in
-      df = sql(s"SELECT sum(p.price) from testcat.ns.$items i, testcat.ns.$purchases p WHERE " +
-        s"i.id = p.item_id AND i.price >= 10.0")
-      val shuffles = collectShuffles(df.queryExecution.executedPlan)
-      assert(shuffles.isEmpty, "should not add shuffle for both sides of the join")
-      checkAnswer(df, Seq(Row(303.5)))
+      Seq(true, false).foreach { pushDownValues =>
+        withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
+          // number of unique partitions changed after dynamic filtering - the gap should be filled
+          // with empty partitions and the job should still succeed
+          var df = sql(s"SELECT sum(p.price) from testcat.ns.$items i, testcat.ns.$purchases p " +
+            "WHERE i.id = p.item_id AND i.price > 40.0")
+          checkAnswer(df, Seq(Row(131)))
+
+          // dynamic filtering doesn't change partitioning so storage-partitioned join should kick
+          // in
+          df = sql(s"SELECT sum(p.price) from testcat.ns.$items i, testcat.ns.$purchases p " +
+            "WHERE i.id = p.item_id AND i.price >= 10.0")
+          val shuffles = collectShuffles(df.queryExecution.executedPlan)
+          assert(shuffles.isEmpty, "should not add shuffle for both sides of the join")
+          checkAnswer(df, Seq(Row(303.5)))
+        }
+      }
     }
   }
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]