Daniel Diego Horcajuelo created SPARK-45797:
-----------------------------------------------

             Summary: Discrepancies in PySpark DataFrame Results When Using 
Window Functions and Filters
                 Key: SPARK-45797
                 URL: https://issues.apache.org/jira/browse/SPARK-45797
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.5.0
         Environment: Python 3.10

PySpark 3.5.0

Ubuntu 22.04.3 LTS
            Reporter: Daniel Diego Horcajuelo
             Fix For: 3.5.0


Certain transformations on a DataFrame that combine window functions with 
filters return wrong results. Here is a minimal example that reproduces the 
results I get:

 
{code:python}
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.window import Window as w
from datetime import date

spark = (
    SparkSession.builder
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

# Base dataframe
df = spark.createDataFrame(
    [
        (1, date(2023, 10, 1), date(2023, 10, 2), "open"),
        (1, date(2023, 10, 2), date(2023, 10, 3), "close"),
        (2, date(2023, 10, 1), date(2023, 10, 2), "close"),
        (2, date(2023, 10, 2), date(2023, 10, 4), "close"),
        (3, date(2023, 10, 2), date(2023, 10, 4), "open"),
        (3, date(2023, 10, 3), date(2023, 10, 6), "open"),
    ],
    schema="id integer, date_start date, date_end date, status string"
)

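# Two window specifications with the same partitioning and ordering:
# `partition` spans the whole partition, `partition2` uses the default frame
partition = (
    w.partitionBy("id")
    .orderBy("date_start", "date_end")
    .rowsBetween(w.unboundedPreceding, w.unboundedFollowing)
)
partition2 = w.partitionBy("id").orderBy("date_start", "date_end")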

# Define dataframe A
A = df.withColumn(
    "date_end_of_last_close",
    f.max(f.when(f.col("status") == "close", f.col("date_end"))).over(partition)
).withColumn(
    "rank",
    f.row_number().over(partition2)
)
display(A)

# Output of display(A):
# | id | date_start | date_end   | status | date_end_of_last_close | rank |
# |----|------------|------------|--------|------------------------|------|
# | 1  | 2023-10-01 | 2023-10-02 | open   | 2023-10-03             | 1    |
# | 1  | 2023-10-02 | 2023-10-03 | close  | 2023-10-03             | 2    |
# | 2  | 2023-10-01 | 2023-10-02 | close  | 2023-10-04             | 1    |
# | 2  | 2023-10-02 | 2023-10-04 | close  | 2023-10-04             | 2    |
# | 3  | 2023-10-02 | 2023-10-04 | open   | NULL                   | 1    |
# | 3  | 2023-10-03 | 2023-10-06 | open   | NULL                   | 2    |

# Filtering on rank == 1 returns wrong values in date_end_of_last_close
A_result = A.filter(f.col("rank") == 1).drop("rank")
display(A_result)

# Output of display(A_result):
# | id | date_start | date_end   | status | date_end_of_last_close |
# |----|------------|------------|--------|------------------------|
# | 1  | 2023-10-01 | 2023-10-02 | open   | NULL                   |
# | 2  | 2023-10-01 | 2023-10-02 | close  | 2023-10-02             |
# | 3  | 2023-10-02 | 2023-10-04 | open   | NULL                   |
{code}
I suspect the Spark engine is handling the window partitions incorrectly. If 
the DataFrame is created from scratch (without transformations), the same 
filter returns the correct result. The error does not occur in PySpark 3.4.0.
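
For reference, the values the rank == 1 filter should return can be worked out 
without Spark. The following is a minimal plain-Python sketch, not Spark code: 
it assumes the same six input rows as the example above, and the helper name 
`expected_first_rows` is hypothetical. For each id it takes the first row in 
(date_start, date_end) order and attaches the latest date_end among that id's 
"close" rows, which is what table A already shows for the rank 1 rows.

```python
from datetime import date
from itertools import groupby

# Same six rows as the base dataframe in the report:
# (id, date_start, date_end, status)
ROWS = [
    (1, date(2023, 10, 1), date(2023, 10, 2), "open"),
    (1, date(2023, 10, 2), date(2023, 10, 3), "close"),
    (2, date(2023, 10, 1), date(2023, 10, 2), "close"),
    (2, date(2023, 10, 2), date(2023, 10, 4), "close"),
    (3, date(2023, 10, 2), date(2023, 10, 4), "open"),
    (3, date(2023, 10, 3), date(2023, 10, 6), "open"),
]


def expected_first_rows(rows):
    """Hypothetical helper: per id, return the rank-1 row plus the latest
    date_end among that id's "close" rows (None when there is none)."""
    result = []
    # Sort by id, then by the window ordering (date_start, date_end).
    ordered = sorted(rows, key=lambda r: (r[0], r[1], r[2]))
    for _, group in groupby(ordered, key=lambda r: r[0]):
        group = list(group)
        # Maximum over the whole partition, skipping non-"close" rows,
        # mirroring the unbounded window frame used in the report.
        close_ends = [r[2] for r in group if r[3] == "close"]
        last_close = max(close_ends) if close_ends else None
        # The first row in (date_start, date_end) order has rank 1.
        result.append(group[0] + (last_close,))
    return result


for row in expected_first_rows(ROWS):
    print(row)
```

Under these assumptions the last column is 2023-10-03 for id 1, 2023-10-04 for 
id 2, and None for id 3, whereas the filtered Spark output above shows NULL, 
2023-10-02, and NULL.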

 

For more details, see the same question on Stack Overflow: 
[Stack Overflow 
question|https://stackoverflow.com/questions/77396807/discrepancies-in-pyspark-dataframe-results-when-using-window-functions-and-filte?noredirect=1#comment136446225_77396807]
 

I'll mark this issue as important because it affects basic operations that 
are used daily.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
