[ https://issues.apache.org/jira/browse/SPARK-45797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Daniel Diego Horcajuelo updated SPARK-45797: -------------------------------------------- Description: When doing certain types of transformations on a dataframe which involve window functions with filters I am getting the wrong results. Here is a minimal example of the results I get with my code: {code:java} from pyspark.sql import SparkSession import pyspark.sql.functions as f from pyspark.sql.window import Window as w from datetime import datetime, date spark = SparkSession.builder.config("spark.sql.repl.eagerEval.enabled", True).getOrCreate() # Base dataframe df = spark.createDataFrame( [ (1, date(2023, 10, 1), date(2023, 10, 2), "open"), (1, date(2023, 10, 2), date(2023, 10, 3), "close"), (2, date(2023, 10, 1), date(2023, 10, 2), "close"), (2, date(2023, 10, 2), date(2023, 10, 4), "close"), (3, date(2023, 10, 2), date(2023, 10, 4), "open"), (3, date(2023, 10, 3), date(2023, 10, 6), "open"), ], schema="id integer, date_start date, date_end date, status string" ) # We define two partition functions partition = w.partitionBy("id").orderBy("date_start", "date_end").rowsBetween(w.unboundedPreceding, w.unboundedFollowing) partition2 = w.partitionBy("id").orderBy("date_start", "date_end") # Define dataframe A A = df.withColumn( "date_end_of_last_close", f.max(f.when(f.col("status") == "close", f.col("date_end"))).over(partition) ).withColumn( "rank", f.row_number().over(partition2) ) display(A) | id | date_start | date_end | status | date_end_of_last_close | rank | |----|------------|------------|--------|------------------------|------| | 1 | 2023-10-01 | 2023-10-02 | open | 2023-10-03 | 1 | | 1 | 2023-10-02 | 2023-10-03 | close | 2023-10-03 | 2 | | 2 | 2023-10-01 | 2023-10-02 | close | 2023-10-04 | 1 | | 2 | 2023-10-02 | 2023-10-04 | close | 2023-10-04 | 2 | | 3 | 2023-10-02 | 2023-10-04 | open | NULL | 1 | | 3 | 2023-10-03 | 2023-10-06 | open | NULL | 2 | # When filtering by rank = 1, I get this weird result A_result = A.filter(f.col("rank") == 1).drop("rank") display(A_result) | id | date_start | date_end | status | date_end_of_last_close | |----|------------|------------|--------|------------------------| | 1 | 2023-10-01 | 2023-10-02 | open | NULL | | 2 | 2023-10-01 | 2023-10-02 | close | 2023-10-02 | | 3 | 2023-10-02 | 2023-10-04 | open | NULL | {code} I think spark engine might be managing wrongly the internal partitions. If creating the dataframe from scratch (without transformations), the filtering operation returns the right result. In pyspark 3.4.0 this error doesn't happen. For more details, please check out this same question in stackoverflow: [stackoverflow question|https://stackoverflow.com/questions/77396807/discrepancies-in-pyspark-dataframe-results-when-using-window-functions-and-filte?noredirect=1#comment136446225_77396807] I'll mark this issue as important because it affects some basic operations which are daily used was: When doing certain types of transformations on a dataframe which involve window functions with filters I am getting the wrong results. Here is a minimal example of the results I get with my code: {code:java} from pyspark.sql import SparkSession import pyspark.sql.functions as f from pyspark.sql.window import Window as w from datetime import datetime, date from chispa.dataframe_comparer import assert_df_equality spark = SparkSession.builder.config("spark.sql.repl.eagerEval.enabled", True).getOrCreate() # Base dataframe df = spark.createDataFrame( [ (1, date(2023, 10, 1), date(2023, 10, 2), "open"), (1, date(2023, 10, 2), date(2023, 10, 3), "close"), (2, date(2023, 10, 1), date(2023, 10, 2), "close"), (2, date(2023, 10, 2), date(2023, 10, 4), "close"), (3, date(2023, 10, 2), date(2023, 10, 4), "open"), (3, date(2023, 10, 3), date(2023, 10, 6), "open"), ], schema="id integer, date_start date, date_end date, status string" ) # We define two partition functions partition = w.partitionBy("id").orderBy("date_start", "date_end").rowsBetween(w.unboundedPreceding, w.unboundedFollowing) partition2 = w.partitionBy("id").orderBy("date_start", "date_end") # Define dataframe A A = df.withColumn( "date_end_of_last_close", f.max(f.when(f.col("status") == "close", f.col("date_end"))).over(partition) ).withColumn( "rank", f.row_number().over(partition2) ) display(A) | id | date_start | date_end | status | date_end_of_last_close | rank | |----|------------|------------|--------|------------------------|------| | 1 | 2023-10-01 | 2023-10-02 | open | 2023-10-03 | 1 | | 1 | 2023-10-02 | 2023-10-03 | close | 2023-10-03 | 2 | | 2 | 2023-10-01 | 2023-10-02 | close | 2023-10-04 | 1 | | 2 | 2023-10-02 | 2023-10-04 | close | 2023-10-04 | 2 | | 3 | 2023-10-02 | 2023-10-04 | open | NULL | 1 | | 3 | 2023-10-03 | 2023-10-06 | open | NULL | 2 | # When filtering by rank = 1, I get this weird result A_result = A.filter(f.col("rank") == 1).drop("rank") display(A_result) | id | date_start | date_end | status | date_end_of_last_close | |----|------------|------------|--------|------------------------| | 1 | 2023-10-01 | 2023-10-02 | open | NULL | | 2 | 2023-10-01 | 2023-10-02 | close | 2023-10-02 | | 3 | 2023-10-02 | 2023-10-04 | open | NULL | {code} I think spark engine might be managing wrongly the internal partitions. If creating the dataframe from scratch (without transformations), the filtering operation returns the right result. In pyspark 3.4.0 this error doesn't happen. For more details, please check out this same question in stackoverflow: [stackoverflow question|https://stackoverflow.com/questions/77396807/discrepancies-in-pyspark-dataframe-results-when-using-window-functions-and-filte?noredirect=1#comment136446225_77396807] I'll mark this issue as important because it affects some basic operations which are daily used > Discrepancies in PySpark DataFrame Results When Using Window Functions and > Filters > ---------------------------------------------------------------------------------- > > Key: SPARK-45797 > URL: https://issues.apache.org/jira/browse/SPARK-45797 > Project: Spark > Issue Type: Bug > Components: PySpark > Affects Versions: 3.5.0 > Environment: Python 3.10 > Pyspark 3.5.0 > Ubuntu 22.04.3 LTS > Reporter: Daniel Diego Horcajuelo > Priority: Major > Fix For: 3.5.0 > > > When doing certain types of transformations on a dataframe which involve > window functions with filters I am getting the wrong results. Here is a > minimal example of the results I get with my code: > > {code:java} > from pyspark.sql import SparkSession > import pyspark.sql.functions as f > from pyspark.sql.window import Window as w > from datetime import datetime, date > spark = SparkSession.builder.config("spark.sql.repl.eagerEval.enabled", > True).getOrCreate() > # Base dataframe > df = spark.createDataFrame( > [ > (1, date(2023, 10, 1), date(2023, 10, 2), "open"), > (1, date(2023, 10, 2), date(2023, 10, 3), "close"), > (2, date(2023, 10, 1), date(2023, 10, 2), "close"), > (2, date(2023, 10, 2), date(2023, 10, 4), "close"), > (3, date(2023, 10, 2), date(2023, 10, 4), "open"), > (3, date(2023, 10, 3), date(2023, 10, 6), "open"), > ], > schema="id integer, date_start date, date_end date, status string" > ) > # We define two partition functions > partition = w.partitionBy("id").orderBy("date_start", > "date_end").rowsBetween(w.unboundedPreceding, w.unboundedFollowing) > partition2 = w.partitionBy("id").orderBy("date_start", "date_end") > # Define dataframe A > A = df.withColumn( > "date_end_of_last_close", > f.max(f.when(f.col("status") == "close", > f.col("date_end"))).over(partition) > ).withColumn( > "rank", > f.row_number().over(partition2) > ) > display(A) > | id | date_start | date_end | status | date_end_of_last_close | rank | > |----|------------|------------|--------|------------------------|------| > | 1 | 2023-10-01 | 2023-10-02 | open | 2023-10-03 | 1 | > | 1 | 2023-10-02 | 2023-10-03 | close | 2023-10-03 | 2 | > | 2 | 2023-10-01 | 2023-10-02 | close | 2023-10-04 | 1 | > | 2 | 2023-10-02 | 2023-10-04 | close | 2023-10-04 | 2 | > | 3 | 2023-10-02 | 2023-10-04 | open | NULL | 1 | > | 3 | 2023-10-03 | 2023-10-06 | open | NULL | 2 | > # When filtering by rank = 1, I get this weird result > A_result = A.filter(f.col("rank") == 1).drop("rank") > display(A_result) > | id | date_start | date_end | status | date_end_of_last_close | > |----|------------|------------|--------|------------------------| > | 1 | 2023-10-01 | 2023-10-02 | open | NULL | > | 2 | 2023-10-01 | 2023-10-02 | close | 2023-10-02 | > | 3 | 2023-10-02 | 2023-10-04 | open | NULL | {code} > I think spark engine might be managing wrongly the internal partitions. If > creating the dataframe from scratch (without transformations), the filtering > operation returns the right result. In pyspark 3.4.0 this error doesn't > happen. > > For more details, please check out this same question in stackoverflow: > [stackoverflow > question|https://stackoverflow.com/questions/77396807/discrepancies-in-pyspark-dataframe-results-when-using-window-functions-and-filte?noredirect=1#comment136446225_77396807] > > I'll mark this issue as important because it affects some basic operations > which are daily used -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org