[
https://issues.apache.org/jira/browse/SPARK-45543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dongjoon Hyun closed SPARK-45543.
---------------------------------
> InferWindowGroupLimit causes a bug if the other window functions don't have
> the same window frame as the rank-like functions
> -----------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-45543
> URL: https://issues.apache.org/jira/browse/SPARK-45543
> Project: Spark
> Issue Type: Bug
> Components: Optimizer, Spark Core, SQL
> Affects Versions: 3.5.0
> Reporter: Ron Serruya
> Assignee: Jiaan Geng
> Priority: Critical
> Labels: correctness, data-loss, pull-request-available
> Fix For: 3.5.1, 4.0.0
>
>
> First, this is my first bug report, so I hope I'm doing it right. Also, as
> I'm not very knowledgeable about Spark internals, I hope I diagnosed the
> problem correctly.
> I found the regression in Spark version 3.5.0:
> When multiple windows share the same partitioning and ordering (but have
> different frame boundaries), and one of the windows is a ranking function, a
> "WindowGroupLimit" node is added to the plan, causing wrong values to be
> produced by the other window functions.
> *This behavior didn't exist in versions 3.3 and 3.4.*
> Example:
>
> {code:python}
> import pyspark
> from pyspark.sql import functions as F, Window
> df = spark.createDataFrame([
>     {'row_id': 1, 'name': 'Dave', 'score': 1, 'year': 2020},
>     {'row_id': 1, 'name': 'Dave', 'score': 2, 'year': 2022},
>     {'row_id': 1, 'name': 'Dave', 'score': 3, 'year': 2023},
>     {'row_id': 2, 'name': 'Amy', 'score': 6, 'year': 2021},
> ])
> # Create the first window, used for the row number
> window_spec = Window.partitionBy('row_id', 'name').orderBy(F.desc('year'))
> # Create an additional window from the first one, with an unbounded frame
> unbound_spec = window_spec.rowsBetween(Window.unboundedPreceding,
>                                        Window.unboundedFollowing)
> # Try to keep the first row by year, and also collect all scores into a list
> df2 = df.withColumn(
>     'rn',
>     F.row_number().over(window_spec)
> ).withColumn(
>     'all_scores',
>     F.collect_list('score').over(unbound_spec)
> )
> {code}
> So far everything works, and if we display df2:
>
> {noformat}
> +----+------+-----+----+---+----------+
> |name|row_id|score|year|rn |all_scores|
> +----+------+-----+----+---+----------+
> |Dave|1 |3 |2023|1 |[3, 2, 1] |
> |Dave|1 |2 |2022|2 |[3, 2, 1] |
> |Dave|1 |1 |2020|3 |[3, 2, 1] |
> |Amy |2 |6 |2021|1 |[6] |
> +----+------+-----+----+---+----------+{noformat}
>
> However, once we filter to keep only the first row number:
>
> {noformat}
> df2.filter("rn=1").show(truncate=False)
> +----+------+-----+----+---+----------+
> |name|row_id|score|year|rn |all_scores|
> +----+------+-----+----+---+----------+
> |Dave|1 |3 |2023|1 |[3] |
> |Amy |2 |6 |2021|1 |[6] |
> +----+------+-----+----+---+----------+{noformat}
> As you can see, just filtering changed the "all_scores" array for Dave.
> (This example uses `collect_list`; however, the same result happens with
> other functions, such as max, min, count, etc.)
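> For instance, here is a minimal sketch of the same symptom with `F.min`
> (reusing `df`, `window_spec` and `unbound_spec` from the example above;
> `df_min` is just an illustrative name):
> {code:python}
> df_min = df.withColumn(
>     'rn',
>     F.row_number().over(window_spec)
> ).withColumn(
>     'min_score',
>     F.min('score').over(unbound_spec)
> )
> # Without the filter, Dave's min_score is 1; after filtering on rn=1 the
> # plan gains a WindowGroupLimit and min_score wrongly becomes 3, because
> # only the top-ranked row of each group is fed to the aggregate.
> df_min.filter("rn=1").show(truncate=False)
> {code}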
>
> Now, if instead of the two windows used above I use the first window together
> with a window that has a different ordering, or create a completely new window
> with the same partitioning but no ordering, it works fine:
> {code:python}
> new_window = Window.partitionBy('row_id', 'name') \
>     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
> df3 = df.withColumn(
>     'rn',
>     F.row_number().over(window_spec)
> ).withColumn(
>     'all_scores',
>     F.collect_list('score').over(new_window)
> )
> df3.filter("rn=1").show(truncate=False)
> {code}
> {noformat}
> +----+------+-----+----+---+----------+
> |name|row_id|score|year|rn |all_scores|
> +----+------+-----+----+---+----------+
> |Dave|1 |3 |2023|1 |[3, 2, 1] |
> |Amy |2 |6 |2021|1 |[6] |
> +----+------+-----+----+---+----------+
> {noformat}
> In addition, if we use all 3 windows to create 3 different columns, it also
> works correctly (as sketched below). So it seems the issue happens only when
> all the windows used share the same partitioning and ordering.
> Here is the final plan for the faulty dataframe:
> {noformat}
> df2.filter("rn=1").explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Filter (rn#9 = 1)
>    +- Window [row_number() windowspecdefinition(row_id#1L, name#0, year#3L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#9, collect_list(score#2L, 0, 0) windowspecdefinition(row_id#1L, name#0, year#3L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS all_scores#16], [row_id#1L, name#0], [year#3L DESC NULLS LAST]
>       +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS LAST], row_number(), 1, Final
>          +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, year#3L DESC NULLS LAST], false, 0
>             +- Exchange hashpartitioning(row_id#1L, name#0, 200), ENSURE_REQUIREMENTS, [plan_id=425]
>                +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS LAST], row_number(), 1, Partial
>                   +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, year#3L DESC NULLS LAST], false, 0
>                      +- Scan ExistingRDD[name#0,row_id#1L,score#2L,year#3L]
> {noformat}
> I suspect the issue is caused by the "WindowGroupLimit" node in the plan.
> This node doesn't appear in the plans of the dataframes that work fine, nor
> before filtering on rn.
> So I assume that since the optimizer detects that we only want to keep the
> first row of the ranking function, it removes all other rows before the
> remaining window computations run, which causes the incorrect result and the
> loss of data.
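> (As a possible workaround until a fix lands: my understanding is that this
> rule is gated by the `spark.sql.optimizer.windowGroupLimitThreshold` config
> added in 3.5, so disabling it should avoid the wrong results; I haven't
> verified this beyond reading the config docs.)
> {code:python}
> # Hedged workaround sketch: setting the threshold to -1 should disable the
> # InferWindowGroupLimit optimization entirely (assumption, please verify).
> spark.conf.set("spark.sql.optimizer.windowGroupLimitThreshold", -1)
> df2.filter("rn=1").show(truncate=False)  # all_scores should be [3, 2, 1] for Dave
> {code}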
> I think the bug comes from this change (and the attached PRs):
> https://issues.apache.org/jira/browse/SPARK-44340
> It was added in Spark 3.5.0. In addition, I noticed it was included in
> Databricks Runtime 13.3, which ships Spark 3.4.0 but also lists this change
> in its release notes. And indeed, this bug happens on Databricks 13.3 (Spark
> 3.4), but not on my local Spark 3.4.
> Tagging [~beliefer], as I believe you would know the most about this.