[ https://issues.apache.org/jira/browse/SPARK-40588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17622620#comment-17622620 ]
Enrico Minack commented on SPARK-40588:
---------------------------------------
Even with AQE enabled (pre Spark 3.4.0), the written files are sorted {*}unless
spilling occurs{*}.
The reason is that {{FileFormatWriter}} defines a {{requiredOrdering}} as:
[https://github.com/apache/spark/blob/f74867bddfbcdd4d08076db36851e88b15e66556/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L188-L189]
{code:java}
val requiredOrdering = partitionColumns ++
writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
{code}
Here {{partitionColumns}} are the columns passed to {{.write.partitionBy}} and
{{sortColumns}} are those passed to {{.write.sortBy}}. Your write uses neither
bucketing nor {{sortBy}}, so {{requiredOrdering}} is {{["year", "month", "day"]}}
in your case.
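As a rough illustration, the decision can be modeled in plain Scala without
Spark. This is a simplified sketch, not Spark code: column names stand in for
expressions, {{RequiredOrderingModel}} and {{needsExtraSort}} are hypothetical
names, and it assumes the writer skips its own sort only when
{{requiredOrdering}} is a prefix of the ordering it can see on its input. The
AQE case (pre Spark 3.4) is modeled as the writer seeing no ordering at all.

```scala
// Simplified model: plain column names stand in for Spark expressions.
// RequiredOrderingModel and needsExtraSort are hypothetical, not Spark APIs.
object RequiredOrderingModel {
  // Mirrors the quoted line: partitionBy columns, then the bucket id (if any),
  // then sortBy columns.
  def requiredOrdering(
      partitionColumns: Seq[String],
      bucketIdExpression: Option[String],
      sortColumns: Seq[String]): Seq[String] =
    partitionColumns ++ bucketIdExpression.toSeq ++ sortColumns

  // Assumption: the visible ordering is good enough when the required
  // ordering is a prefix of it; otherwise the writer adds its own sort.
  def needsExtraSort(required: Seq[String], visibleOrdering: Seq[String]): Boolean =
    !visibleOrdering.startsWith(required)

  def main(args: Array[String]): Unit = {
    val required = requiredOrdering(Seq("year", "month", "day"), None, Seq.empty)
    val sortedBy = Seq("year", "month", "day", "sortCol")
    // AQE disabled: the writer sees the sortWithinPartitions ordering.
    println(needsExtraSort(required, sortedBy))  // false
    // AQE enabled (pre 3.4): modeled as the writer seeing no ordering.
    println(needsExtraSort(required, Seq.empty)) // true
  }
}
```

In the second case the model adds a sort on {{["year", "month", "day"]}} only,
which is the sort whose spill behavior is described next.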
{{FileFormatWriter}} enforces {{requiredOrdering}} by adding a sort if the
DataFrame is not already sorted accordingly. With AQE enabled (pre Spark 3.4),
{{FileFormatWriter}} does not see the existing ordering and therefore adds that
sort. It reads the already sorted DataFrame (sorted by
{{["year", "month", "day", "sortCol"]}}) and sorts it by
{{["year", "month", "day"]}} only. If the partition does not fit into memory and
has to be spilled to disk, the sorter round-robins over the spills when merging
them, because they are all "equal" w.r.t. {{["year", "month", "day"]}}: the next
row of every spill file of a partition has the same values for these columns
(only {{sortCol}} differs, but that is not considered by this sort). Hence
spilling breaks the order.
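The effect of a merge that compares only the partition columns can be
reproduced without Spark. The sketch below is a toy model, not Spark's sorter:
it assumes two spill runs holding consecutive rows of the already sorted input,
and merges them with a {{java.util.PriorityQueue}} whose comparator looks only
at the key. {{Record}}, {{SpillMergeModel}} and {{mergeByKeyOnly}} are
hypothetical names.

```scala
import java.util.{Comparator, PriorityQueue}

// Hypothetical toy row: `key` stands in for (year, month, day).
final case class Record(key: String, sortCol: Int)

object SpillMergeModel {
  // Merge sorted runs ("spills") with a priority queue that compares ONLY the
  // key, like the sort on ["year", "month", "day"]. Equal keys are ties.
  def mergeByKeyOnly(spills: Seq[Seq[Record]]): Seq[Record] = {
    val byHeadKey: Comparator[BufferedIterator[Record]] =
      (a, b) => a.head.key.compareTo(b.head.key)
    val queue = new PriorityQueue[BufferedIterator[Record]](byHeadKey)
    spills.map(_.iterator.buffered).filter(_.hasNext).foreach(it => queue.add(it))

    val out = Seq.newBuilder[Record]
    while (!queue.isEmpty) {
      val run = queue.poll()          // a run whose head has the smallest key
      out += run.next()               // emit one row from that run
      if (run.hasNext) queue.add(run) // re-insert; on a tie it goes behind the others
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val day = "2022/10/16"
    // Input already sorted by (key, sortCol), split into two consecutive spills.
    val spill1 = Seq(1, 2, 3).map(Record(day, _))
    val spill2 = Seq(4, 5, 6).map(Record(day, _))
    val merged = mergeByKeyOnly(Seq(spill1, spill2))
    println(merged.map(_.sortCol).mkString(", ")) // 1, 4, 2, 5, 3, 6
  }
}
```

Each run is still sorted by {{sortCol}} and the output is correctly sorted by
key, yet the {{sortCol}} values alternate between the two runs, which matches
the pattern in the report.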
> Sorting issue with AQE turned on
> ----------------------------------
>
> Key: SPARK-40588
> URL: https://issues.apache.org/jira/browse/SPARK-40588
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.1.3
> Environment: Spark v3.1.3
> Scala v2.12.13
> Reporter: Swetha Baskaran
> Priority: Major
> Attachments: image-2022-10-16-22-05-47-159.png
>
>
> We are attempting to partition data by a few columns, sort by a particular
> _sortCol_ and write out one file per partition.
> {code:java}
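> import org.apache.spark.sql.functions.{col, monotonically_increasing_id, spark_partition_id}
>
> df
> // all rows of a given (year, month, day) go to the same task
> .repartition(col("day"), col("month"), col("year"))
> .withColumn("partitionId", spark_partition_id())
> .withColumn("monotonicallyIncreasingIdUnsorted", monotonically_increasing_id())
> // sort each task's rows by the partition columns plus sortCol
> .sortWithinPartitions("year", "month", "day", "sortCol")
> .withColumn("monotonicallyIncreasingIdSorted", monotonically_increasing_id())
> .write
> .partitionBy("year", "month", "day")
> .parquet(path){code}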
> When inspecting the results, we observe one file per partition; however, we
> see an _alternating_ pattern of unsorted rows in some files.
> {code:java}
> {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832121344,"monotonicallyIncreasingIdSorted":6287832121344}
> {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287877022389,"monotonicallyIncreasingIdSorted":6287876860586}
> {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287877567881,"monotonicallyIncreasingIdSorted":6287832121345}
> {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287835105553,"monotonicallyIncreasingIdSorted":6287876860587}
> {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832570127,"monotonicallyIncreasingIdSorted":6287832121346}
> {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287879965760,"monotonicallyIncreasingIdSorted":6287876860588}
> {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287878762347,"monotonicallyIncreasingIdSorted":6287832121347}
> {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287837165012,"monotonicallyIncreasingIdSorted":6287876860589}
> {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832910545,"monotonicallyIncreasingIdSorted":6287832121348}
> {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287881244758,"monotonicallyIncreasingIdSorted":6287876860590}
> {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287880041345,"monotonicallyIncreasingIdSorted":6287832121349}{code}
> Here is a
> [gist|https://gist.github.com/Swebask/543030748a768be92d3c0ae343d2ae89] to
> reproduce the issue.
> Turning off AQE with {{spark.conf.set("spark.sql.adaptive.enabled", false)}}
> fixes the issue.
> I'm working on identifying why AQE affects the sort order. Any leads or
> thoughts would be appreciated!
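Since every output file holds a single year/month/day, its rows should have
non-decreasing {{sortCol}} values, which makes the problem easy to detect. A
minimal sketch with a hypothetical helper {{firstInversion}}; the input values
are the first {{sortCol}} values of the rows quoted above.

```scala
// Hypothetical helper, not part of Spark.
object SortednessCheck {
  // Index of the first row whose sortCol is smaller than the previous row's,
  // or None if the values are non-decreasing (i.e. the file is sorted).
  def firstInversion(sortCols: IndexedSeq[Long]): Option[Int] =
    sortCols.indices.drop(1).find(i => sortCols(i) < sortCols(i - 1))

  def main(args: Array[String]): Unit = {
    // sortCol values of the first rows shown in the report (partitionId 732).
    val observed = Vector(100000L, 1303413L, 100000L, 1303413L, 100000L)
    println(firstInversion(observed)) // Some(2)
  }
}
```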
--
This message was sent by Atlassian Jira
(v8.20.10#820010)