Swetha Baskaran created SPARK-40588:
---------------------------------------
Summary: Sorting issue with AQE turned on
Key: SPARK-40588
URL: https://issues.apache.org/jira/browse/SPARK-40588
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.1.3
Environment: Spark v3.1.3
Scala v2.12.13
Reporter: Swetha Baskaran
We are attempting to partition data by a few columns, sort by a particular
_sortCol_ and write out one file per partition.
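{code:scala}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, spark_partition_id}

df
  .repartition(col("day"), col("month"), col("year"))
  // Record the shuffle partition and the row position before sorting
  .withColumn("partitionId", spark_partition_id())
  .withColumn("monotonicallyIncreasingIdUnsorted", monotonically_increasing_id())
  .sortWithinPartitions("year", "month", "day", "sortCol")
  // Record the row position after sorting
  .withColumn("monotonicallyIncreasingIdSorted", monotonically_increasing_id())
  .write
  .partitionBy("year", "month", "day")
  .parquet(path){code}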
When inspecting the results, we observe one file per partition as expected.
However, in some files the rows are not sorted by _sortCol_; instead they
follow an _alternating_ pattern:
{code:java}
{"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832121344,"monotonicallyIncreasingIdSorted":6287832121344}
{"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287877022389,"monotonicallyIncreasingIdSorted":6287876860586}
{"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287877567881,"monotonicallyIncreasingIdSorted":6287832121345}
{"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287835105553,"monotonicallyIncreasingIdSorted":6287876860587}
{"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832570127,"monotonicallyIncreasingIdSorted":6287832121346}
{"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287879965760,"monotonicallyIncreasingIdSorted":6287876860588}
{"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287878762347,"monotonicallyIncreasingIdSorted":6287832121347}
{"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287837165012,"monotonicallyIncreasingIdSorted":6287876860589}
{"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832910545,"monotonicallyIncreasingIdSorted":6287832121348}
{"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287881244758,"monotonicallyIncreasingIdSorted":6287876860590}
{"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287880041345,"monotonicallyIncreasingIdSorted":6287832121349}{code}
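The pattern in the sample above can be checked mechanically. The following is a minimal sketch in plain Scala (no Spark required) that copies the _sortCol_ and _monotonicallyIncreasingIdSorted_ values from the rows above in file order. The names _SampleRow_ and _orderViolations_ are illustrative and do not come from the job or the gist.
{code:scala}
// One row of the sample output, reduced to the two fields of interest
final case class SampleRow(sortCol: Long, sortedId: Long)

// Rows in the order they appear in the written file (copied from the sample)
val fileOrder: Seq[SampleRow] = Seq(
  SampleRow(100000L, 6287832121344L),
  SampleRow(1303413L, 6287876860586L),
  SampleRow(100000L, 6287832121345L),
  SampleRow(1303413L, 6287876860587L),
  SampleRow(100000L, 6287832121346L),
  SampleRow(1303413L, 6287876860588L),
  SampleRow(100000L, 6287832121347L),
  SampleRow(1303413L, 6287876860589L),
  SampleRow(100000L, 6287832121348L),
  SampleRow(1303413L, 6287876860590L),
  SampleRow(100000L, 6287832121349L)
)

// Indices i where values(i) > values(i + 1), i.e. ascending order is broken
def orderViolations(values: Seq[Long]): Seq[Int] =
  values.zip(values.drop(1)).zipWithIndex.collect {
    case ((a, b), i) if a > b => i
  }

// Violations of sortCol order as the rows appear in the file
val violations: Seq[Int] = orderViolations(fileOrder.map(_.sortCol))

// Re-order the rows by the id assigned right after sortWithinPartitions
// and check whether sortCol is ascending in that order
val restoredBySortedId: Boolean =
  orderViolations(fileOrder.sortBy(_.sortedId).map(_.sortCol)).isEmpty

println(s"violations at row indices: ${violations.mkString(", ")}")
println(s"sorted when ordered by monotonicallyIncreasingIdSorted: $restoredBySortedId")
{code}
For this sample, every second row breaks the ordering, while ordering by _monotonicallyIncreasingIdSorted_ yields ascending _sortCol_ values. This suggests, but does not prove, that the rows were sorted correctly at first and were interleaved at a later stage of the write.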
Here is a
[gist|https://gist.github.com/Swebask/543030748a768be92d3c0ae343d2ae89] to
reproduce the issue.
Turning off AQE with {{spark.conf.set("spark.sql.adaptive.enabled", false)}}
works around the issue.
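If AQE should stay enabled for the rest of the job, the setting can be limited to the write itself. The following is a minimal sketch, assuming an active SparkSession. _withAqeDisabled_ is a hypothetical helper, not part of the Spark API, and _sorted_ in the usage comment stands for the DataFrame built above.
{code:scala}
import org.apache.spark.sql.SparkSession

// Runs `body` with AQE disabled, then restores the previous session setting
def withAqeDisabled[T](spark: SparkSession)(body: => T): T = {
  val key = "spark.sql.adaptive.enabled"
  val previous: Option[String] = spark.conf.getOption(key)
  spark.conf.set(key, "false")
  try body
  finally previous match {
    case Some(value) => spark.conf.set(key, value) // restore explicit value
    case None        => spark.conf.unset(key)      // fall back to the default
  }
}

// Usage (hypothetical names):
// withAqeDisabled(spark) {
//   sorted.write.partitionBy("year", "month", "day").parquet(path)
// }
{code}
The setting has to be in effect when the write action runs, which is why the helper wraps the action rather than the DataFrame definition.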
I'm working on identifying why AQE affects the sort order. Any leads or
thoughts would be appreciated!