jean-claude created SPARK-35245:
-----------------------------------

             Summary: DynamicFilter pushdown not working
                 Key: SPARK-35245
                 URL: https://issues.apache.org/jira/browse/SPARK-35245
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.1.1
            Reporter: jean-claude


 

The pushed filters are always empty: `PushedFilters: []`.

I was expecting the filter to be pushed down on the probe side of the join.

I am not sure how to configure this properly. For example, how should
spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio be set?
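
For reference, a minimal sketch of the dynamic partition pruning settings that appear related (config names as in Spark 3.1's SQLConf; the values and comments are only examples, not a recommendation):

# Sketch: related dynamic partition pruning settings, assuming an existing
# SparkSession named `spark`. Names taken from Spark 3.1 SQLConf.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")            # on by default in 3.x
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.useStats", "true")           # use column stats to estimate benefit
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio", "0.5") # used when stats are missing/unreliable
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly", "true") # only insert DPP when the broadcast can be reused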


from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master('local')
    .config("spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio", 100)
    .getOrCreate()
)

df = spark.read.parquet('abfss://warehouse@<domain>/iceberg/opensource/<table>/data/timeperiod=2021-04-25/00000-0-929b48ef-7ec3-47bd-b0a1-e9172c2dca6a-00001.parquet')
df.createOrReplaceTempView('TMP')
 
spark.sql('''
explain cost
select 
   timeperiod, rrname
from
   TMP
where
   timeperiod in (
     select
       TO_DATE(d, 'MM-dd-yyyy') AS timeperiod
     from
     values
       ('01-01-2021'),
       ('01-01-2021'),
       ('01-01-2021') tbl(d)
 )
group by
   timeperiod, rrname
''').show(truncate=False)
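
For comparison, a sketch of the same read pointed at the partition root rather than a single data file, so that the timeperiod=... directories are discovered as a partition column (the path below is illustrative):

# Sketch: read the directory that contains the timeperiod=... partition folders,
# so `timeperiod` is picked up by partition discovery instead of being a plain data column.
df = spark.read.parquet('abfss://warehouse@<domain>/iceberg/opensource/<table>/data')
df.createOrReplaceTempView('TMP')
df.printSchema()  # timeperiod should appear in the schema via partition discovery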

== Optimized Logical Plan ==
Aggregate [timeperiod#597, rrname#594], [timeperiod#597, rrname#594], Statistics(sizeInBytes=69.0 MiB)
+- Join LeftSemi, (timeperiod#597 = timeperiod#669), Statistics(sizeInBytes=69.0 MiB)
   :- Project [rrname#594, timeperiod#597], Statistics(sizeInBytes=69.0 MiB)
   :  +- Relation[count#591,time_first#592L,time_last#593L,rrname#594,rrtype#595,rdata#596,timeperiod#597] parquet, Statistics(sizeInBytes=198.5 MiB)
   +- LocalRelation [timeperiod#669], Statistics(sizeInBytes=36.0 B)

== Physical Plan ==
*(2) HashAggregate(keys=[timeperiod#597, rrname#594], functions=[], output=[timeperiod#597, rrname#594])
+- Exchange hashpartitioning(timeperiod#597, rrname#594, 200), ENSURE_REQUIREMENTS, [id=#839]
   +- *(1) HashAggregate(keys=[timeperiod#597, rrname#594], functions=[], output=[timeperiod#597, rrname#594])
      +- *(1) BroadcastHashJoin [timeperiod#597], [timeperiod#669], LeftSemi, BuildRight, false
         :- *(1) ColumnarToRow
         :  +- FileScan parquet [rrname#594,timeperiod#597] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[abfss://warehouse@<domain>/iceberg/opensource/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<rrname:string,timeperiod:date>
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, date, true]),false), [id=#822]
            +- LocalTableScan [timeperiod#669]
 
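A small sketch of one way to check whether the pruning filter was inserted at all, assuming the dynamic filter shows up as a dynamicpruningexpression(...) entry in the FileScan's PartitionFilters (EXPLAIN FORMATTED is available in Spark 3.x):

# Sketch: dump the plan text and search for the dynamic pruning filter.
plan_df = spark.sql('''
explain formatted
select timeperiod, rrname
from TMP
where timeperiod in (
  select TO_DATE(d, 'MM-dd-yyyy') from values ('01-01-2021') tbl(d)
)
group by timeperiod, rrname
''')

# EXPLAIN returns a single-row, single-column DataFrame holding the plan text.
plan_text = plan_df.collect()[0][0]
print('dynamicpruningexpression' in plan_text)  # False here, since PartitionFilters is empty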



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
