[
https://issues.apache.org/jira/browse/SPARK-35245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17337807#comment-17337807
]
Yuming Wang commented on SPARK-35245:
-------------------------------------
This is because the filtering side does not have a selective predicate:
https://github.com/apache/spark/blob/19c7d2f3d8cda8d9bc5dfc1a0bf5d46845b1bc2f/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala#L193-L201
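In other words, PartitionPruning only inserts a DPP subquery filter when the filtering side of the join contains a predicate that is likely to be selective; the query below builds its filtering side from a bare VALUES list (a LocalRelation with no Filter on top), so the check fails before `fallbackFilterRatio` is ever consulted. A minimal illustrative sketch of that gating logic (simplified stand-in, not Spark's actual Scala code; the node/predicate names here are hypothetical):

```python
# Illustrative sketch of Spark's "has selective predicate" gate in
# dynamic partition pruning (simplified; not the real implementation).
# Predicate kinds Spark treats as likely selective include equality,
# IN lists, and similar comparisons.
LIKELY_SELECTIVE = {"EqualTo", "In", "InSet", "Like"}

def has_selective_predicate(plan_nodes):
    """Return True if any Filter node carries a likely-selective predicate.

    `plan_nodes` is a list of (node_type, predicate_type) pairs standing in
    for a logical plan -- a hypothetical representation for illustration.
    """
    return any(node == "Filter" and pred in LIKELY_SELECTIVE
               for node, pred in plan_nodes)

# The VALUES-based IN-subquery in SPARK-35245 is just a LocalRelation with
# a Project on top -- no Filter at all -- so no DPP filter is inserted:
values_subquery = [("LocalRelation", None), ("Project", None)]
print(has_selective_predicate(values_subquery))   # False

# A dimension table filtered by e.g. d = '01-01-2021' would pass the gate:
filtered_dim = [("Relation", None), ("Filter", "EqualTo")]
print(has_selective_predicate(filtered_dim))      # True
```

This is why raising `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio` has no effect here: that ratio is only used to estimate pruning benefit after the selective-predicate check passes.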
> DynamicFilter pushdown not working
> ----------------------------------
>
> Key: SPARK-35245
> URL: https://issues.apache.org/jira/browse/SPARK-35245
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.1.1
> Reporter: jean-claude
> Priority: Minor
>
>
> The pushed filters list is always empty: `PushedFilters: []`
> I was expecting the filters to be pushed down on the probe side of the join.
> Not sure how to configure this properly. For example, how should
> fallbackFilterRatio be set?
> spark = (
>     SparkSession.builder
>     .master('local')
>     .config("spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio", 100)
>     .getOrCreate()
> )
>
> df =
> spark.read.parquet('abfss://warehouse@<domain>/iceberg/opensource/<table>/data/timeperiod=2021-04-25/00000-0-929b48ef-7ec3-47bd-b0a1-e9172c2dca6a-00001.parquet')
> df.createOrReplaceTempView('TMP')
>
> spark.sql('''
> explain cost
> select
> timeperiod, rrname
> from
> TMP
> where
> timeperiod in (
> select
> TO_DATE(d, 'MM-dd-yyyy') AS timeperiod
> from
> values
> ('01-01-2021'),
> ('01-01-2021'),
> ('01-01-2021') tbl(d)
> )
> group by
> timeperiod, rrname
> ''').show(truncate=False)
> == Optimized Logical Plan ==
> Aggregate [timeperiod#597, rrname#594], [timeperiod#597, rrname#594],
> Statistics(sizeInBytes=69.0 MiB)
> +- Join LeftSemi, (timeperiod#597 = timeperiod#669),
> Statistics(sizeInBytes=69.0 MiB)
> :- Project [rrname#594, timeperiod#597], Statistics(sizeInBytes=69.0 MiB)
> : +-
> Relation[count#591,time_first#592L,time_last#593L,rrname#594,rrtype#595,rdata#596,timeperiod#597]
> parquet, Statistics(sizeInBytes=198.5 MiB)
> +- LocalRelation [timeperiod#669], Statistics(sizeInBytes=36.0 B)
> == Physical Plan ==
> *(2) HashAggregate(keys=[timeperiod#597, rrname#594], functions=[],
> output=[timeperiod#597, rrname#594])
> +- Exchange hashpartitioning(timeperiod#597, rrname#594, 200),
> ENSURE_REQUIREMENTS, [id=#839]
> +- *(1) HashAggregate(keys=[timeperiod#597, rrname#594], functions=[],
> output=[timeperiod#597, rrname#594])
> +- *(1) BroadcastHashJoin [timeperiod#597], [timeperiod#669], LeftSemi,
> BuildRight, false
> :- *(1) ColumnarToRow
> : +- FileScan parquet [rrname#594,timeperiod#597] Batched: true,
> DataFilters: [], Format: Parquet, Location:
> InMemoryFileIndex[abfss://warehouse@<domain>/iceberg/opensource/...,
> PartitionFilters: [], PushedFilters: [], ReadSchema:
> struct<rrname:string,timeperiod:date>
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, date,
> true]),false), [id=#822]
> +- LocalTableScan [timeperiod#669]
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]