[
https://issues.apache.org/jira/browse/SPARK-43777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726261#comment-17726261
]
Kristopher Kane commented on SPARK-43777:
-----------------------------------------
Yes, a colleague pointed out last night that I had missed that in the window
spec.
> Coalescing partitions in AQE returns different results with row_number
> windows.
> --------------------------------------------------------------------------------
>
> Key: SPARK-43777
> URL: https://issues.apache.org/jira/browse/SPARK-43777
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.1.3, 3.3.2
> Environment: SBT-based Spark project with unit tests running on
> spark-testing-base. Tested with Spark 3.1.3 and 3.3.2.
> Reporter: Kristopher Kane
> Priority: Major
>
> While updating our code base from Spark 3.1 to 3.3, a test failed due to wrong
> results. With 3.1, we did not explicitly enable AQE in our sbt-based tests, so
> the failure only surfaced once AQE became enabled by default between 3.1 and 3.3.
> An easily reproducible test:
> {code:java}
> import org.apache.spark.sql.{DataFrame, functions}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions.{row_number, when}
> import spark.implicits._ // assumes a SparkSession named `spark` is in scope
>
> val testDataDf = Seq(
>   (1, 1, 0, 0),
>   (1, 1, 0, 0),
>   (1, 1, 0, 0),
>   (1, 0, 0, 1),
>   (1, 0, 0, 1),
>   (2, 0, 0, 0),
>   (2, 0, 1, 0),
>   (2, 1, 0, 0),
>   (3, 0, 1, 0),
>   (3, 0, 1, 0),
> ).toDF("id", "is_attribute1", "is_attribute2", "is_attribute3")
>
> // Window ordered by count only; rows with equal counts have no defined relative order.
> val placeWindowSpec = Window
>   .partitionBy("id")
>   .orderBy($"count".desc)
>
> val resultDf: DataFrame = testDataDf
>   .select("id", "is_attribute1", "is_attribute2", "is_attribute3")
>   .withColumn(
>     "place",
>     when($"is_attribute1" === 1, "attribute1")
>       .when($"is_attribute2" === 1, "attribute2")
>       .when($"is_attribute3" === 1, "attribute3")
>       .otherwise("other")
>   )
>   .groupBy("id", "place")
>   .agg(functions.count("*").as("count"))
>   .withColumn("rank", row_number().over(placeWindowSpec))
>
> resultDf.orderBy("id", "place", "rank").show()
> {code}
>
> Various results based on Spark version and AQE settings:
> {code:java}
> ----------------------------------------
> Spark 3.1
> ----------------------------------------
> Without AQE
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   2|
> |  2|attribute2|    1|   1|
> |  2|     other|    1|   3|
> |  3|attribute2|    2|   1|
> +---+----------+-----+----+
> AQE with defaults
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   2|
> |  2|attribute2|    1|   1|
> |  2|     other|    1|   3|
> |  3|attribute2|    2|   1|
> +---+----------+-----+----+
> AQE with .set("spark.sql.adaptive.coalescePartitions.enabled", "false")
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   2|
> |  2|attribute2|    1|   1|
> |  2|     other|    1|   3|
> |  3|attribute2|    2|   1|
> +---+----------+-----+----+
> AQE with .set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1") - Like Spark 3.3 with AQE defaults
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   3|
> |  2|attribute2|    1|   2|
> |  2|     other|    1|   1|
> |  3|attribute2|    2|   1|
> +---+----------+-----+----+
> ----------------------------------------
> Spark 3.3.2
> ----------------------------------------
> AQE with defaults
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   3|
> |  2|attribute2|    1|   2|
> |  2|     other|    1|   1|
> |  3|attribute2|    2|   1|
> +---+----------+-----+----+
> AQE with .set("spark.sql.adaptive.coalescePartitions.enabled", "false") - This matches Spark 3.1
> +---+----------+-----+----+
> | id|     place|count|rank|
> +---+----------+-----+----+
> |  1|attribute1|    3|   1|
> |  1|attribute3|    2|   2|
> |  2|attribute1|    1|   2|
> |  2|attribute2|    1|   1|
> |  2|     other|    1|   3|
> |  3|attribute2|    2|   1|
> +---+----------+-----+----+
> {code}
> As you can see, the 'rank' column of row_number(partition by, order by)
> returns a different rank for id value 2's three attributes based on how AQE
> coalesces partitions.
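
The differing ranks for id 2 are what one would expect when the window ordering has ties: all three of its rows have count 1, and row_number() numbers tied rows in whatever order they happen to arrive, which can change with how partitions are coalesced. Below is a minimal sketch of a window spec with a total ordering. The choice of `place` as the tie-breaker, the object name `DeterministicRank`, and the helper `withRank` are assumptions made for illustration, not something stated in the report. The input is the already aggregated table from the outputs above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Hypothetical helper object, not part of the original report.
object DeterministicRank {

  // Rank places within each id by count (descending). `place` is added as a
  // secondary key (an assumed tie-breaker) so the ordering is total and
  // row_number() no longer depends on the arrival order of tied rows.
  def withRank(counts: DataFrame): DataFrame = {
    val w = Window
      .partitionBy("id")
      .orderBy(col("count").desc, col("place").asc)
    counts.withColumn("rank", row_number().over(w))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("deterministic-rank")
      .getOrCreate()
    import spark.implicits._

    // Aggregated rows as shown in the report; id 2 has three places tied at count 1.
    val counts = Seq(
      (1, "attribute1", 3L),
      (1, "attribute3", 2L),
      (2, "attribute1", 1L),
      (2, "attribute2", 1L),
      (2, "other", 1L),
      (3, "attribute2", 2L)
    ).toDF("id", "place", "count")

    // Toggle the coalescing flag named in the report and print the ranks for each setting.
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    for (coalesce <- Seq("true", "false")) {
      spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", coalesce)
      withRank(counts).orderBy("id", "rank").show()
    }

    spark.stop()
  }
}
```

With this ordering, id 2 is ranked attribute1 = 1, attribute2 = 2, other = 3 under either setting, because no two rows in a partition compare equal. Any column (or combination of columns) that makes the ordering unique within a partition would serve the same purpose.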