[
https://issues.apache.org/jira/browse/SPARK-44641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chao Sun updated SPARK-44641:
-----------------------------
Summary: SPJ: Results duplicated when SPJ partial-cluster and pushdown
enabled but conditions unmet (was: Results duplicated when SPJ partial-cluster
and pushdown enabled but conditions unmet)
> SPJ: Results duplicated when SPJ partial-cluster and pushdown enabled but
> conditions unmet
> ------------------------------------------------------------------------------------------
>
> Key: SPARK-44641
> URL: https://issues.apache.org/jira/browse/SPARK-44641
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 3.4.1
> Reporter: Szehon Ho
> Priority: Major
>
> Adding the following test case in KeyGroupedPartitionSuite demonstrates the
> problem.
>
> {code:java}
> test("test join key is the second partition key and a transform") {
>   val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
>   createTable(items, items_schema, items_partitions)
>   sql(s"INSERT INTO testcat.ns.$items VALUES " +
>     s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
>     s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
>     s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
>     s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
>     s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
>   val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
>   createTable(purchases, purchases_schema, purchases_partitions)
>   sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
>     s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
>     s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
>     s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
>     s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
>     s"(3, 19.5, cast('2020-02-01' as timestamp))")
>   withSQLConf(
>     SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
>     SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
>     SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "true") {
>     val df = sql("SELECT id, name, i.price as purchase_price, " +
>       "p.item_id, p.price as sale_price " +
>       s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
>       "ON i.arrive_time = p.time " +
>       "ORDER BY id, purchase_price, p.item_id, sale_price")
>     val shuffles = collectShuffles(df.queryExecution.executedPlan)
>     assert(!shuffles.isEmpty,
>       "should not perform SPJ as not all join keys are partition keys")
>     checkAnswer(df,
>       Seq(
>         Row(1, "aa", 40.0, 1, 42.0),
>         Row(1, "aa", 40.0, 2, 11.0),
>         Row(1, "aa", 41.0, 1, 44.0),
>         Row(1, "aa", 41.0, 1, 45.0),
>         Row(2, "bb", 10.0, 1, 42.0),
>         Row(2, "bb", 10.0, 2, 11.0),
>         Row(2, "bb", 10.5, 1, 42.0),
>         Row(2, "bb", 10.5, 2, 11.0),
>         Row(3, "cc", 15.5, 3, 19.5)
>       )
>     )
>   }
> }{code}
>
> Note: this test sets up the DataSourceV2 to return multiple splits for the
> same partition.
> In this case, SPJ is not triggered (because the join key does not match the
> partition key), but the following code in BatchScanExec:
> [https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194]
> intended to fill empty partitions for pushed-down partition values, still
> iterates through the non-grouped partitions and looks up the grouped
> partitions to fill the map, resulting in duplicate input data being fed into
> the join.
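The duplication pattern described above can be modeled in a few lines. This is a hedged, self-contained sketch, not Spark's actual code: `Split`, `partValue`, and `groupedLookupRows` are illustrative names invented here to show how iterating the non-grouped splits while looking up a map keyed by partition value repeats every partition's rows once per split of that partition.

```scala
// Illustrative model only; names and types are NOT from BatchScanExec.
// Each input split carries the partition value it belongs to.
case class Split(partValue: Int, rows: Seq[String])

def groupedLookupRows(splits: Seq[Split]): Seq[String] = {
  // Group splits by partition value, as the partially-clustered path does.
  val grouped: Map[Int, Seq[String]] =
    splits.groupBy(_.partValue).map { case (k, ss) => k -> ss.flatMap(_.rows) }
  // Bug pattern: iterate the *non-grouped* splits and look up the grouped
  // map, so a partition with N splits contributes its rows N times.
  splits.flatMap(s => grouped(s.partValue))
}

// Partition 1 has two splits, so its rows come back twice:
val splits = Seq(Split(1, Seq("a")), Split(1, Seq("b")), Split(2, Seq("c")))
val out = groupedLookupRows(splits)
// out == Seq("a", "b", "a", "b", "c")
```

Iterating the grouped partitions directly (one map entry per partition value) would yield each row exactly once, which matches the fix direction implied by the report.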
--
This message was sent by Atlassian Jira
(v8.20.10#820010)