[ https://issues.apache.org/jira/browse/SPARK-45797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17783015#comment-17783015 ]
Bruce Robbins commented on SPARK-45797:
---------------------------------------

I wonder if this is the same as SPARK-45543, which had two window specs and then produced wrong answers when filtered on rank = 1.

> Discrepancies in PySpark DataFrame Results When Using Window Functions and Filters
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-45797
>                 URL: https://issues.apache.org/jira/browse/SPARK-45797
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.5.0
>         Environment: Python 3.10
>                      Pyspark 3.5.0
>                      Ubuntu 22.04.3 LTS
>            Reporter: Daniel Diego Horcajuelo
>            Priority: Major
>             Fix For: 3.5.0
>
>
> When applying certain transformations to a dataframe that involve window functions and then filtering, I get wrong results. Here is a minimal example of the results I get with my code:
>
> {code:python}
> from pyspark.sql import SparkSession
> import pyspark.sql.functions as f
> from pyspark.sql.window import Window as w
> from datetime import date
>
> spark = SparkSession.builder.config("spark.sql.repl.eagerEval.enabled", True).getOrCreate()
>
> # Base dataframe
> df = spark.createDataFrame(
>     [
>         (1, date(2023, 10, 1), date(2023, 10, 2), "open"),
>         (1, date(2023, 10, 2), date(2023, 10, 3), "close"),
>         (2, date(2023, 10, 1), date(2023, 10, 2), "close"),
>         (2, date(2023, 10, 2), date(2023, 10, 4), "close"),
>         (3, date(2023, 10, 2), date(2023, 10, 4), "open"),
>         (3, date(2023, 10, 3), date(2023, 10, 6), "open"),
>     ],
>     schema="id integer, date_start date, date_end date, status string",
> )
>
> # We define two window specs over the same partitioning key
> partition = (
>     w.partitionBy("id")
>     .orderBy("date_start", "date_end")
>     .rowsBetween(w.unboundedPreceding, w.unboundedFollowing)
> )
> partition2 = w.partitionBy("id").orderBy("date_start", "date_end")
>
> # Define dataframe A
> A = df.withColumn(
>     "date_end_of_last_close",
>     f.max(f.when(f.col("status") == "close", f.col("date_end"))).over(partition),
> ).withColumn(
>     "rank",
>     f.row_number().over(partition2),
> )
> display(A)
>
> | id | date_start | date_end   | status | date_end_of_last_close | rank |
> |----|------------|------------|--------|------------------------|------|
> | 1  | 2023-10-01 | 2023-10-02 | open   | 2023-10-03             | 1    |
> | 1  | 2023-10-02 | 2023-10-03 | close  | 2023-10-03             | 2    |
> | 2  | 2023-10-01 | 2023-10-02 | close  | 2023-10-04             | 1    |
> | 2  | 2023-10-02 | 2023-10-04 | close  | 2023-10-04             | 2    |
> | 3  | 2023-10-02 | 2023-10-04 | open   | NULL                   | 1    |
> | 3  | 2023-10-03 | 2023-10-06 | open   | NULL                   | 2    |
>
> # When filtering by rank = 1, I get this weird result
> A_result = A.filter(f.col("rank") == 1).drop("rank")
> display(A_result)
>
> | id | date_start | date_end   | status | date_end_of_last_close |
> |----|------------|------------|--------|------------------------|
> | 1  | 2023-10-01 | 2023-10-02 | open   | NULL                   |
> | 2  | 2023-10-01 | 2023-10-02 | close  | 2023-10-02             |
> | 3  | 2023-10-02 | 2023-10-04 | open   | NULL                   |
> {code}
> I think the Spark engine might be mishandling the internal partitions. If the dataframe is created from scratch (without transformations), the filtering operation returns the right result. In PySpark 3.4.0 this error does not happen.
>
> For more details, please check out this same question on Stack Overflow: [stackoverflow question|https://stackoverflow.com/questions/77396807/discrepancies-in-pyspark-dataframe-results-when-using-window-functions-and-filte?noredirect=1#comment136446225_77396807]
>
> I'll mark this issue as important because it affects some basic operations that are used daily.
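
If this turns out to share a root cause with SPARK-45543, which involved the {{InferWindowGroupLimit}} optimizer rule added in 3.5.0, two workarounds may be worth trying until a fix lands. Below is a minimal sketch against the reporter's {{spark}}, {{A}}, and {{f}} from the example above; the fully qualified rule name and the effectiveness of either workaround are assumptions on my part, not something confirmed in this report:

{code:python}
# Sketch of two possible workarounds, assuming the root cause matches
# SPARK-45543 (InferWindowGroupLimit). Neither is an official fix; verify
# the output against PySpark 3.4.0.
import pyspark.sql.functions as f

# Workaround 1: exclude the suspect optimizer rule for this session.
# The fully qualified class name below is assumed from the Spark 3.5 source.
spark.conf.set(
    "spark.sql.optimizer.excludedRules",
    "org.apache.spark.sql.catalyst.optimizer.InferWindowGroupLimit",
)
A_result = A.filter(f.col("rank") == 1).drop("rank")

# Workaround 2: materialize A before filtering, so the rank filter is
# planned against the cached result instead of being rewritten into the
# window computation.
A_cached = A.cache()
A_cached.count()  # force materialization
A_result2 = A_cached.filter(f.col("rank") == 1).drop("rank")
{code}

If the excluded rule is indeed the culprit, both sketches should restore the 3.4.0 behavior, i.e. date_end_of_last_close values of 2023-10-03 and 2023-10-04 for ids 1 and 2 in the filtered result.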