Arttu Voutilainen created SPARK-47306:
-----------------------------------------

             Summary: Explicit repartitioning is ignored when reading bucketed table but with bucketing disabled
                 Key: SPARK-47306
                 URL: https://issues.apache.org/jira/browse/SPARK-47306
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.5.1, 3.4.2
            Reporter: Arttu Voutilainen


When reading a bucketed table with
`spark.sql.sources.bucketing.autoBucketedScan.enabled = true` (the default),
Spark may decide not to use the bucketing during the read. However, some rule
still seems to optimize away an Exchange (e.g. the one added by `repartition`)
because the table is bucketed, treating the Exchange as unnecessary.

Perhaps one optimization rule first removes the Exchange on the assumption that
the bucketing already provides the same distribution, and the
DisableUnnecessaryBucketedScan rule then disables the bucketed scan because
there is no longer an Exchange that would benefit from it?
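To make the suspected rule ordering concrete, here is a toy model in plain Scala with no Spark dependency. `Plan`, `Scan`, `Exchange`, `ToyPlanner` and both rules are hypothetical stand-ins, not Spark's actual classes or rule logic, and the 10 file splits are an assumed value chosen only to mirror `res4` in the reproduction. Applying "remove the Exchange" before "disable the unnecessary bucketed scan" yields the same three outcomes as the report: 4 partitions for the non-bucketed table, 10 for the bucketed table with auto bucketed scan enabled, and 4 with it disabled.

```scala
// Toy model of the rule-ordering hypothesis in SPARK-47306.
// These are NOT Spark classes; names and logic are hypothetical and only
// mimic the shape of the physical plans in the reproduction.
sealed trait Plan

// A file scan of a table with `numBuckets` buckets (0 = not bucketed) that,
// when read without bucketing, produces `numFileSplits` partitions.
final case class Scan(numBuckets: Int, numFileSplits: Int, bucketed: Boolean) extends Plan

// A hash repartition, like repartition(n, col("id")), into `numPartitions`.
final case class Exchange(numPartitions: Int, child: Plan) extends Plan

object ToyPlanner {
  // Hypothetical rule 1: drop an Exchange whose child is a bucketed scan that
  // already yields the same number of hash partitions.
  def removeRedundantExchange(plan: Plan): Plan = plan match {
    case Exchange(n, s: Scan) if s.bucketed && s.numBuckets == n => s
    case other => other
  }

  // Hypothetical rule 2, in the spirit of DisableUnnecessaryBucketedScan:
  // a scan at the root has no Exchange above it to benefit from bucketing,
  // so it is turned into a plain (non-bucketed) scan.
  def disableUnnecessaryBucketedScan(plan: Plan): Plan = plan match {
    case s: Scan => s.copy(bucketed = false)
    case other => other
  }

  // Rule 1 runs first; rule 2 runs only when auto bucketed scan is enabled.
  def optimize(plan: Plan, autoBucketedScan: Boolean): Plan = {
    val withoutExchange = removeRedundantExchange(plan)
    if (autoBucketedScan) disableUnnecessaryBucketedScan(withoutExchange)
    else withoutExchange
  }

  // Number of partitions the final plan produces.
  def outputPartitions(plan: Plan): Int = plan match {
    case s: Scan => if (s.bucketed) s.numBuckets else s.numFileSplits
    case e: Exchange => e.numPartitions
  }

  def main(args: Array[String]): Unit = {
    // 10 and 12 file splits are assumed values, not taken from Spark.
    val bucketed = Exchange(4, Scan(numBuckets = 4, numFileSplits = 10, bucketed = true))
    val plain = Exchange(4, Scan(numBuckets = 0, numFileSplits = 12, bucketed = false))

    println(outputPartitions(optimize(plain, autoBucketedScan = true)))     // 4
    println(outputPartitions(optimize(bucketed, autoBucketedScan = true)))  // 10: repartition lost
    println(outputPartitions(optimize(bucketed, autoBucketedScan = false))) // 4
  }
}
```

In this model the partition count is only wrong when both rules fire in that order, which is consistent with, but does not prove, the hypothesis about Spark's real rules.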

See below for a reproduction, tested on 3.5.1, 3.5.0 and 3.4.2:
{noformat}
val df_no_buckets = spark.range(10)
df_no_buckets.write.bucketBy(4, "id").saveAsTable("df_buckets")
val df_buckets = spark.read.table("df_buckets")

// Repartitioning the non-bucketed table works as expected
scala> df_no_buckets.repartition(4, col("id")).explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange hashpartitioning(id#0L, 4), REPARTITION_BY_NUM, [plan_id=24]
   +- Range (0, 10, step=1, splits=12)


scala> df_no_buckets.repartition(4, col("id")).rdd.getNumPartitions
res2: Int = 4

// But the repartition is lost when reading the bucketed table, leading to the
// wrong number of partitions in the end
scala> df_buckets.repartition(4, col("id")).explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FileScan parquet spark_catalog.default.df_buckets[id#3L] Batched: true, Bucketed: false (disabled by query planner), DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

scala> df_buckets.repartition(4, col("id")).rdd.getNumPartitions
res4: Int = 10 

// After disabling auto bucketed scan, the Exchange is still removed, but the
// output partitioning is correct because the scan keeps the bucketing
scala> spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", false)

scala> df_buckets.repartition(4, col("id")).explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FileScan parquet spark_catalog.default.df_buckets[id#3L] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 4 out of 4

scala> df_buckets.repartition(4, col("id")).rdd.getNumPartitions
res7: Int = 4
{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
