[
https://issues.apache.org/jira/browse/SPARK-55411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Peter Toth resolved SPARK-55411.
--------------------------------
Fix Version/s: 4.2.0
Resolution: Fixed
> SPJ may throw ArrayIndexOutOfBoundsException when there are fewer join keys than cluster keys
> --------------------------------------------------------------------------------------
>
> Key: SPARK-55411
> URL: https://issues.apache.org/jira/browse/SPARK-55411
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 4.0.2, 4.1.1
> Reporter: Cheng Pan
> Assignee: Cheng Pan
> Priority: Major
> Labels: pull-request-available
> Fix For: 4.2.0
>
>
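> Triggered when the join uses fewer keys than the partition keys and only one side of the join is shuffled. In the reproducer below, `customers` is partitioned by `identity(customer_name)` and `bucket(4, customer_id)`, but the join is only on `customer_id`; `orders` is unpartitioned, so only that side is shuffled.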
>
> {code:java}
> test("bug") {
> withSQLConf(
> SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
> SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true",
> SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
> SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
> "false",
> SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key ->
> "true") {
> val customers_partitions = Array(identity("customer_name"), bucket(4,
> "customer_id"))
> createTable(customers, customersColumns, customers_partitions)
> sql(s"INSERT INTO testcat.ns.$customers VALUES " +
> s"('aaa', 10, 1), ('bbb', 20, 2), ('ccc', 30, 3)")
> createTable(orders, ordersColumns, Array.empty)
> sql(s"INSERT INTO testcat.ns.$orders VALUES " +
> s"(100.0, 1), (200.0, 1), (150.0, 2), (250.0, 2), (350.0, 2), (400.50, 3)")
> val df = sql(
> s"""
> |${selectWithMergeJoinHint("c", "o")}
> |customer_name, customer_age, order_amount
> |FROM testcat.ns.$customers c JOIN testcat.ns.$orders o
> |ON c.customer_id = o.customer_id ORDER BY c.customer_id, order_amount
> |""".stripMargin)
> val shuffles = collectShuffles(df.queryExecution.executedPlan)
> assert(shuffles.length == 1)
> checkAnswer(df,
> Seq(Row("aaa", 10, 100.0), Row("aaa", 10, 200.0), Row("bbb", 20, 150.0),
> Row("bbb", 20, 250.0), Row("bbb", 20, 350.0), Row("ccc", 30, 400.50)))
> }
> } {code}
>
> {code:java}
> $ build/sbt "sql/testOnly *KeyGroupedPartitioningSuite -- -z bug"
> ...
> [info] - bug *** FAILED *** (1 second, 884 milliseconds)
> [info] java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
> [info] at
> scala.collection.immutable.ArraySeq$ofRef.apply(ArraySeq.scala:331)
> [info] at
> org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning$.$anonfun$project$1(partitioning.scala:471)
> [info] at
> org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning$.$anonfun$project$1$adapted(partitioning.scala:471)
> [info] at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:75)
> [info] at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:35)
> [info] at
> org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning$.project(partitioning.scala:471)
> [info] at
> org.apache.spark.sql.execution.KeyGroupedPartitionedScan.$anonfun$getOutputKeyGroupedPartitioning$5(KeyGroupedPartitionedScan.scala:58)
> ... {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]