[ 
https://issues.apache.org/jira/browse/SPARK-55411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Toth reassigned SPARK-55411:
----------------------------------

    Assignee: Cheng Pan

> SPJ may throw ArrayIndexOutOfBoundsException when there are fewer join keys 
> than cluster keys
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-55411
>                 URL: https://issues.apache.org/jira/browse/SPARK-55411
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.0.2, 4.1.1
>            Reporter: Cheng Pan
>            Assignee: Cheng Pan
>            Priority: Major
>              Labels: pull-request-available
>
> Reproduction requires fewer join keys than partition keys, plus a one-side shuffle.
>  
> {code:java}
> test("bug") {
>   withSQLConf(
>     SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
>     SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true",
>     SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
>     SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> 
> "false",
>     SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> 
> "true") {
>     val customers_partitions = Array(identity("customer_name"), bucket(4, 
> "customer_id"))
>     createTable(customers, customersColumns, customers_partitions)
>     sql(s"INSERT INTO testcat.ns.$customers VALUES " +
>       s"('aaa', 10, 1), ('bbb', 20, 2), ('ccc', 30, 3)")
>     createTable(orders, ordersColumns, Array.empty)
>     sql(s"INSERT INTO testcat.ns.$orders VALUES " +
>       s"(100.0, 1), (200.0, 1), (150.0, 2), (250.0, 2), (350.0, 2), (400.50, 
> 3)")
>     val df = sql(
>       s"""
>          |${selectWithMergeJoinHint("c", "o")}
>          |customer_name, customer_age, order_amount
>          |FROM testcat.ns.$customers c JOIN testcat.ns.$orders o
>          |ON c.customer_id = o.customer_id ORDER BY c.customer_id, 
> order_amount
>          |""".stripMargin)
>     val shuffles = collectShuffles(df.queryExecution.executedPlan)
>     assert(shuffles.length == 1)
>     checkAnswer(df,
>       Seq(Row("aaa", 10, 100.0), Row("aaa", 10, 200.0), Row("bbb", 20, 150.0),
>         Row("bbb", 20, 250.0), Row("bbb", 20, 350.0), Row("ccc", 30, 400.50)))
>   }
> } {code}
>  
> {code:java}
> $ build/sbt "sql/testOnly *KeyGroupedPartitioningSuite -- -z bug"
> ...
> [info] - bug *** FAILED *** (1 second, 884 milliseconds)
> [info]   java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for 
> length 1
> [info]   at 
> scala.collection.immutable.ArraySeq$ofRef.apply(ArraySeq.scala:331)
> [info]   at 
> org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning$.$anonfun$project$1(partitioning.scala:471)
> [info]   at 
> org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning$.$anonfun$project$1$adapted(partitioning.scala:471)
> [info]   at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:75)
> [info]   at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:35)
> [info]   at 
> org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning$.project(partitioning.scala:471)
> [info]   at 
> org.apache.spark.sql.execution.KeyGroupedPartitionedScan.$anonfun$getOutputKeyGroupedPartitioning$5(KeyGroupedPartitionedScan.scala:58)
> ... {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to