Cheng Pan created SPARK-55411:
---------------------------------
Summary: SPJ may throw ArrayIndexOutOfBoundsException when there are
fewer join keys than cluster keys
Key: SPARK-55411
URL: https://issues.apache.org/jira/browse/SPARK-55411
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 4.1.1, 4.0.2
Reporter: Cheng Pan
Trigger conditions: the join uses fewer join keys than partition keys, and one side of the join is shuffled. Reproduction:
{code:java}
test("bug") {
  // Reproduction for SPARK-55411. Trigger conditions:
  //  - the join key (customer_id) is a strict subset of the partition keys of
  //    `customers` (identity(customer_name), bucket(4, customer_id)), allowed by
  //    V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS, and
  //  - `orders` is created without partitioning, so one side of the join is
  //    shuffled (the assertion below expects exactly one shuffle).
  withSQLConf(
    SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
    SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true",
    SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
    SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
    SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") {
    val customers_partitions = Array(identity("customer_name"), bucket(4, "customer_id"))
    createTable(customers, customersColumns, customers_partitions)
    sql(s"INSERT INTO testcat.ns.$customers VALUES " +
      s"('aaa', 10, 1), ('bbb', 20, 2), ('ccc', 30, 3)")

    // Unpartitioned table; presumably this is the side that gets shuffled.
    createTable(orders, ordersColumns, Array.empty)
    // Keep this literal on one line: a single-line Scala string literal cannot
    // contain a newline (email wrapping had split it).
    sql(s"INSERT INTO testcat.ns.$orders VALUES " +
      s"(100.0, 1), (200.0, 1), (150.0, 2), (250.0, 2), (350.0, 2), (400.50, 3)")

    val df = sql(
      s"""
         |${selectWithMergeJoinHint("c", "o")}
         |customer_name, customer_age, order_amount
         |FROM testcat.ns.$customers c JOIN testcat.ns.$orders o
         |ON c.customer_id = o.customer_id ORDER BY c.customer_id, order_amount
         |""".stripMargin)

    // In the reported failure, the ArrayIndexOutOfBoundsException is thrown from
    // KeyGroupedPartitioning.project while this plan is being built.
    val shuffles = collectShuffles(df.queryExecution.executedPlan)
    assert(shuffles.length == 1)
    checkAnswer(df,
      Seq(Row("aaa", 10, 100.0), Row("aaa", 10, 200.0), Row("bbb", 20, 150.0),
        Row("bbb", 20, 250.0), Row("bbb", 20, 350.0), Row("ccc", 30, 400.50)))
  }
} {code}
{code:java}
$ build/sbt "sql/testOnly *KeyGroupedPartitioningSuite -- -z bug"
...
[info] - bug *** FAILED *** (1 second, 884 milliseconds)
[info] java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
[info] at scala.collection.immutable.ArraySeq$ofRef.apply(ArraySeq.scala:331)
[info] at org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning$.$anonfun$project$1(partitioning.scala:471)
[info] at org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning$.$anonfun$project$1$adapted(partitioning.scala:471)
[info] at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:75)
[info] at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:35)
[info] at org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning$.project(partitioning.scala:471)
[info] at org.apache.spark.sql.execution.KeyGroupedPartitionedScan.$anonfun$getOutputKeyGroupedPartitioning$5(KeyGroupedPartitionedScan.scala:58)
... {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]