[ 
https://issues.apache.org/jira/browse/SPARK-35933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386086#comment-17386086
 ] 

Saurabh Chawla commented on SPARK-35933:
----------------------------------------

This is not the bug,  its the actual functionality to push down filter in case 
of window

 

 
{code:java}
// Push [[Filter]] operators through [[Window]] operators. Parts of the 
predicate that can be
// pushed beneath must satisfy the following conditions:
// 1. All the expressions are part of window partitioning key. The expressions 
can be compound.
// 2. Deterministic.
// 3. Placed before any non-deterministic predicates.
{code}
 

Here in this case 

1 ) All the expressions are part of window partitioning key. The expressions 
can be compound.
"id" === 1 is not the part of Window.partitionBy("name").orderBy("ts")
check this PR for reference  [https://github.com/apache/spark/pull/11635]

Check this Unit test  
https://github.com/apache/spark/blob/master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala#L1075
 

 

 

> PartitionFilters and pushFilters not applied to window functions
> ----------------------------------------------------------------
>
>                 Key: SPARK-35933
>                 URL: https://issues.apache.org/jira/browse/SPARK-35933
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.8, 3.1.2
>            Reporter: Shreyas Kothavade
>            Priority: Major
>
> Spark does not apply partition and pushed filters when the partition by 
> column and window function partition columns are not the same. For example, 
> in the code below, the data frame is created with a partition on "id". And I 
> use the partitioned data frame to calculate lag which is partitioned by 
> "name". In this case, the query plan shows the partitionFilters and pushed 
> Filters as empty.
> {code:java}
> spark
>   .createDataFrame(
>     Seq(
>       Person(
>         1,
>         "Andy",
>         new Timestamp(1499955986039L),
>         new Timestamp(1499955982342L)
>       ),
>       Person(
>         2,
>         "Jeff",
>         new Timestamp(1499955986339L),
>         new Timestamp(1499955986666L)
>       )
>     )
>   )
>  .write
>   .partitionBy("id")
>   .mode(SaveMode.Append)
>   .parquet("spark-warehouse/people")
> val dfPeople =
>   spark.read.parquet("spark-warehouse/people")
> dfPeople
>   .select(
>     $"id",
>     $"name",
>     lag(col("ts2"), 1).over(Window.partitionBy("name").orderBy("ts"))
>   )
>   .filter($"id" === 1)
>   .explain()
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to