This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new eadb591f37a [SPARK-45203][SQL][TEST] Join tests in KeyGroupedPartitioningSuite should use merge join hint eadb591f37a is described below commit eadb591f37a118096bab637e4b6ca913c2753a6b Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Mon Sep 18 11:50:16 2023 -0700 [SPARK-45203][SQL][TEST] Join tests in KeyGroupedPartitioningSuite should use merge join hint ### What changes were proposed in this pull request? It's more robust to use join hints to enforce sort-merge join in the tests, instead of setting configs which may be ineffective after more advanced optimizations in the future. ### Why are the changes needed? make tests more future-proof ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? N/A ### Was this patch authored or co-authored using generative AI tooling? no Closes #42983 from cloud-fan/minor. Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../connector/KeyGroupedPartitioningSuite.scala | 246 ++++++++------------- 1 file changed, 93 insertions(+), 153 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index ffd1c8e31e9..4cb5457b66b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.connector import java.util.Collections +import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Literal, TransformExpression} @@ -44,25 +45,9 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { UnboundBucketFunction, UnboundTruncateFunction) - private var originalV2BucketingEnabled: Boolean = false - private var originalAutoBroadcastJoinThreshold: Long = -1 - - override def beforeAll(): Unit = { - super.beforeAll() - originalV2BucketingEnabled = conf.getConf(V2_BUCKETING_ENABLED) - conf.setConf(V2_BUCKETING_ENABLED, true) - originalAutoBroadcastJoinThreshold = conf.getConf(AUTO_BROADCASTJOIN_THRESHOLD) - conf.setConf(AUTO_BROADCASTJOIN_THRESHOLD, -1L) - } - - override def afterAll(): Unit = { - try { - super.afterAll() - } finally { - conf.setConf(V2_BUCKETING_ENABLED, originalV2BucketingEnabled) - conf.setConf(AUTO_BROADCASTJOIN_THRESHOLD, originalAutoBroadcastJoinThreshold) - } - } + override def sparkConf: SparkConf = super.sparkConf + .set(V2_BUCKETING_ENABLED, true) + .set(AUTO_BROADCASTJOIN_THRESHOLD, -1L) before { functions.foreach { f => @@ -261,6 +246,25 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { .add("order_amount", DoubleType) .add("customer_id", LongType) + private def selectWithMergeJoinHint(t1: String, t2: String): String = { + s"SELECT /*+ MERGE($t1, $t2) */ " + } + + private def createJoinTestDF( + keys: Seq[(String, String)], + extraColumns: Seq[String] = Nil, + joinType: String = ""): DataFrame = { + val extraColList = if (extraColumns.isEmpty) "" else extraColumns.mkString(", ", ", ", "") + sql( + s""" + |${selectWithMergeJoinHint("i", "p")} + |id, name, i.price as purchase_price, p.price as sale_price $extraColList + |FROM testcat.ns.$items i $joinType JOIN testcat.ns.$purchases p + |ON ${keys.map(k => s"i.${k._1} = p.${k._2}").mkString(" AND ")} + |ORDER BY id, purchase_price, sale_price $extraColList + |""".stripMargin) + } + private def testWithCustomersAndOrders( customers_partitions: Array[Transform], orders_partitions: Array[Transform], @@ -273,9 +277,13 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { sql(s"INSERT INTO testcat.ns.$orders VALUES " + s"(100.0, 1), (200.0, 1), (150.0, 2), (250.0, 2), (350.0, 2), (400.50, 3)") - val df = sql("SELECT customer_name, customer_age, order_amount " + - s"FROM testcat.ns.$customers c JOIN testcat.ns.$orders o " + - "ON c.customer_id = o.customer_id ORDER BY c.customer_id, order_amount") + val df = sql( + s""" + |${selectWithMergeJoinHint("c", "o")} + |customer_name, customer_age, order_amount + |FROM testcat.ns.$customers c JOIN testcat.ns.$orders o + |ON c.customer_id = o.customer_id ORDER BY c.customer_id, order_amount + |""".stripMargin) val shuffles = collectShuffles(df.queryExecution.executedPlan) assert(shuffles.length == expectedNumOfShuffleExecs) @@ -354,11 +362,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { Seq(true, false).foreach { pushDownValues => withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id AND i.arrive_time = p.time " + - "ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF(Seq("id" -> "item_id", "arrive_time" -> "time")) val shuffles = collectShuffles(df.queryExecution.executedPlan) assert(shuffles.isEmpty, "should not add shuffle for both sides of the join") checkAnswer(df, @@ -390,11 +394,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { Seq(true, false).foreach { pushDownValues => withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id AND i.arrive_time = p.time " + - "ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF(Seq("id" -> "item_id", "arrive_time" -> "time")) val shuffles = collectShuffles(df.queryExecution.executedPlan) assert(shuffles.isEmpty, "should not add shuffle for both sides of the join") checkAnswer(df, @@ -422,11 +422,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { Seq(true, false).foreach { pushDownValues => withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id AND i.arrive_time = p.time " + - "ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF(Seq("id" -> "item_id", "arrive_time" -> "time")) val shuffles = collectShuffles(df.queryExecution.executedPlan) if (pushDownValues) { assert(shuffles.isEmpty, "should not add shuffle when partition values mismatch") @@ -460,10 +456,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { Seq(true, false).foreach { pushDownValues => withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF(Seq("id" -> "item_id")) val shuffles = collectShuffles(df.queryExecution.executedPlan) if (pushDownValues) { assert(shuffles.isEmpty, "should not add shuffle when partition values mismatch") @@ -495,10 +488,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { Seq(true, false).foreach { pushDownValues => withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF(Seq("id" -> "item_id")) val shuffles = collectShuffles(df.queryExecution.executedPlan) if (pushDownValues) { assert(shuffles.isEmpty, "should not add shuffle when partition values mismatch") @@ -529,10 +519,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { Seq(true, false).foreach { pushDownValues => withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF(Seq("id" -> "item_id")) val shuffles = collectShuffles(df.queryExecution.executedPlan) if (pushDownValues) { assert(shuffles.isEmpty, "should not add shuffle when partition values mismatch") @@ -569,10 +556,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { withSQLConf( SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF(Seq("id" -> "item_id")) val shuffles = collectShuffles(df.queryExecution.executedPlan) assert(shuffles.isEmpty, "should not contain any shuffle") if (pushDownValues) { @@ -613,10 +597,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { withSQLConf( SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF(Seq("id" -> "item_id")) val shuffles = collectShuffles(df.queryExecution.executedPlan) assert(shuffles.isEmpty, "should not contain any shuffle") if (pushDownValues) { @@ -663,10 +644,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { withSQLConf( SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF(Seq("id" -> "item_id")) val shuffles = collectShuffles(df.queryExecution.executedPlan) if (pushDownValues) { assert(shuffles.isEmpty, "should not contain any shuffle") @@ -714,10 +692,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { withSQLConf( SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF(Seq("id" -> "item_id")) val shuffles = collectShuffles(df.queryExecution.executedPlan) if (pushDownValues) { assert(shuffles.isEmpty, "should not contain any shuffle") @@ -761,10 +736,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { withSQLConf( SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF(Seq("id" -> "item_id")) val shuffles = collectShuffles(df.queryExecution.executedPlan) if (pushDownValues) { assert(shuffles.isEmpty, "should not contain any shuffle") @@ -808,11 +780,8 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> false.toString, SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i LEFT JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id AND i.arrive_time = p.time " + - "ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF( + Seq("id" -> "item_id", "arrive_time" -> "time"), joinType = "LEFT") val shuffles = collectShuffles(df.queryExecution.executedPlan) if (pushDownValues) { assert(shuffles.isEmpty, "should not contain any shuffle") @@ -860,11 +829,8 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> false.toString, SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i RIGHT JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id AND i.arrive_time = p.time " + - "ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF( + Seq("id" -> "item_id", "arrive_time" -> "time"), joinType = "RIGHT") val shuffles = collectShuffles(df.queryExecution.executedPlan) if (pushDownValues) { assert(shuffles.isEmpty, "should not contain any shuffle") @@ -911,11 +877,8 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> false.toString, SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString, SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i FULL OUTER JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id AND i.arrive_time = p.time " + - "ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF( + Seq("id" -> "item_id", "arrive_time" -> "time"), joinType = "FULL OUTER") val shuffles = collectShuffles(df.queryExecution.executedPlan) if (pushDownValues) { assert(shuffles.isEmpty, "should not contain any shuffle") @@ -1059,10 +1022,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { Seq(true, false).foreach { shuffle => withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF(Seq("id" -> "item_id")) val shuffles = collectShuffles(df.queryExecution.executedPlan) if (shuffle) { assert(shuffles.size == 1, "only shuffle one side not report partitioning") @@ -1094,11 +1054,8 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { Seq(true, false).foreach { shuffle => withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString) { - Seq("JOIN", "LEFT OUTER JOIN", "RIGHT OUTER JOIN", "FULL OUTER JOIN").foreach { joinType => - val df = sql(s"SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i $joinType testcat.ns.$purchases p " + - "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") - + Seq("", "LEFT OUTER", "RIGHT OUTER", "FULL OUTER").foreach { joinType => + val df = createJoinTestDF(Seq("id" -> "item_id"), joinType = joinType) val shuffles = collectShuffles(df.queryExecution.executedPlan) if (shuffle) { assert(shuffles.size == 1, "only shuffle one side not report partitioning") @@ -1107,15 +1064,15 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { "side is not enabled") } joinType match { - case "JOIN" => + case "" => checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5))) - case "LEFT OUTER JOIN" => + case "LEFT OUTER" => checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5), Row(4, "cc", 15.5, null))) - case "RIGHT OUTER JOIN" => + case "RIGHT OUTER" => checkAnswer(df, Seq(Row(null, null, null, 26.0), Row(null, null, null, 50.0), Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5))) - case "FULL OUTER JOIN" => + case "FULL OUTER" => checkAnswer(df, Seq(Row(null, null, null, 26.0), Row(null, null, null, 50.0), Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5), Row(4, "cc", 15.5, null))) @@ -1141,10 +1098,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { Seq(true, false).foreach { shuffle => withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id and i.arrive_time = p.time ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF(Seq("id" -> "item_id", "arrive_time" -> "time")) val shuffles = collectShuffles(df.queryExecution.executedPlan) if (shuffle) { assert(shuffles.size == 1, "only shuffle one side not report partitioning") @@ -1174,10 +1128,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { Seq(true, false).foreach { shuffle => withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString) { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.arrive_time = p.time ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF(Seq("arrive_time" -> "time")) val shuffles = collectShuffles(df.queryExecution.executedPlan) if (shuffle) { assert(shuffles.size == 2, "partitioning with transform not work now") @@ -1215,10 +1166,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString, SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "true") { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF(Seq("id" -> "item_id")) checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5))) } } @@ -1252,27 +1200,21 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { partiallyClusteredEnabled.toString) { // join keys are not the same as the partition keys, therefore SPJ is not triggered. - val df = sql( - s""" - SELECT id, name, i.price as purchase_price, p.item_id, p.price as sale_price - FROM testcat.ns.$items i JOIN testcat.ns.$purchases p - ON i.arrive_time = p.time ORDER BY id, purchase_price, p.item_id, sale_price - """) - + val df = createJoinTestDF(Seq("arrive_time" -> "time"), extraColumns = Seq("p.item_id")) val shuffles = collectShuffles(df.queryExecution.executedPlan) assert(shuffles.nonEmpty, "shuffle should exist when SPJ is not used") checkAnswer(df, Seq( - Row(1, "aa", 40.0, 1, 42.0), - Row(1, "aa", 40.0, 2, 11.0), - Row(1, "aa", 41.0, 1, 44.0), - Row(1, "aa", 41.0, 1, 45.0), - Row(2, "bb", 10.0, 1, 42.0), - Row(2, "bb", 10.0, 2, 11.0), - Row(2, "bb", 10.5, 1, 42.0), - Row(2, "bb", 10.5, 2, 11.0), - Row(3, "cc", 15.5, 3, 19.5) + Row(1, "aa", 40.0, 11.0, 2), + Row(1, "aa", 40.0, 42.0, 1), + Row(1, "aa", 41.0, 44.0, 1), + Row(1, "aa", 41.0, 45.0, 1), + Row(2, "bb", 10.0, 11.0, 2), + Row(2, "bb", 10.0, 42.0, 1), + Row(2, "bb", 10.5, 11.0, 2), + Row(2, "bb", 10.5, 42.0, 1), + Row(3, "cc", 15.5, 19.5, 3) ) ) } @@ -1316,11 +1258,13 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { partiallyClustered.toString, SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> allowJoinKeysSubsetOfPartitionKeys.toString) { - - val df = sql("SELECT t1.id AS id, t1.data AS t1data, t2.data AS t2data " + - s"FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2 " + - "ON t1.id = t2.id ORDER BY t1.id, t1data, t2data") - + val df = sql( + s""" + |${selectWithMergeJoinHint("t1", "t2")} + |t1.id AS id, t1.data AS t1data, t2.data AS t2data + |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2 + |ON t1.id = t2.id ORDER BY t1.id, t1data, t2data + |""".stripMargin) val shuffles = collectShuffles(df.queryExecution.executedPlan) if (allowJoinKeysSubsetOfPartitionKeys) { assert(shuffles.isEmpty, "SPJ should be triggered") @@ -1394,10 +1338,14 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> allowJoinKeysSubsetOfPartitionKeys.toString) { - val df = sql("SELECT t1.id AS t1id, t2.id as t2id, t1.data AS data " + - s"FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2 " + - "ON t1.data = t2.data ORDER BY t1id, t1id, data") - + val df = sql( + s""" + |${selectWithMergeJoinHint("t1", "t2")} + |t1.id AS t1id, t2.id as t2id, t1.data AS data + |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2 + |ON t1.data = t2.data + |ORDER BY t1id, t1id, data + |""".stripMargin) checkAnswer(df, Seq(Row(1, 4, "aa"), Row(2, 5, "bb"), Row(3, 6, "cc"))) val shuffles = collectShuffles(df.queryExecution.executedPlan) @@ -1451,12 +1399,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { partiallyClustered.toString, SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> allowJoinKeysSubsetOfPartitionKeys.toString) { - val df = sql("SELECT id, name, i.price as purchase_price, " + - "p.item_id, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.arrive_time = p.time " + - "ORDER BY id, purchase_price, p.item_id, sale_price") - + val df = createJoinTestDF(Seq("arrive_time" -> "time"), extraColumns = Seq("p.item_id")) // Currently SPJ for case where join key not same as partition key // only supported when push-part-values enabled val shuffles = collectShuffles(df.queryExecution.executedPlan) @@ -1479,15 +1422,15 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { checkAnswer(df, Seq( - Row(1, "aa", 40.0, 1, 42.0), - Row(1, "aa", 40.0, 2, 11.0), - Row(1, "aa", 41.0, 1, 44.0), - Row(1, "aa", 41.0, 1, 45.0), - Row(2, "bb", 10.0, 1, 42.0), - Row(2, "bb", 10.0, 2, 11.0), - Row(2, "bb", 10.5, 1, 42.0), - Row(2, "bb", 10.5, 2, 11.0), - Row(3, "cc", 15.5, 3, 19.5) + Row(1, "aa", 40.0, 11.0, 2), + Row(1, "aa", 40.0, 42.0, 1), + Row(1, "aa", 41.0, 44.0, 1), + Row(1, "aa", 41.0, 45.0, 1), + Row(2, "bb", 10.0, 11.0, 2), + Row(2, "bb", 10.0, 42.0, 1), + Row(2, "bb", 10.5, 11.0, 2), + Row(2, "bb", 10.5, 42.0, 1), + Row(3, "cc", 15.5, 19.5, 3) ) ) } @@ -1522,10 +1465,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> partiallyClustered.toString, SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") { - val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " + - s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + - "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price") - + val df = createJoinTestDF(Seq("id" -> "item_id")) val shuffles = collectShuffles(df.queryExecution.executedPlan) assert(shuffles.size == 1, "SPJ should be triggered") checkAnswer(df, Seq(Row(1, "aa", 30.0, 42.0), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org