[ 
https://issues.apache.org/jira/browse/SPARK-44641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Szehon Ho updated SPARK-44641:
------------------------------
    Description: 
Adding the following test case in KeyGroupedPartitionSuite demonstrates the 
problem.

 
{code:java}
test("test join key is the second partition key and a transform") {
  val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
  createTable(items, items_schema, items_partitions)
  sql(s"INSERT INTO testcat.ns.$items VALUES " +
    s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
    s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
    s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
    s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
    s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")

  val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
  createTable(purchases, purchases_schema, purchases_partitions)
  sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
    s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
    s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
    s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
    s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
    s"(3, 19.5, cast('2020-02-01' as timestamp))")

  withSQLConf(
    SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
    SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
    SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
      "true") {
    val df = sql("SELECT id, name, i.price as purchase_price, " +
      "p.item_id, p.price as sale_price " +
      s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
      "ON i.arrive_time = p.time " +
      "ORDER BY id, purchase_price, p.item_id, sale_price")

    val shuffles = collectShuffles(df.queryExecution.executedPlan)
    assert(!shuffles.isEmpty, "should not perform SPJ as not all join keys are 
partition keys")
    checkAnswer(df,
      Seq(
        Row(1, "aa", 40.0, 1, 42.0),
        Row(1, "aa", 40.0, 2, 11.0),
        Row(1, "aa", 41.0, 1, 44.0),
        Row(1, "aa", 41.0, 1, 45.0),
        Row(2, "bb", 10.0, 1, 42.0),
        Row(2, "bb", 10.0, 2, 11.0),
        Row(2, "bb", 10.5, 1, 42.0),
        Row(2, "bb", 10.5, 2, 11.0),
        Row(3, "cc", 15.5, 3, 19.5)
      )
    )
  }
}{code}
 

Note: this tests has setup the datasourceV2 to return multiple splits for same 
partition.

In this case, SPJ is not triggered (because join key does not match partition 
key), but the following code in DSV2Scan:

[https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194]

intended to fill the empty partition for 'pushdown-vallue' will still iterate 
through non-grouped partition and lookup from grouped partition to fill the 
map, resulting in some duplicate input data fed into the join.

  was:
Adding the following test case in KeyGroupedPartitionSuite demonstrates the 
problem.

 
{code:java}
test("test join key is the second partition key and a transform") {
  val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
  createTable(items, items_schema, items_partitions)
  sql(s"INSERT INTO testcat.ns.$items VALUES " +
    s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
    s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
    s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
    s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
    s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")

  val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
  createTable(purchases, purchases_schema, purchases_partitions)
  sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
    s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
    s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
    s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
    s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
    s"(3, 19.5, cast('2020-02-01' as timestamp))")

  withSQLConf(
    SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
    SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
    SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
      "true") {
    val df = sql("SELECT id, name, i.price as purchase_price, " +
      "p.item_id, p.price as sale_price " +
      s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
      "ON i.arrive_time = p.time " +
      "ORDER BY id, purchase_price, p.item_id, sale_price")

    val shuffles = collectShuffles(df.queryExecution.executedPlan)
    assert(!shuffles.isEmpty, "should not perform SPJ as not all join keys are 
partition keys")
    checkAnswer(df,
      Seq(
        Row(1, "aa", 40.0, 1, 42.0),
        Row(1, "aa", 40.0, 2, 11.0),
        Row(1, "aa", 41.0, 1, 44.0),
        Row(1, "aa", 41.0, 1, 45.0),
        Row(2, "bb", 10.0, 1, 42.0),
        Row(2, "bb", 10.0, 2, 11.0),
        Row(2, "bb", 10.5, 1, 42.0),
        Row(2, "bb", 10.5, 2, 11.0),
        Row(3, "cc", 15.5, 3, 19.5)
      )
    )
  }
}{code}
 

This tests the case the datasourceV2 returns multiple splits for same partition.

In this case, SPJ is not triggered (because join key does not match partition 
key), but the following code in DSV2Scan:

[https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194]

intended to fill the empty partition for 'pushdown-vallue' will still iterate 
through non-grouped partition and lookup from grouped partition to fill the 
map, resulting in some duplicate input data fed into the join.


> Results duplicated when SPJ partial-cluster and pushdown enabled but 
> conditions unmet
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-44641
>                 URL: https://issues.apache.org/jira/browse/SPARK-44641
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.1
>            Reporter: Szehon Ho
>            Priority: Major
>
> Adding the following test case in KeyGroupedPartitionSuite demonstrates the 
> problem.
>  
> {code:java}
> test("test join key is the second partition key and a transform") {
>   val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
>   createTable(items, items_schema, items_partitions)
>   sql(s"INSERT INTO testcat.ns.$items VALUES " +
>     s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
>     s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
>     s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
>     s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
>     s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
>   val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
>   createTable(purchases, purchases_schema, purchases_partitions)
>   sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
>     s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
>     s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
>     s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
>     s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
>     s"(3, 19.5, cast('2020-02-01' as timestamp))")
>   withSQLConf(
>     SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
>     SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
>     SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
>       "true") {
>     val df = sql("SELECT id, name, i.price as purchase_price, " +
>       "p.item_id, p.price as sale_price " +
>       s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
>       "ON i.arrive_time = p.time " +
>       "ORDER BY id, purchase_price, p.item_id, sale_price")
>     val shuffles = collectShuffles(df.queryExecution.executedPlan)
>     assert(!shuffles.isEmpty, "should not perform SPJ as not all join keys 
> are partition keys")
>     checkAnswer(df,
>       Seq(
>         Row(1, "aa", 40.0, 1, 42.0),
>         Row(1, "aa", 40.0, 2, 11.0),
>         Row(1, "aa", 41.0, 1, 44.0),
>         Row(1, "aa", 41.0, 1, 45.0),
>         Row(2, "bb", 10.0, 1, 42.0),
>         Row(2, "bb", 10.0, 2, 11.0),
>         Row(2, "bb", 10.5, 1, 42.0),
>         Row(2, "bb", 10.5, 2, 11.0),
>         Row(3, "cc", 15.5, 3, 19.5)
>       )
>     )
>   }
> }{code}
>  
> Note: this tests has setup the datasourceV2 to return multiple splits for 
> same partition.
> In this case, SPJ is not triggered (because join key does not match partition 
> key), but the following code in DSV2Scan:
> [https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194]
> intended to fill the empty partition for 'pushdown-vallue' will still iterate 
> through non-grouped partition and lookup from grouped partition to fill the 
> map, resulting in some duplicate input data fed into the join.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to