Koray Beyaz created SPARK-49473:
-----------------------------------

             Summary: Performance improvement for window rangeBetween then Filter queries
                 Key: SPARK-49473
                 URL: https://issues.apache.org/jira/browse/SPARK-49473
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 4.0.0
            Reporter: Koray Beyaz


There is a performance improvement opportunity for a specific query pattern in which a window function with a rangeBetween frame is followed by a filter expression.

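Consider the following query:

{code:scala}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, mean}

// Ten days (day_index 0..9) with constant sales, written to Parquet.
val df = spark.range(10).withColumnRenamed("id", "day_index").withColumn("sales", lit(10))
df.write.mode("overwrite").parquet("sales")

// Moving average over the current and previous day across the whole
// (unpartitioned) table, then keep only the last day.
val w = Window.partitionBy().orderBy("day_index").rangeBetween(-1, 0)
val res = spark.read.parquet("sales").withColumn("moving_average_sales", mean("sales").over(w)).filter("day_index = 9")
res.explain
{code}

The physical plan applies the Filter only after the Window, so nothing is pushed down to the scan (PushedFilters: []) and the whole table is shuffled into a single partition: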
{code:java}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Filter (isnotnull(day_index#74L) AND (day_index#74L = 9))
   +- Window [avg(sales#75) windowspecdefinition(day_index#74L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -1, currentrow$())) AS moving_average_sales#78], [day_index#74L ASC NULLS FIRST]
      +- Sort [day_index#74L ASC NULLS FIRST], false, 0
         +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=185]
            +- FileScan parquet [day_index#74L,sales#75] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/koray/Desktop/libs/spark/sales], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<day_index:bigint,sales:int>
{code}

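In this case, we filter for day_index = 9. Since the window frame is rangeBetween(-1, 0), the output row with day_index = 9 only aggregates input rows whose day_index lies in [9 - 1, 9 + 0], so the input dataframe can be filtered to day_index IN (8, 9) before the Window is computed. The original day_index = 9 filter still has to be applied after the Window, because the value computed for day_index = 8 on the reduced input is no longer correct. More generally, for an ascending orderBy column c, a filter c = v and a frame rangeBetween(start, end), only input rows with c BETWEEN v + start AND v + end are needed. For this to work, the Window orderBy column and the filter column must be the same.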
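
As a sanity check of this reasoning, here is a minimal plain-Scala sketch (no Spark dependency) that models a single, unpartitioned window with an ascending RANGE frame. SalesRow, movingAverage and the per-day sales values are hypothetical stand-ins chosen for illustration; they are not Spark APIs. The sketch evaluates the window on the full input and on the input pre-filtered to [9 - 1, 9 + 0], then checks that the row kept by the outer day_index = 9 filter gets the same value both ways.

```scala
// Hypothetical input row; stands in for a Parquet record (not a Spark type).
final case class SalesRow(dayIndex: Long, sales: Int)

object RangeFramePruning {
  // Models avg(sales) OVER (ORDER BY dayIndex RANGE BETWEEN start AND end)
  // in a single partition: each row averages the sales of every row whose
  // dayIndex lies in [row.dayIndex + start, row.dayIndex + end].
  def movingAverage(rows: Seq[SalesRow], start: Long, end: Long): Seq[(SalesRow, Double)] =
    rows.map { r =>
      val frame = rows.filter(o => o.dayIndex >= r.dayIndex + start && o.dayIndex <= r.dayIndex + end)
      r -> frame.map(_.sales).sum.toDouble / frame.size
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical data: distinct sales per day so that the averages differ.
    val input = (0L to 9L).map(i => SalesRow(i, (i * 10).toInt))
    val (start, end, target) = (-1L, 0L, 9L)

    // Current plan: Window over the whole table, then Filter(dayIndex = 9).
    val full = movingAverage(input, start, end).filter(_._1.dayIndex == target)

    // Proposed plan: keep only rows in [target + start, target + end],
    // run the Window, then still apply the original filter.
    val pruned = input.filter(r => r.dayIndex >= target + start && r.dayIndex <= target + end)
    val optimized = movingAverage(pruned, start, end).filter(_._1.dayIndex == target)

    // Both contain a single entry: day 9 with average (80 + 90) / 2 = 85.0.
    println(full)
    println(optimized)
    assert(full == optimized)
  }
}
```

On the reduced input the value computed for day 8 differs (it no longer sees day 7), which is why the original filter has to stay above the Window. In Spark itself, a similar effect can be obtained by hand by adding .filter("day_index BETWEEN 8 AND 9") directly after spark.read.parquet("sales") while keeping the trailing .filter("day_index = 9"); because that predicate sits directly on the scan, it becomes eligible for Parquet filter pushdown.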

This is especially useful for calculating lags and moving averages for a specific date (for example, today).

Would it be feasible to inject the filter
{code:sql}
day_index IN (8, 9)
{code}
below the Window, so that it can be pushed down to the Parquet scan? Is this something we could do in Catalyst with reasonable effort?
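
If Catalyst were to do this automatically, the core step would be deriving the range predicate from the filter literal and the window frames. The plain-Scala sketch below models only that step. Bound, Offset, Unbounded, RangeFrameSpec and InferRangeFilter.derive are hypothetical names, not Catalyst classes; a real rule would also have to match the Filter-over-Window pattern, verify that every window expression orders by the filtered column in ascending order, and treat unbounded frames more precisely (for example, UNBOUNDED PRECEDING still permits an upper bound). The sketch takes the covering range of all frames (smallest lower offset, largest upper offset), so the result is a superset of the rows any window expression needs, and it emits BETWEEN rather than IN so it also fits non-integral order columns.

```scala
// Hypothetical, simplified model of window frame bounds. Catalyst has its own
// frame classes; none of them are used here.
sealed trait Bound
case object Unbounded extends Bound
final case class Offset(value: Long) extends Bound

// A RANGE frame over an ascending order column, e.g. rangeBetween(-1, 0)
// is modelled as RangeFrameSpec(Offset(-1), Offset(0)).
final case class RangeFrameSpec(lower: Bound, upper: Bound)

object InferRangeFilter {
  // Given a filter `orderCol = target` on top of a Window whose expressions all
  // use these RANGE frames over `orderCol`, return a predicate that could be
  // evaluated below the Window, or None if there is no frame or any bound is
  // unbounded. Ignores Long overflow near Long.MinValue / Long.MaxValue.
  def derive(orderCol: String, target: Long, frames: Seq[RangeFrameSpec]): Option[String] = {
    // Keep only frames whose two bounds are both finite offsets.
    val bounded = frames.collect { case RangeFrameSpec(Offset(lo), Offset(hi)) => (lo, hi) }
    if (frames.isEmpty || bounded.size != frames.size) None
    else {
      // Covering range of all frames for the row where orderCol = target.
      val lo = target + bounded.map(_._1).min
      val hi = target + bounded.map(_._2).max
      Some(s"$orderCol BETWEEN $lo AND $hi")
    }
  }

  def main(args: Array[String]): Unit = {
    val frames = Seq(RangeFrameSpec(Offset(-1L), Offset(0L)))
    println(derive("day_index", 9L, frames))
  }
}
```

For the query in this issue, derive("day_index", 9L, Seq(RangeFrameSpec(Offset(-1L), Offset(0L)))) returns Some("day_index BETWEEN 8 AND 9"), which for an integral column selects the same rows as day_index IN (8, 9).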

Tagging [~gurwls223] and [~cloud_fan] to get your opinion on this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
