[ https://issues.apache.org/jira/browse/SPARK-53634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18021366#comment-18021366 ]
Yuki Sato commented on SPARK-53634:
-----------------------------------

(For those facing this issue)
You can work around it by avoiding multiple withColumn operations with literals on the DataFrame immediately after a union, as shown below.
{code:java}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
// assumes spark.implicits._ is in scope (e.g., spark-shell), for toDF

val publishDir = "output/csv_data"

val sampleData1 = Seq((1, "a"), (3, "a"), (2, "b"))
val csvOutputDF1 = sampleData1.toDF("value", "path1")
val sampleData2 = Seq((2, "b"), (3, "c"), (5, "b"))
val csvOutputDF2 = sampleData2.toDF("value", "path1")
val csvUnionDF = csvOutputDF1.union(csvOutputDF2)

// Introduce the constant columns through a single-row join instead of
// withColumn(lit(...)) calls on the unioned DataFrame.
val path2And3DF = Seq(("e", "f")).toDF("path2", "path3")

csvUnionDF
  .join(path2And3DF)
  .sortWithinPartitions(col("path1").asc, col("path2").asc, col("path3").asc, col("value").asc)
  .write
  .mode(SaveMode.Overwrite)
  .partitionBy("path1", "path2", "path3")
  .csv(publishDir)
{code}
This works because of the rule interaction described in the root cause explanation below: the literal columns now come from the joined relation rather than from Project nodes above the union, so they are not subject to FoldablePropagation and the sort columns survive optimization.
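If changing the query shape is not an option, another possible workaround may be to exclude FoldablePropagation via spark.sql.optimizer.excludedRules, so the literal aliases are never rewritten and EliminateSorts has nothing to drop. This is only a sketch I have not verified against this exact issue, and excluding an optimizer rule applies to every query in the session:
{code:java}
// Untested sketch: disable FoldablePropagation for the whole session so the
// path2/path3 aliases are never rewritten into literals, which should leave
// all four sort columns intact in the optimized Sort node.
// Caveat: this affects every query run in this session, not just the write.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.FoldablePropagation")
{code}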
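To observe the rule sequence described in the root cause section of the issue (quoted below), the optimizer plan change log can be used. This is a sketch assuming Spark 3.1+, where the spark.sql.planChangeLog.* confs exist; with these set, each rewrite made by the three rules is printed to the driver log:
{code:java}
// Sketch: log the plan changes made by the three rules involved, so the
// PushProjectionThroughUnion -> FoldablePropagation -> EliminateSorts
// sequence (and the point where path3 is dropped) is visible in the log.
spark.conf.set("spark.sql.planChangeLog.level", "WARN") // WARN is visible with default log levels
spark.conf.set("spark.sql.planChangeLog.rules", Seq(
  "org.apache.spark.sql.catalyst.optimizer.PushProjectionThroughUnion",
  "org.apache.spark.sql.catalyst.optimizer.FoldablePropagation",
  "org.apache.spark.sql.catalyst.optimizer.EliminateSorts"
).mkString(","))
{code}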
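Finally, as noted in the affected-versions section of the issue below, on 3.4.0 and later the old behavior can be restored by turning off planned writes. This does not fix the dropped sort columns; it merely falls back to the pre-3.4 behavior where the write adds its own Sort on top of the user's (a sketch):
{code:java}
// Sketch: fall back to the pre-3.4 V1 write path. The write then adds its own
// partitionBy Sort at execution time instead of replacing the user's
// sortWithinPartitions, so both sorts run (redundantly, but predictably).
spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", "false")
{code}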
> Missing Sort Columns in Execution Plan Optimization with union + withColumn + sortWithinPartitions
> ---------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-53634
>                 URL: https://issues.apache.org/jira/browse/SPARK-53634
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 3.5.4, 4.0.0
>            Reporter: Yuki Sato
>            Priority: Major
>
> I am reporting this as a bug, but since I cannot determine it for certain, I would like to confirm whether this behavior is intended or a bug.
>
> *Issue*
> In certain logic, some of the columns specified in the Sort node created by sortWithinPartitions go missing during execution plan optimization.
> The following code snippet reproduces the issue:
> {code:java}
> import org.apache.spark.sql.functions.{col, lit}
>
> val sampleData1 = Seq((1, "a"), (3, "a"), (2, "b"))
> val csvOutputDF1 = sampleData1.toDF("value", "path1")
> val sampleData2 = Seq((2, "b"), (3, "c"), (5, "b"))
> val csvOutputDF2 = sampleData2.toDF("value", "path1")
> val csvUnionDF = csvOutputDF1.union(csvOutputDF2)
>
> csvUnionDF
>   .withColumn("path2", lit("e"))
>   .withColumn("path3", lit("f"))
>   .sortWithinPartitions(
>     col("path1").asc, col("path2").asc, col("path3").asc, col("value").asc
>   )
>   .explain(true)
> {code}
> The execution plan of the above code is as follows:
> {code:java}
> == Parsed Logical Plan ==
> 'Sort ['path1 ASC NULLS FIRST, 'path2 ASC NULLS FIRST, 'path3 ASC NULLS FIRST, 'value ASC NULLS FIRST], false
> +- Project [value#42, path1#43, path2#59, f AS path3#63]
>    +- Project [value#42, path1#43, e AS path2#59]
>       +- Union false, false
>          :- Project [_1#37 AS value#42, _2#38 AS path1#43]
>          :  +- LocalRelation [_1#37, _2#38]
>          +- Project [_1#48 AS value#53, _2#49 AS path1#54]
>             +- LocalRelation [_1#48, _2#49]
>
> == Analyzed Logical Plan ==
> value: int, path1: string, path2: string, path3: string
> Sort [path1#43 ASC NULLS FIRST, path2#59 ASC NULLS FIRST, path3#63 ASC NULLS FIRST, value#42 ASC NULLS FIRST], false
> +- Project [value#42, path1#43, path2#59, f AS path3#63]
>    +- Project [value#42, path1#43, e AS path2#59]
>       +- Union false, false
>          :- Project [_1#37 AS value#42, _2#38 AS path1#43]
>          :  +- LocalRelation [_1#37, _2#38]
>          +- Project [_1#48 AS value#53, _2#49 AS path1#54]
>             +- LocalRelation [_1#48, _2#49]
>
> == Optimized Logical Plan ==
> Sort [path1#43 ASC NULLS FIRST, path2#59 ASC NULLS FIRST, value#42 ASC NULLS FIRST], false
> +- Union false, false
>    :- LocalRelation [value#42, path1#43, path2#59, path3#63]
>    +- LocalRelation [value#53, path1#54, path2#68, path3#69]
>
> == Physical Plan ==
> *(1) Sort [path1#43 ASC NULLS FIRST, path2#59 ASC NULLS FIRST, value#42 ASC NULLS FIRST], false, 0
> +- Union
>    :- LocalTableScan [value#42, path1#43, path2#59, path3#63]
>    +- LocalTableScan [value#53, path1#54, path2#68, path3#69]
> {code}
> It can be observed that the column "path3" has already disappeared from the Sort in the Optimized Logical Plan.
> This becomes a problem when using partitionBy, as shown below:
> {code:java}
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.functions.{col, lit}
>
> val publishDir = "output/csv_data"
>
> val sampleData1 = Seq((1, "a"), (3, "a"), (2, "b"))
> val csvOutputDF1 = sampleData1.toDF("value", "path1")
> val sampleData2 = Seq((2, "b"), (3, "c"), (5, "b"))
> val csvOutputDF2 = sampleData2.toDF("value", "path1")
> val csvUnionDF = csvOutputDF1.union(csvOutputDF2)
>
> csvUnionDF
>   .withColumn("path2", lit("e"))
>   .withColumn("path3", lit("f"))
>   .sortWithinPartitions(
>     col("path1").asc, col("path2").asc, col("path3").asc, col("value").asc
>   )
>   .write
>   .mode(SaveMode.Overwrite)
>   .partitionBy("path1", "path2", "path3")
>   .csv(publishDir)
> {code}
> When using V1Writes, isOrderingMatch compares the outputOrdering produced by the Sort node from sortWithinPartitions with the requiredOrdering derived from partitionBy.
> However, since "path3" is missing, the Sort node generated for partitionBy is selected instead, the sort specified by sortWithinPartitions is ignored, and the user-specified sort order is broken.
> Although partitionBy itself does not guarantee that the sort order will be preserved, from the user's perspective this behavior appears strange.
>
> *Root cause*
> The root cause lies in three optimization rules (PushProjectionThroughUnion / FoldablePropagation / EliminateSorts) and the order in which they are applied.
> Currently, the rules are applied once each, in the order PushProjectionThroughUnion → FoldablePropagation → EliminateSorts.
> As a result, the literal column added by the first withColumn on csvUnionDF is not subject to FoldablePropagation, because PushProjectionThroughUnion has already pushed its Project below the union; the literal column added by the second withColumn, however, is subject to FoldablePropagation, is interpreted as a literal, and is then eliminated from the Sort by EliminateSorts.
>
> *Affected versions*
> I have confirmed this on 3.5.4 and 4.0.0, but presumably all Spark versions are affected.
> However, the behavior differs starting with 3.4.0, where changes to V1Writes were introduced:
> - 3.4.0 and later (default: spark.sql.optimizer.plannedWrite.enabled=true): in the execution plan, the Sort generated for partitionBy is selected and the Sort from sortWithinPartitions is ignored.
> - Prior to 3.4.0 (equivalent to spark.sql.optimizer.plannedWrite.enabled=false on 3.4.0 and later): a Sort generated for partitionBy is internally added to the execution plan, so in the example code above both the Sort from sortWithinPartitions and the Sort from partitionBy are executed redundantly.
>
> I am relatively new to the Spark community, so I cannot determine whether this is intended behavior or a bug that should be fixed.
> If it is to be fixed as a bug, I will consider working on it if needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org