[ https://issues.apache.org/jira/browse/SPARK-53634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18021366#comment-18021366 ]

Yuki Sato commented on SPARK-53634:
-----------------------------------

(For those facing this issue)
You can work around it by avoiding multiple withColumn calls with literals on a 
DataFrame immediately after a union. Instead, introduce the literal columns 
through a cross join with a one-row DataFrame, as shown below.
{code:java}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import spark.implicits._ // for toDF; already in scope in spark-shell

val publishDir = "output/csv_data"

val sampleData1 = Seq((1, "a"), (3, "a"), (2, "b"))
val csvOutputDF1 = sampleData1.toDF("value", "path1")
val sampleData2 = Seq((2, "b"), (3, "c"), (5, "b"))
val csvOutputDF2 = sampleData2.toDF("value", "path1")

val csvUnionDF = csvOutputDF1.union(csvOutputDF2)
// One-row DataFrame carrying the literal columns; cross-joining it replaces
// the two withColumn(..., lit(...)) calls.
val path2And3DF = Seq(("e", "f")).toDF("path2", "path3")

csvUnionDF
  .join(path2And3DF) // no join condition => cross join with the single row
  .sortWithinPartitions(
    col("path1").asc, col("path2").asc, col("path3").asc, col("value").asc)
  .write
  .mode(SaveMode.Overwrite)
  .partitionBy("path1", "path2", "path3")
  .csv(publishDir) {code}
This works because of the rule ordering described in the root cause 
explanation: the literal columns now come from the joined relation rather than 
from Project aliases above the Union, so they are not subject to 
FoldablePropagation and the Sort keeps all four columns.
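To double-check the workaround, you can inspect the optimized plan and confirm 
that all four sort columns survive (a quick sanity check on the same 
DataFrames as above, not part of the original report):
{code:java}
// The optimized Sort should list path1, path2, path3 and value
// (none eliminated).
csvUnionDF
  .join(path2And3DF)
  .sortWithinPartitions(
    col("path1").asc, col("path2").asc, col("path3").asc, col("value").asc)
  .explain(true)
{code}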

> Missing Sort Columns in Execution Plan Optimization with union + withColumn + 
> sortWithinPartitions
> -------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-53634
>                 URL: https://issues.apache.org/jira/browse/SPARK-53634
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 3.5.4, 4.0.0
>            Reporter: Yuki Sato
>            Priority: Major
>
> I am reporting this as a bug, but since I cannot say for certain, I would 
> like to confirm whether this behavior is intended or a bug.
> *issue*
> Under certain plan shapes, the optimizer drops some of the columns specified 
> in the Sort node created by sortWithinPartitions.
> The following code snippet easily reproduces the issue:
> {code:java}
> import org.apache.spark.sql.functions.{col, lit}
> import spark.implicits._ // for toDF; already in scope in spark-shell
> 
> val sampleData1 = Seq((1, "a"), (3, "a"), (2, "b"))
> val csvOutputDF1 = sampleData1.toDF("value", "path1")
> val sampleData2 = Seq((2, "b"), (3, "c"), (5, "b"))
> val csvOutputDF2 = sampleData2.toDF("value", "path1")
> val csvUnionDF = csvOutputDF1.union(csvOutputDF2)
> csvUnionDF
>   .withColumn("path2", lit("e"))   // literal column added first
>   .withColumn("path3", lit("f"))   // literal column added second
>   .sortWithinPartitions(
>     col("path1").asc, col("path2").asc, col("path3").asc, col("value").asc
>   )
>   .explain(true) {code}
>  
> The execution plan of the above code is as follows:
> {code:java}
> == Parsed Logical Plan ==
> 'Sort ['path1 ASC NULLS FIRST, 'path2 ASC NULLS FIRST, 'path3 ASC NULLS FIRST, 'value ASC NULLS FIRST], false
> +- Project [value#42, path1#43, path2#59, f AS path3#63]
>    +- Project [value#42, path1#43, e AS path2#59]
>       +- Union false, false
>          :- Project [_1#37 AS value#42, _2#38 AS path1#43]
>          :  +- LocalRelation [_1#37, _2#38]
>          +- Project [_1#48 AS value#53, _2#49 AS path1#54]
>             +- LocalRelation [_1#48, _2#49]
> == Analyzed Logical Plan ==
> value: int, path1: string, path2: string, path3: string
> Sort [path1#43 ASC NULLS FIRST, path2#59 ASC NULLS FIRST, path3#63 ASC NULLS FIRST, value#42 ASC NULLS FIRST], false
> +- Project [value#42, path1#43, path2#59, f AS path3#63]
>    +- Project [value#42, path1#43, e AS path2#59]
>       +- Union false, false
>          :- Project [_1#37 AS value#42, _2#38 AS path1#43]
>          :  +- LocalRelation [_1#37, _2#38]
>          +- Project [_1#48 AS value#53, _2#49 AS path1#54]
>             +- LocalRelation [_1#48, _2#49]
> == Optimized Logical Plan ==
> Sort [path1#43 ASC NULLS FIRST, path2#59 ASC NULLS FIRST, value#42 ASC NULLS FIRST], false
> +- Union false, false
>    :- LocalRelation [value#42, path1#43, path2#59, path3#63]
>    +- LocalRelation [value#53, path1#54, path2#68, path3#69]
> == Physical Plan ==
> *(1) Sort [path1#43 ASC NULLS FIRST, path2#59 ASC NULLS FIRST, value#42 ASC NULLS FIRST], false, 0
> +- Union
>    :- LocalTableScan [value#42, path1#43, path2#59, path3#63]
>    +- LocalTableScan [value#53, path1#54, path2#68, path3#69] {code}
> As can be seen, the column "path3" has already disappeared from the Sort node 
> in the Optimized Logical Plan.
> This becomes a problem when partitionBy is used, as shown below:
> {code:java}
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.functions.{col, lit}
> import spark.implicits._ // for toDF; already in scope in spark-shell
> 
> val publishDir = "output/csv_data"
> val sampleData1 = Seq((1, "a"), (3, "a"), (2, "b"))
> val csvOutputDF1 = sampleData1.toDF("value", "path1")
> val sampleData2 = Seq((2, "b"), (3, "c"), (5, "b"))
> val csvOutputDF2 = sampleData2.toDF("value", "path1")
> val csvUnionDF = csvOutputDF1.union(csvOutputDF2)
> csvUnionDF
>   .withColumn("path2", lit("e"))
>   .withColumn("path3", lit("f"))
>   .sortWithinPartitions(
>     col("path1").asc, col("path2").asc, col("path3").asc, col("value").asc
>   )
>   .write
>   .mode(SaveMode.Overwrite)
>   .partitionBy("path1", "path2", "path3")
>   .csv(publishDir) {code}
> When V1Writes is used, isOrderingMatched compares the outputOrdering produced 
> by the Sort node from sortWithinPartitions with the requiredOrdering derived 
> from partitionBy.
> However, since "path3" is missing, the match fails: the Sort node generated 
> for partitionBy is selected, the sort specified by sortWithinPartitions is 
> ignored, and the user-specified sort order is broken.
> Although partitionBy itself does not guarantee that the sort order will be 
> preserved, from the user’s perspective this behavior appears strange.
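> To illustrate why the match fails, here is a simplified, hypothetical sketch 
> of the prefix comparison performed by isOrderingMatched (modeled loosely on 
> the real method, with plain strings standing in for Catalyst expressions):
> {code:java}
> // Simplified stand-in for V1Writes' ordering check: the required ordering
> // must be a prefix of the output ordering (here modeled as column names).
> def isOrderingMatched(required: Seq[String], output: Seq[String]): Boolean =
>   required.length <= output.length &&
>     required.zip(output).forall { case (r, o) => r == o }
> 
> // Ordering required by partitionBy("path1", "path2", "path3"):
> val required = Seq("path1", "path2", "path3")
> // Ordering left in the optimized Sort after "path3" was eliminated:
> val output = Seq("path1", "path2", "value")
> 
> // "path3" != "value" at the third position, so the user's Sort is not
> // reused and V1Writes falls back to its own Sort for partitionBy.
> println(isOrderingMatched(required, output)) // false
> {code}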
> *root cause*
> The root cause lies in three optimizer rules (PushProjectionThroughUnion / 
> FoldablePropagation / EliminateSorts) and the order in which they are applied.
> Currently, the rules are applied once each, in the order 
> PushProjectionThroughUnion → FoldablePropagation → EliminateSorts.
> As a result, the literal column added by the first withColumn on csvUnionDF 
> ("path2") has already been pushed below the Union by 
> PushProjectionThroughUnion and is therefore not subject to 
> FoldablePropagation, while the literal column added by the second withColumn 
> ("path3") is subject to FoldablePropagation, rewritten as a literal in the 
> Sort, and then eliminated by EliminateSorts.
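> For comparison, the same pattern without the union behaves consistently: both 
> literal sort columns should be folded and removed (a hypothetical check based 
> on the explanation above, not part of the original report):
> {code:java}
> // (same imports as the reproduction above)
> // Without a Union in between, FoldablePropagation sees both literal
> // aliases, so EliminateSorts is expected to drop both "path2" and
> // "path3" from the Sort, leaving only path1 and value.
> Seq((1, "a"), (3, "a"), (2, "b"))
>   .toDF("value", "path1")
>   .withColumn("path2", lit("e"))
>   .withColumn("path3", lit("f"))
>   .sortWithinPartitions(
>     col("path1").asc, col("path2").asc, col("path3").asc, col("value").asc
>   )
>   .explain(true)
> {code}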
> *affected versions*
> I have confirmed this in versions 3.5.4 and 4.0.0, but I assume all Spark 
> versions are affected.
> However, the behavior differs around version 3.4.0, where changes to V1Writes 
> were introduced:
>  - 3.4.0 and later (default: spark.sql.optimizer.plannedWrite.enabled=true)
>  In the execution plan, the Sort generated by partitionBy is selected and the 
> Sort from sortWithinPartitions is ignored.
>  - Prior to 3.4.0 (same as spark.sql.optimizer.plannedWrite.enabled=false on 
> 3.4.0 and later)
>  A Sort generated by partitionBy is added internally to the execution plan.
>  As a result, in the example code above, both the Sort from 
> sortWithinPartitions and the Sort from partitionBy are executed redundantly.
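> On 3.4.0 and later, disabling planned writes restores the pre-3.4.0 behavior, 
> which may serve as a mitigation at the cost of a redundant sort (a sketch 
> based on the config above, not a recommendation from the original report):
> {code:java}
> // Fall back to the pre-3.4.0 write path: the user's sortWithinPartitions
> // runs first and partitionBy adds its own Sort on top of it.
> spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", "false")
> {code}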
> I am relatively new to the Spark community, so I cannot determine whether 
> this is intended behavior or a bug that should be fixed.
> If it is to be fixed as a bug, I would consider working on it.


