Arttu Voutilainen created SPARK-47306:
-----------------------------------------
Summary: Explicit repartitioning is ignored when reading bucketed table but with bucketing disabled
Key: SPARK-47306
URL: https://issues.apache.org/jira/browse/SPARK-47306
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.5.1, 3.4.2
Reporter: Arttu Voutilainen
When reading a bucketed table with
`spark.sql.sources.bucketing.autoBucketedScan.enabled = true` (the default), Spark
may decide not to use the bucketing during the read. However, some code still
seems to rely on that bucketing to optimize things away: for example, it removes
an explicit Exchange as "unnecessary" because the table is bucketed.
Maybe one optimization rule first removes the Exchange, reasoning that the
bucketing already provides the same distribution, and afterwards the
DisableUnnecessaryBucketedScan rule disables the bucketing because there is no
Exchange left?
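To make the suspected interaction concrete, here is a minimal toy model in plain Scala, assuming the two-step ordering hypothesized above. It does not use Spark's real classes or rules: `Plan`, `Scan`, `Exchange`, `ToyRules`, `removeRedundantExchange` and `disableUnnecessaryBucketing` are hypothetical stand-ins, and the 10 unbucketed file splits are an assumption chosen to mirror the repro below.

```scala
// Toy model of the suspected rule-ordering problem. These are NOT Spark's
// classes or rules; every name here is hypothetical.
sealed trait Plan { def numPartitions: Int }

// A file scan of a bucketed table. When `bucketed` is true it yields one
// partition per bucket; otherwise it yields `filePartitions` plain splits.
final case class Scan(numBuckets: Int, filePartitions: Int, bucketed: Boolean) extends Plan {
  def numPartitions: Int = if (bucketed) numBuckets else filePartitions
}

// A user-requested hash repartition into `n` partitions.
final case class Exchange(n: Int, child: Plan) extends Plan {
  def numPartitions: Int = n
}

object ToyRules {
  // Hypothetical step 1: drop the user's Exchange when the child is a bucketed
  // scan with the same number of buckets, i.e. the shuffle looks redundant.
  def removeRedundantExchange(plan: Plan): Plan = plan match {
    case Exchange(n, s @ Scan(buckets, _, true)) if buckets == n => s
    case other => other
  }

  // Hypothetical step 2: in this toy no operator benefits from bucketing, so
  // every bucketed scan is switched to a plain (non-bucketed) read.
  def disableUnnecessaryBucketing(plan: Plan): Plan = plan match {
    case s: Scan => s.copy(bucketed = false)
    case Exchange(n, child) => Exchange(n, disableUnnecessaryBucketing(child))
  }

  // Runs step 1 and then, only if auto bucketed scan is enabled, step 2.
  // Step 2 never revisits the decision made in step 1.
  def optimize(plan: Plan, autoBucketedScan: Boolean): Plan = {
    val withoutExchange = removeRedundantExchange(plan)
    if (autoBucketedScan) disableUnnecessaryBucketing(withoutExchange) else withoutExchange
  }
}

object ToyDemo {
  def main(args: Array[String]): Unit = {
    // Mirrors df_buckets.repartition(4, col("id")) over a 4-bucket table;
    // 10 unbucketed splits is an assumption chosen to match the repro.
    val userPlan = Exchange(4, Scan(numBuckets = 4, filePartitions = 10, bucketed = true))
    for (auto <- Seq(true, false)) {
      val optimized = ToyRules.optimize(userPlan, autoBucketedScan = auto)
      println(s"autoBucketedScan=$auto: $optimized, partitions=${optimized.numPartitions}")
    }
  }
}
```

In this toy, with the flag on, the first step drops the 4-partition Exchange and the second drops the bucketing, leaving 10 partitions; with the flag off, the bucketed scan survives and still yields 4, mirroring `res4` and `res7` in the repro. The point of the sketch is only that a later step which undoes the premise of an earlier step, without re-checking it, produces this symptom.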
See below for a repro, tested on 3.5.1, 3.5.0 and 3.4.2:
{noformat}
val df_no_buckets = spark.range(10)
df_no_buckets.write.bucketBy(4, "id").saveAsTable("df_buckets")
val df_buckets = spark.read.table("df_buckets")
# Repartitioning the non-bucketed DataFrame works as expected
scala> df_no_buckets.repartition(4, col("id")).explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange hashpartitioning(id#0L, 4), REPARTITION_BY_NUM, [plan_id=24]
   +- Range (0, 10, step=1, splits=12)
scala> df_no_buckets.repartition(4, col("id")).rdd.getNumPartitions
res2: Int = 4
# But the repartition is lost when reading the bucketed table, leading to the wrong number of partitions in the end
scala> df_buckets.repartition(4, col("id")).explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FileScan parquet spark_catalog.default.df_buckets[id#3L] Batched: true, Bucketed: false (disabled by query planner), DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
scala> df_buckets.repartition(4, col("id")).rdd.getNumPartitions
res4: Int = 10
# After disabling the auto bucketed scan, the repartition is still removed, but the output partitioning is correct because the read keeps the bucketing
scala> spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", false)
scala> df_buckets.repartition(4, col("id")).explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FileScan parquet spark_catalog.default.df_buckets[id#3L] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 4 out of 4
scala> df_buckets.repartition(4, col("id")).rdd.getNumPartitions
res7: Int = 4{noformat}