Hey Ryan,

I found the root cause of the post scan filter not working over Iceberg
format.

*The short explanation: *The Iceberg Parquet reader fails to return any
rows when a complex-type column filter is involved, so the post scan
filter has nothing left to inspect.

*More Detail: *
Although the complex filter isn't pushed down by Spark to the Iceberg scan,
Spark does push down an implicit isNotNull(mapCol) filter. Before scanning
begins, row groups are evaluated to check whether they can be skipped *[1]*.
While doing so, ParquetMetricsRowGroupFilter rejects row groups when
evaluating this isNotNull(mapCol) filter. ParquetMetricsRowGroupFilter
implements a BoundExpressionVisitor whose notNull() evaluation method
doesn't recognize complex types as being "present" *[2]*, leading the
reader to believe the column is missing and all nulls.

*In the map filter case: *
ParquetMetricsRowGroupFilter keeps a `valueCounts` metric, a count
statistic keyed by column id. This doesn't contain a count for the map
column itself but instead has value counts for the map's keys and values
(which have different unique ids). So a lookup by the map column's id
fails to return any counts, and the row group is dropped.
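To make the failure mode concrete, here's a minimal, self-contained sketch (not the actual Iceberg code; the class name, field ids, and the Map<Integer, Long> stand-in for the row group's value-count stats are all illustrative) of how the notNull() stats lookup goes wrong for a map column:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for ParquetMetricsRowGroupFilter's notNull()
// logic; the class name and field ids are made up for this example.
public class NotNullStatsCheck {
  static final boolean ROWS_MIGHT_MATCH = true;
  static final boolean ROWS_CANNOT_MATCH = false;

  // Mimics the buggy check: "no value count recorded for this id" is
  // treated the same as "the column is all nulls".
  static boolean notNull(Map<Integer, Long> valueCounts, int columnId) {
    Long nonNulls = valueCounts.get(columnId);
    if (nonNulls == null || nonNulls == 0L) {
      return ROWS_CANNOT_MATCH; // row group gets skipped
    }
    return ROWS_MIGHT_MATCH;
  }

  public static void main(String[] args) {
    // Parquet stats carry counts for the map's key and value leaves
    // (hypothetical ids 5 and 6), but not for the map column itself
    // (hypothetical id 4).
    Map<Integer, Long> valueCounts = new HashMap<>();
    valueCounts.put(5, 100L);
    valueCounts.put(6, 100L);

    // isNotNull(mapCol), bound to the map's own id, rejects the row
    // group even though the map column is fully populated ...
    System.out.println(notNull(valueCounts, 4)); // false
    // ... while the same check on a leaf column with stats passes.
    System.out.println(notNull(valueCounts, 5)); // true
  }
}
```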

Proposed fix options:
1 - Reject the implicit isNotNull(mapCol) check in the Parquet reader for
nested types, since we know nested-type filters are not pushed down.
2 - Skip the stats-based check for nested types, since they need to be
re-evaluated by post scan filters anyway.
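Option 2 could look something like the sketch below (assuming a simple type-kind enum as a stand-in for Iceberg's type model; the real change would live in ParquetMetricsRowGroupFilter's notNull()). It also treats "no stats recorded" conservatively as "rows might match" rather than "all nulls":

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of fix option 2; the Kind enum and method shapes are
// illustrative, not the actual Iceberg API.
public class NotNullFixSketch {
  enum Kind { PRIMITIVE, STRUCT, MAP, LIST }

  static final boolean ROWS_MIGHT_MATCH = true;
  static final boolean ROWS_CANNOT_MATCH = false;

  static boolean isNested(Kind kind) {
    return kind != Kind.PRIMITIVE;
  }

  // Fix option 2: skip the stats-based check for nested types, since
  // the predicate must be re-evaluated by the post scan filter anyway.
  static boolean notNull(Kind colKind, Map<Integer, Long> valueCounts, int columnId) {
    if (isNested(colKind)) {
      return ROWS_MIGHT_MATCH; // never skip the row group on nested stats
    }
    Long nonNulls = valueCounts.get(columnId);
    if (nonNulls != null && nonNulls == 0L) {
      return ROWS_CANNOT_MATCH; // column really is all nulls
    }
    return ROWS_MIGHT_MATCH; // stats present or missing: be conservative
  }

  public static void main(String[] args) {
    Map<Integer, Long> counts = new HashMap<>();
    counts.put(5, 100L); // hypothetical map-key leaf id
    counts.put(6, 100L); // hypothetical map-value leaf id

    // The map column (hypothetical id 4) is now kept for the post scan
    // filter to evaluate, and primitive columns still use their stats.
    System.out.println(notNull(Kind.MAP, counts, 4));       // true
    System.out.println(notNull(Kind.PRIMITIVE, counts, 5)); // true
  }
}
```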

Let me know what you think,

Cheers,
-Gautam.

[1] -
https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetReader.java#L103-L112
[2] -
https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetricsRowGroupFilter.java#L159-L163


On Wed, Feb 20, 2019 at 12:25 AM Ryan Blue <rb...@netflix.com.invalid>
wrote:

> Hi Gautam,
>
> Thanks for reporting this. I'll look into why Spark is filtering out all
> of the Iceberg records. It should use the same filter in both cases, so I'm
> surprised that this is happening.
>
> The problem with the complex predicate is that it is inside a map. Parquet
> doesn't support push-down filters for nested columns.
>
> On Mon, Feb 18, 2019 at 11:03 PM Gautam <gautamkows...@gmail.com> wrote:
>
>> Hello Iceberg Devs,
>>                I've been tracking an issue with predicate pushdowns in
>> Iceberg on complex types. I have compared vanilla Spark reader over Parquet
>> vs. Iceberg format reader.  I have an example detailing it here:
>> https://github.com/apache/incubator-iceberg/issues/99
>>
>> *Vanilla Spark Parquet reader plan*
>> == Physical Plan ==
>> *(1) Project [age#428, name#429, friends#430, location#431]
>> +- *(1) Filter (isnotnull(friends#430) && (friends#430[Josh] = 10))
>>    +- *(1) FileScan parquet [age#428,name#429,friends#430,location#431]
>> Batched: false, Format: Parquet, Location:
>> InMemoryFileIndex[file:/usr/local/spark/test/parquet-people-complex],
>> PartitionFilters: [], PushedFilters: [IsNotNull(friends)], ReadSchema:
>> struct<age:int,name:string,friends:map<string,int>,location:struct<lat:int,lon:int>>
>>
>> * Iceberg Plan*
>> == Physical Plan ==
>> *(1) Project [age#33, name#34, friends#35]
>> +- *(1) Filter ((friends#35[Josh] = 10) && isnotnull(friends#35))
>>    +- *(1) ScanV2 iceberg[age#33, name#34, friends#35] (Filters:
>> [isnotnull(friends#35)], Options: [path=iceberg-people-complex2,paths=[]])
>>
>>
>> *Couple of points :*
>> 1)  Complex predicate is not passed down to the Scan level in both plans.
>> The complex predicate is termed "non-translateable" by
>> *DataSourceStrategy.translateFilter()  *[1] when trying to convert
>> Catalyst expression to data source filter. Ryan & Xabriel had a discussion
>> earlier on this list about Spark not passing expressions to data source (in
>> certain cases). This might be related to that. Maybe a path forward is to
>> fix that translation in Spark so that Iceberg Filter conversion has a
>> chance to handle complex type. Currently Iceberg Reader code is unaware of
>> that filter.
>>
>> 2) Although both vanilla Spark and Iceberg handle complex type predicates
>> post scan, this regression is caused by post scan filtering not returning
>> results in the Iceberg case. I think post scan filtering is unable to
>> handle Iceberg format. So if 1) is not the way forward then the alternative
>> way is to fix this in the post scan filtering.
>>
>>
>> Looking forward to your guidance on the way forward.
>>
>> Cheers,
>> -Gautam.
>>
>> [1] -
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L450
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
