[ 
https://issues.apache.org/jira/browse/SPARK-35933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shreyas Kothavade updated SPARK-35933:
--------------------------------------
    Description: 
Spark does not apply partition filters or pushed filters when the table's 
partition column and the window function's partition column are not the same. 
For example, in the code below, the DataFrame is written partitioned by "id", 
and lag is then computed over a window partitioned by "name". In this case, the 
query plan shows PartitionFilters and PushedFilters as empty.
{code:java}
spark
  .createDataFrame(
    Seq(
      Person(
        1,
        "Andy",
        new Timestamp(1499955986039L),
        new Timestamp(1499955982342L)
      ),
      Person(
        2,
        "Jeff",
        new Timestamp(1499955986339L),
        new Timestamp(1499955986666L)
      )
    )
  )
  .write
  .partitionBy("id")
  .mode(SaveMode.Append)
  .parquet("spark-warehouse/people")

val dfPeople =
  spark.read.parquet("spark-warehouse/people")
dfPeople
  .select(
    $"id",
    $"name",
    lag(col("ts2"), 1).over(Window.partitionBy("name").orderBy("ts"))
  )
  .filter($"id" === 1)
  .explain()
{code}
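For comparison, a sketch of a possible workaround (assuming the same dfPeople 
DataFrame as above): applying the filter on the partition column before the 
window function keeps the predicate below the Window operator, where partition 
pruning can still take place. This is only an illustration of the reported 
behavior, not a fix.
{code:java}
// Sketch: filter on the partition column "id" first, then compute the window.
// Because the filter sits below the Window operator, the plan should show a
// non-empty PartitionFilters/PushedFilters for the Parquet scan.
dfPeople
  .filter($"id" === 1)
  .select(
    $"id",
    $"name",
    lag(col("ts2"), 1).over(Window.partitionBy("name").orderBy("ts"))
  )
  .explain()
{code}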

  was:
Spark does not apply partition filters or pushed filters when the table's 
partition column and the window function's partition column are not the same. 
For example, in the code below, the DataFrame is written partitioned by "id", 
and lag is then computed over a window partitioned by "name". In this case, the 
query plan shows PartitionFilters and PushedFilters as empty.
{code:java}
spark
  .createDataFrame(
    Seq(
      Person(
        1,
        "Andy",
        new Timestamp(1499955986039L),
        new Timestamp(1499955982342L)
      ),
      Person(
        2,
        "Jeff",
        new Timestamp(1499955986339L),
        new Timestamp(1499955986666L)
      )
    )
  )
  .write
  .partitionBy("id")
  .mode(SaveMode.Append)
  .parquet("spark-warehouse/people")

val dfPeople =
  spark.read.parquet("spark-warehouse/people")
dfPeople
  .select(
    $"id",
    $"name",
    lag(col("ts2"), 1).over(Window.partitionBy("name").orderBy("ts"))
  )
  .filter($"name" === "Jeff")
  .explain()
{code}


> PartitionFilters and pushFilters not applied to window functions
> ----------------------------------------------------------------
>
>                 Key: SPARK-35933
>                 URL: https://issues.apache.org/jira/browse/SPARK-35933
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.2
>            Reporter: Shreyas Kothavade
>            Priority: Major
>
> Spark does not apply partition filters or pushed filters when the table's 
> partition column and the window function's partition column are not the 
> same. For example, in the code below, the DataFrame is written partitioned 
> by "id", and lag is then computed over a window partitioned by "name". In 
> this case, the query plan shows PartitionFilters and PushedFilters as empty.
> {code:java}
> spark
>   .createDataFrame(
>     Seq(
>       Person(
>         1,
>         "Andy",
>         new Timestamp(1499955986039L),
>         new Timestamp(1499955982342L)
>       ),
>       Person(
>         2,
>         "Jeff",
>         new Timestamp(1499955986339L),
>         new Timestamp(1499955986666L)
>       )
>     )
>   )
>   .write
>   .partitionBy("id")
>   .mode(SaveMode.Append)
>   .parquet("spark-warehouse/people")
> 
> val dfPeople =
>   spark.read.parquet("spark-warehouse/people")
> dfPeople
>   .select(
>     $"id",
>     $"name",
>     lag(col("ts2"), 1).over(Window.partitionBy("name").orderBy("ts"))
>   )
>   .filter($"id" === 1)
>   .explain()
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
