Sounds good, Gautam. 

Our intention was to be able to filter out files using predicates on nested 
fields. For now, file skipping works only with predicates that involve top 
level attributes.


> On 5 Mar 2019, at 17:47, Gautam <gautamkows...@gmail.com> wrote:
> 
> Hey Anton, 
>        I'm curious how you are using the Struct metrics in your company, are 
> you planning to use it for predicate pushdowns or something else entirely? 
> 
> Regarding timeline, that's fine, we can wait a week or two for your changes 
> on collecting metrics. If I can assume that your changes will add the struct 
> metrics, I could open a separate Iceberg issue about the struct expression 
> handling. If Ryan and you agree on allowing struct based filtering in Iceberg 
> as long as we avoid mixed filtering (map<struct<int>> , array<struct<Int>> , 
> etc.) I can go ahead and work on it. 
> 
> Cheers,
> -Gautam.
> 
> 
> 
> On Tue, Mar 5, 2019 at 10:30 PM Anton Okolnychyi 
> <aokolnyc...@apple.com.invalid> wrote:
> Sorry for my late reply and thanks for testing Gautam!
> 
> I had a local prototype that only collected metrics for nested structs and 
> stored them. I haven’t checked if Iceberg can make use of that right now. As 
> I understand Ryan’s comment and Gautam’s observations, we will need changes 
> to make it work even if we have proper min/max statistics. So, we have two 
> independent issues then. I was planning to add tests and submit the 
> collection upstream. However, open source approval within my company might 
> easily take another week or more. So, if we need this change earlier, someone 
> can implement it. Just let me know, I can help to review then.
> 
> Thanks,
> Anton
> 
> 
>> On 5 Mar 2019, at 09:51, Gautam <gautamkows...@gmail.com 
>> <mailto:gautamkows...@gmail.com>> wrote:
>> 
>> Thanks for the response Ryan, comments in line ...
>> 
>> > Iceberg doesn't support binding expressions in sub-structs yet. So the fix 
>> > on the Iceberg side requires a few steps. First, collecting the metrics 
>> > from Parquet with Anton's PR, and second, updating expression binding to 
>> > work with structs.
>> 
>> I don't think there is a PR up yet on collecting metrics on struct fields, I 
>> could work on one if Anton isn't already on it (thanks for calling it out in 
>> the issue Anton!).
>> 
>> > The reason why binding doesn't work with structs yet it that we don't want 
>> > to bind structs that are within maps or arrays because those will change 
>> > the semantics of the expression. For example, a.b = 5 can be run on a: 
>> > struct<b: int> but can't be run on a: list<struct<b: int>>.
>> 
>> From the discussion on said issue [1] seems like we are ok with structs 
>> being filtered on. About structs inside maps or arrays, can we not reject 
>> the invalid cases in the expression evaluation?  As in, detect of what 
>> nested type field 'a' is and allow or disallow appropriately? Having support 
>> for just structs is a good incremental feature methinks. Especially coz, as 
>> Anton pointed out,  Spark has a PR up [2] on pushing down struct-based 
>> filters which one can cherry pick locally.
>> 
>> > Also, the Avro problem wasn't because the manifests are stored as Avro. 
>> > Avro doesn't collect metrics about the data that is stored, but the 
>> > manifests have the metrics that were added with each file, so the problem 
>> > is not adding the metrics when you added the files. I think you've solved 
>> > the problem and correctly built your table metadata using the metrics from 
>> > the Parquet footers, but I still want to note the distinction: Avro 
>> > manifests store metrics correctly. Avro data files don't generate metrics.
>> 
>> Gotcha!
>> 
>> Cheers,
>> -Gautam.
>> 
>> [1] - https://github.com/apache/incubator-iceberg/issues/78 
>> <https://github.com/apache/incubator-iceberg/issues/78>
>> [2] - https://github.com/apache/spark/pull/22573 
>> <https://github.com/apache/spark/pull/22573>
>> 
>> 
>> On Sat, Mar 2, 2019 at 6:47 AM Ryan Blue <rb...@netflix.com.invalid 
>> <mailto:rb...@netflix.com.invalid>> wrote:
>> Iceberg doesn't support binding expressions in sub-structs yet. So the fix 
>> on the Iceberg side requires a few steps. First, collecting the metrics from 
>> Parquet with Anton's PR, and second, updating expression binding to work 
>> with structs.
>> 
>> The reason why binding doesn't work with structs yet it that we don't want 
>> to bind structs that are within maps or arrays because those will change the 
>> semantics of the expression. For example, a.b = 5 can be run on a: struct<b: 
>> int> but can't be run on a: list<struct<b: int>>.
>> 
>> Also, the Avro problem wasn't because the manifests are stored as Avro. Avro 
>> doesn't collect metrics about the data that is stored, but the manifests 
>> have the metrics that were added with each file, so the problem is not 
>> adding the metrics when you added the files. I think you've solved the 
>> problem and correctly built your table metadata using the metrics from the 
>> Parquet footers, but I still want to note the distinction: Avro manifests 
>> store metrics correctly. Avro data files don't generate metrics.
>> 
>> On Thu, Feb 28, 2019 at 1:32 AM Gautam <gautamkows...@gmail.com 
>> <mailto:gautamkows...@gmail.com>> wrote:
>> Hey Anton,
>>       Wanted to circle back on the Spark PR [1] to add support for nested 
>> fields .. I tried applying it, tested it. With this change Spark pushes 
>> filters on structs down to Iceberg, but Iceberg expression handling seems to 
>> fail in validation ..
>> 
>> 
>> Caused by: com.netflix.iceberg.exceptions.ValidationException: Cannot find 
>> field 'location.lat' in struct: struct<1: age: optional int, 2: name: 
>> optional string, 3: friends: optional map<string, int>, 4: location: 
>> optional struct<7: lat: optional double, 8: lon: optional double>>
>>   at 
>> com.netflix.iceberg.exceptions.ValidationException.check(ValidationException.java:42)
>>   at 
>> com.netflix.iceberg.expressions.UnboundPredicate.bind(UnboundPredicate.java:76)
>>   at 
>> com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:138)
>>   at 
>> com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:94)
>>   at 
>> com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:147)
>>   at 
>> com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:160)
>>   at 
>> com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.project(Projections.java:108)
>>   at 
>> com.netflix.iceberg.expressions.InclusiveManifestEvaluator.<init>(InclusiveManifestEvaluator.java:57)
>>   at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:153)
>>   at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:149)
>>   at 
>> com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>>   at 
>> com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>>   at 
>> com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>>   at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
>>  
>> 
>> I think this should be handled in Iceberg as  struct filters like a.b.c = 
>> "blah" is a legit way to query in SQL. If you feel this is a valid 
>> assumption I can work on a fix.   Thoughts? 
>> 
>> 
>> Test Table Schema:
>> scala> iceDf.printSchema
>> root
>>  |-- age: integer (nullable = true)
>>  |-- name: string (nullable = true)
>>  |-- friends: map (nullable = true)
>>  |    |-- key: string
>>  |    |-- value: integer (valueContainsNull = true)
>>  |-- location: struct (nullable = true)
>>  |    |-- lat: double (nullable = true)
>>  |    |-- lon: double (nullable = true)
>> 
>> 
>> Gist to recreate issue:
>> https://gist.github.com/prodeezy/001cf155ff0675be7d307e9f842e1dac 
>> <https://gist.github.com/prodeezy/001cf155ff0675be7d307e9f842e1dac>
>> 
>> 
>> Cheers,
>> -Gautam.
>> 
>> 
>> 
>> [1] -  https://github.com/apache/spark/pull/22573 
>> <https://github.com/apache/spark/pull/22573>
>> On Tue, Feb 26, 2019 at 10:35 PM Anton Okolnychyi <aokolnyc...@apple.com 
>> <mailto:aokolnyc...@apple.com>> wrote:
>> Unfortunately, Spark doesn’t push down filters for nested columns. I 
>> remember an effort to implement it [1]. However, it is not merged.
>> So, even if we have proper statistics in Iceberg, we cannot leverage it from 
>> Spark.
>> 
>> [1] - https://github.com/apache/spark/pull/22573 
>> <https://github.com/apache/spark/pull/22573>
>> 
>> 
>>> On 26 Feb 2019, at 16:52, Gautam <gautamkows...@gmail.com 
>>> <mailto:gautamkows...@gmail.com>> wrote:
>>> 
>>> Thanks Anton, this is very helpful!  I will apply the patch from pull#63 
>>> and give it a shot.
>>> 
>>> Re: Collecting min/max stas on nested structures ( 
>>> https://github.com/apache/incubator-iceberg/issues/78 
>>> <https://github.com/apache/incubator-iceberg/issues/78> ) ...
>>> 
>>> We have the exact same use case for skipping files on nested field filters. 
>>> I was intrigued by your comment on enabling stats on nested structures by 
>>> replacing `fileSchema.asStruct().field(fieldId)` with 
>>> `fileSchema.findField(fieldId)` in `ParquetMetrics$fromMetadata` .. Have 
>>> you had success with this? If so, I can try it out on our data as well. 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Feb 26, 2019 at 8:24 PM Anton Okolnychyi <aokolnyc...@apple.com 
>>> <mailto:aokolnyc...@apple.com>> wrote:
>>> Hi Gautam,
>>> 
>>> I believe you see this behaviour because SparkAppenderFactory is configured 
>>> to use ParquetWriteAdapter. It only tracks the number of records and uses 
>>> ParquetWriteSupport from Spark. This means that the statistics is not 
>>> collected on writes and cannot be used on reads.
>>> 
>>> Once [1] is merged, proper statistics will be fetched from the footer and 
>>> persisted in the manifests. The statistics is collected when writing data 
>>> files not manifests. See [2] for more info. Also, [3] contains an example 
>>> that filters out files (it requires [1] to be cherry-picked locally).
>>> 
>>> Hope that helps,
>>> Anton
>>> 
>>> [1] - https://github.com/apache/incubator-iceberg/pull/63 
>>> <https://github.com/apache/incubator-iceberg/pull/63>
>>> [2] - 
>>> https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java
>>>  
>>> <https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java>
>>> [3] - https://github.com/apache/incubator-iceberg/pull/105 
>>> <https://github.com/apache/incubator-iceberg/pull/105>
>>> 
>>> 
>>>> On 26 Feb 2019, at 13:58, Gautam <gautamkows...@gmail.com 
>>>> <mailto:gautamkows...@gmail.com>> wrote:
>>>> 
>>>> .. Just to be clear my concern is around Iceberg not skipping files. 
>>>> Iceberg does skip rowGroups when scanning files as 
>>>> iceberg.parquet.ParquetReader uses the parquet stats under it while 
>>>> skipping, albeit none of these stats come from the manifests.
>>>> 
>>>> On Tue, Feb 26, 2019 at 7:24 PM Gautam <gautamkows...@gmail.com 
>>>> <mailto:gautamkows...@gmail.com>> wrote:
>>>> Hello Devs,
>>>>                    I am looking into leveraging Iceberg to speed up split 
>>>> generation and to minimize file scans. My understanding was that Iceberg 
>>>> keeps key statistics as listed under Metrics.java [1] viz. column 
>>>> lower/upper bounds, nullValues, distinct value counts, etc. and that table 
>>>> scanning leverages these to skip partitions, files & row-groups (in the 
>>>> Parquet context). 
>>>> 
>>>> What I found is files aren't skipped when a predicate applies only to a 
>>>> subset of the table's files. Within a partition it will scan all files as 
>>>> manifests only keep record counts but the rest of the metrics (lower, 
>>>> upper, distinct value counts, null values) are null / empty. This is coz 
>>>> AvroFileAppender only keeps `recordCounts` as metrics [2].. And currently 
>>>> that is the only appender supported for writing manifest files.
>>>> 
>>>> 
>>>> Example :
>>>> 
>>>> In following example iceTable was generated by iteratively adding two 
>>>> files so it has two separate parquet files under it ..
>>>> 
>>>> scala> iceTable.newScan().planFiles.asScala.foreach(fl => println(fl))
>>>> 
>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
>>>>  partition_data=PartitionData{}, residual=true}
>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
>>>>  partition_data=PartitionData{}, residual=true}
>>>> 
>>>> 
>>>> Only one file contains row with age = null .. 
>>>> 
>>>> scala> iceDf.show()
>>>> 19/02/26 13:30:46 WARN scheduler.TaskSetManager: Stage 3 contains a task 
>>>> of very large size (113 KB). The maximum recommended task size is 100 KB.
>>>> +----+-------+--------------------+
>>>> | age|   name|             friends|
>>>> +----+-------+--------------------+
>>>> |  60| Kannan|      [Justin -> 19]|
>>>> |  75| Sharon|[Michael -> 30, J...|
>>>> |null|Michael|                null|
>>>> |  30|   Andy|[Josh -> 10, Bisw...|
>>>> |  19| Justin|[Kannan -> 75, Sa...|
>>>> +----+-------+--------------------+
>>>> 
>>>> 
>>>> 
>>>> Running filter on isNull(age) scans both files .. 
>>>> 
>>>> val isNullExp = Expressions.isNull("age")
>>>> val isNullScan = iceTable.newScan().filter(isNullExp)
>>>> 
>>>> scala> isNullScan.planFiles.asScala.foreach(fl => println(fl))
>>>> 
>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
>>>>  partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
>>>>  partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>>>> 
>>>> 
>>>> 
>>>> I would expect only one file to be scanned as Iceberg should track 
>>>> nullValueCounts as per Metrics.java [1] .. The same issue holds for 
>>>> integer comparison filters scanning too many files. 
>>>> 
>>>> When I looked through the code, there is provision for using Parquet file 
>>>> footer stats to populate Manifest Metrics [3] but this is never used as 
>>>> Iceberg currently only allows AvroFileAppender for creating manifest 
>>>> files. 
>>>> 
>>>> What's the plan around using Parquet footer stats in manifests which can 
>>>> be very useful during split generation? I saw some discussions around this 
>>>> in the Iceberg Spec document [4] but couldn't glean if any of those are 
>>>> actually implemented yet. 
>>>> 
>>>> I can work on a proposal PR for adding these in but wanted to  know the 
>>>> current thoughts around this. 
>>>> 
>>>> 
>>>> Gist for above example : 
>>>> https://gist.github.com/prodeezy/fe1b447c78c0bc9dc3be66272341d1a7 
>>>> <https://gist.github.com/prodeezy/fe1b447c78c0bc9dc3be66272341d1a7>
>>>> 
>>>> 
>>>> Looking forward to your feedback,
>>>> 
>>>> Cheers,
>>>> -Gautam.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> [1] - 
>>>> https://github.com/apache/incubator-iceberg/blob/master/api/src/main/java/com/netflix/iceberg/Metrics.java
>>>>  
>>>> <https://github.com/apache/incubator-iceberg/blob/master/api/src/main/java/com/netflix/iceberg/Metrics.java>
>>>> [2] - 
>>>> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java#L56
>>>>  
>>>> <https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java#L56>
>>>> [3] - 
>>>> https://github.com/apache/incubator-iceberg/blob/1bec13a954c29f8cd09719a0362c0b2829635c77/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java#L118
>>>>  
>>>> <https://github.com/apache/incubator-iceberg/blob/1bec13a954c29f8cd09719a0362c0b2829635c77/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java#L118>
>>>> [4] - 
>>>> https://docs.google.com/document/d/1Q-zL5lSCle6NEEdyfiYsXYzX_Q8Qf0ctMyGBKslOswA/edit#
>>>>  
>>>> <https://docs.google.com/document/d/1Q-zL5lSCle6NEEdyfiYsXYzX_Q8Qf0ctMyGBKslOswA/edit#>
>>>> 
>>> 
>> 
>> 
>> 
>> -- 
>> Ryan Blue
>> Software Engineer
>> Netflix
> 

Reply via email to