[ https://issues.apache.org/jira/browse/SPARK-40588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17622620#comment-17622620 ]
Enrico Minack commented on SPARK-40588:
---------------------------------------

Even with AQE enabled (pre Spark 3.4.0), the written files are sorted {*}unless spilling occurs{*}.

The reason is that {{FileFormatWriter}} defines a {{requiredOrdering}} as:
[https://github.com/apache/spark/blob/f74867bddfbcdd4d08076db36851e88b15e66556/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L188-L189]
{code:java}
val requiredOrdering =
  partitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
{code}
Here {{partitionColumns}} refers to {{.write.partitionBy}} and {{sortColumns}} refers to {{.write.sortBy}}, so {{requiredOrdering}} is {{["year", "month", "day"]}} in your case. {{FileFormatWriter}} enforces that ordering if the DataFrame is not already sorted accordingly.

With AQE enabled (pre Spark 3.4), {{FileFormatWriter}} does not know about the existing ordering and introduces the sort itself. That sort reads the sorted DataFrame (sorted by {{["year", "month", "day", "sortCol"]}}) and sorts it again by {{["year", "month", "day"]}} only.

If the partition has to be spilled to RAM or disk, the merge round-robins over the spills, because their next rows all compare as "equal" w.r.t. {{["year", "month", "day"]}}: every row in the spill files of a partition has the same values for these columns (only {{sortCol}} differs, but this sort does not consider it). Hence the order is broken by spilling.

> Sorting issue with AQE turned on
> ----------------------------------
>
> Key: SPARK-40588
> URL: https://issues.apache.org/jira/browse/SPARK-40588
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.1.3
> Environment: Spark v3.1.3
> Scala v2.12.13
> Reporter: Swetha Baskaran
> Priority: Major
> Attachments: image-2022-10-16-22-05-47-159.png
>
>
> We are attempting to partition data by a few columns, sort by a particular
> _sortCol_ and write out one file per partition.
> {code:java}
> df
>   .repartition(col("day"), col("month"), col("year"))
>   .withColumn("partitionId",spark_partition_id)
>   .withColumn("monotonicallyIncreasingIdUnsorted",monotonicallyIncreasingId)
>   .sortWithinPartitions("year", "month", "day", "sortCol")
>   .withColumn("monotonicallyIncreasingIdSorted",monotonicallyIncreasingId)
>   .write
>   .partitionBy("year", "month", "day")
>   .parquet(path){code}
> When inspecting the results, we observe one file per partition, however we
> see an _alternating_ pattern of unsorted rows in some files.
> {code:java}
> {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832121344,"monotonicallyIncreasingIdSorted":6287832121344}
> {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287877022389,"monotonicallyIncreasingIdSorted":6287876860586}
> {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287877567881,"monotonicallyIncreasingIdSorted":6287832121345}
> {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287835105553,"monotonicallyIncreasingIdSorted":6287876860587}
> {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832570127,"monotonicallyIncreasingIdSorted":6287832121346}
> {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287879965760,"monotonicallyIncreasingIdSorted":6287876860588}
> {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287878762347,"monotonicallyIncreasingIdSorted":6287832121347}
> {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287837165012,"monotonicallyIncreasingIdSorted":6287876860589}
> {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832910545,"monotonicallyIncreasingIdSorted":6287832121348}
> {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287881244758,"monotonicallyIncreasingIdSorted":6287876860590}
> {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287880041345,"monotonicallyIncreasingIdSorted":6287832121349}{code}
> Here is a
> [gist|https://gist.github.com/Swebask/543030748a768be92d3c0ae343d2ae89] to
> reproduce the issue.
> Turning off AQE with spark.conf.set("spark.sql.adaptive.enabled", false)
> fixes the issue.
> I'm working on identifying why AQE affects the sort order. Any leads or
> thoughts would be appreciated!
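
The spill explanation in the comment can be illustrated without Spark. The following is only a sketch under stated assumptions, not Spark's actual sorter: {{Rec}}, {{SpillMergeSketch}}, {{merge}}, {{prefixOnly}} and {{fullKey}} are hypothetical names, and a plain {{java.util.PriorityQueue}} stands in for whatever merges the spill files. It merges two runs that are each sorted by {{sortCol}}, once comparing only {{["year", "month", "day"]}} and once comparing the full key.
{code:scala}
import java.util.{Comparator, PriorityQueue}
import scala.collection.mutable.ArrayBuffer

// Hypothetical record type, for illustration only.
final case class Rec(year: Int, month: Int, day: Int, sortCol: Int)

object SpillMergeSketch {
  // Ordering that ignores sortCol, like a sort on the partition columns only.
  val prefixOnly: Ordering[Rec] = Ordering.by((r: Rec) => (r.year, r.month, r.day))
  // Ordering that also considers sortCol.
  val fullKey: Ordering[Rec] = Ordering.by((r: Rec) => (r.year, r.month, r.day, r.sortCol))

  // K-way merge of pre-sorted runs; runs are compared by their current head record.
  def merge(runs: Seq[Seq[Rec]], ordering: Ordering[Rec]): Seq[Rec] = {
    val byHead = new Comparator[BufferedIterator[Rec]] {
      override def compare(a: BufferedIterator[Rec], b: BufferedIterator[Rec]): Int =
        ordering.compare(a.head, b.head)
    }
    val queue = new PriorityQueue[BufferedIterator[Rec]](byHead)
    runs.map(_.iterator.buffered).filter(_.hasNext).foreach(it => queue.add(it))
    val out = ArrayBuffer.empty[Rec]
    while (!queue.isEmpty) {
      val it = queue.poll()
      out += it.next()
      // Re-insert the run if it still has records; ties decide who goes next.
      if (it.hasNext) queue.add(it)
    }
    out.toSeq
  }

  def isSortedBySortCol(rows: Seq[Rec]): Boolean = {
    val cols = rows.map(_.sortCol)
    cols == cols.sorted
  }

  def main(args: Array[String]): Unit = {
    // Two "spills" of the same partition: same year/month/day, increasing sortCol.
    val spill1 = Seq(1, 2, 3).map(s => Rec(2022, 10, 16, s))
    val spill2 = Seq(4, 5, 6).map(s => Rec(2022, 10, 16, s))
    println(merge(Seq(spill1, spill2), prefixOnly).map(_.sortCol).mkString(", "))
    println(merge(Seq(spill1, spill2), fullKey).map(_.sortCol).mkString(", "))
  }
}
{code}
With the prefix-only ordering every head record ties, so the merge is free to interleave the two runs and the {{sortCol}} order is lost; with the full key the merged output stays sorted. This mirrors the alternating pattern in the reported output, under the assumption that the rows came from two spills of the same partition.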