Hey Anton,

Wanted to circle back on the Spark PR [1] that adds filter pushdown for nested fields. I applied it locally and tested it: with that change Spark does push struct filters down to Iceberg, but Iceberg's expression handling fails during validation on the nested reference ..
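A minimal way to reproduce it from the shell (a sketch; iceDf here is the Iceberg-backed DataFrame from the gist linked below, and the predicate value is arbitrary, presumably any pushed-down filter on the nested field hits the same bind failure):

scala> iceDf.filter("location.lat > 10.0").show()

Scan planning then fails with: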
Caused by: com.netflix.iceberg.exceptions.ValidationException: Cannot find field 'location.lat' in struct: struct<1: age: optional int, 2: name: optional string, 3: friends: optional map<string, int>, 4: location: optional struct<7: lat: optional double, 8: lon: optional double>>
    at com.netflix.iceberg.exceptions.ValidationException.check(ValidationException.java:42)
    at com.netflix.iceberg.expressions.UnboundPredicate.bind(UnboundPredicate.java:76)
    at com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:138)
    at com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:94)
    at com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:147)
    at com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:160)
    at com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.project(Projections.java:108)
    at com.netflix.iceberg.expressions.InclusiveManifestEvaluator.<init>(InclusiveManifestEvaluator.java:57)
    at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:153)
    at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:149)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)

I think this should be handled in Iceberg, since struct filters like a.b.c = "blah" are a legitimate way to query in SQL. If you feel this is a valid assumption, I can work on a fix. Thoughts?

*Test Table Schema:*

scala> iceDf.printSchema
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- friends: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = true)
 |-- location: struct (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- lon: double (nullable = true)

*Gist to recreate the issue:*
https://gist.github.com/prodeezy/001cf155ff0675be7d307e9f842e1dac

Cheers,
-Gautam.

[1] - https://github.com/apache/spark/pull/22573

On Tue, Feb 26, 2019 at 10:35 PM Anton Okolnychyi <aokolnyc...@apple.com> wrote:

> Unfortunately, Spark doesn’t push down filters for nested columns. I
> remember an effort to implement it [1]. However, it is not merged.
> So, even if we have proper statistics in Iceberg, we cannot leverage them
> from Spark.
>
> [1] - https://github.com/apache/spark/pull/22573
>
>
> On 26 Feb 2019, at 16:52, Gautam <gautamkows...@gmail.com> wrote:
>
> Thanks Anton, this is very helpful! I will apply the patch from pull #63
> and give it a shot.
>
> Re: collecting min/max stats on nested structures
> (*https://github.com/apache/incubator-iceberg/issues/78*) ...
>
> We have the exact same use case for skipping files on nested field
> filters. I was intrigued by your comment on enabling stats on nested
> structures by replacing `fileSchema.asStruct().field(fieldId)` with
> `fileSchema.findField(fieldId)` in `ParquetMetrics$fromMetadata`. Have
> you had success with this? If so, I can try it out on our data as well.
>
>
> On Tue, Feb 26, 2019 at 8:24 PM Anton Okolnychyi <aokolnyc...@apple.com>
> wrote:
>
>> Hi Gautam,
>>
>> I believe you see this behaviour because SparkAppenderFactory is
>> configured to use ParquetWriteAdapter, which only tracks the number of
>> records and uses ParquetWriteSupport from Spark.
>> This means that statistics are not collected on writes and cannot be
>> used on reads.
>>
>> Once [1] is merged, proper statistics will be fetched from the footer and
>> persisted in the manifests. Statistics are collected when writing data
>> files, not manifests. See [2] for more info. Also, [3] contains an example
>> that filters out files (it requires [1] to be cherry-picked locally).
>>
>> Hope that helps,
>> Anton
>>
>> [1] - https://github.com/apache/incubator-iceberg/pull/63
>> [2] - https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java
>> [3] - https://github.com/apache/incubator-iceberg/pull/105
>>
>>
>> On 26 Feb 2019, at 13:58, Gautam <gautamkows...@gmail.com> wrote:
>>
>> .. Just to be clear, my concern is about Iceberg not skipping files.
>> Iceberg does skip row groups when scanning files, since
>> *iceberg.parquet.ParquetReader* uses the Parquet stats underneath while
>> skipping; none of those stats come from the manifests, though.
>>
>> On Tue, Feb 26, 2019 at 7:24 PM Gautam <gautamkows...@gmail.com> wrote:
>>
>>> Hello Devs,
>>> I am looking into leveraging Iceberg to speed up split generation and
>>> to minimize file scans. My understanding was that Iceberg keeps key
>>> statistics as listed under Metrics.java [1], viz. column lower/upper
>>> bounds, null value counts, distinct value counts, etc., and that table
>>> scanning leverages these to skip partitions, files & row groups (in the
>>> Parquet context).
>>>
>>> What I found is that files aren't skipped when a predicate applies only
>>> to a subset of the table's files. Within a partition it will scan all
>>> files, as manifests only keep record counts while the rest of the
>>> metrics (lower/upper bounds, distinct value counts, null value counts)
>>> are null/empty. This is because AvroFileAppender only keeps
>>> `recordCounts` as metrics [2], and currently that is the only appender
>>> supported for writing manifest files.
>>>
>>> *Example:*
>>>
>>> In the following example iceTable was generated by iteratively adding
>>> two files, so it has two separate Parquet files under it ..
>>>
>>> scala> iceTable.newScan().planFiles.asScala.foreach(fl => println(fl))
>>>
>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
>>> partition_data=PartitionData{}, residual=true}
>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
>>> partition_data=PartitionData{}, residual=true}
>>>
>>> *Only one file contains a row with age = null ..*
>>>
>>> scala> iceDf.show()
>>> 19/02/26 13:30:46 WARN scheduler.TaskSetManager: Stage 3 contains a task
>>> of very large size (113 KB). The maximum recommended task size is 100 KB.
>>> +----+-------+--------------------+
>>> | age|   name|             friends|
>>> +----+-------+--------------------+
>>> |  60| Kannan|      [Justin -> 19]|
>>> |  75| Sharon|[Michael -> 30, J...|
>>> |null|Michael|                null|
>>> |  30|   Andy|[Josh -> 10, Bisw...|
>>> |  19| Justin|[Kannan -> 75, Sa...|
>>> +----+-------+--------------------+
>>>
>>> *Running a filter on isNull(age) scans both files ..*
>>>
>>> val isNullExp = Expressions.isNull("age")
>>> val isNullScan = iceTable.newScan().filter(isNullExp)
>>>
>>> scala> isNullScan.planFiles.asScala.foreach(fl => println(fl))
>>>
>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
>>> partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
>>> partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>>>
>>> I would expect only one file to be scanned, as Iceberg should track
>>> null value counts as per Metrics.java [1] .. The same issue holds for
>>> integer comparison filters scanning too many files.
>>>
>>> When I looked through the code, there is a provision for using Parquet
>>> file footer stats to populate manifest Metrics [3], but it is never used
>>> because Iceberg currently only allows AvroFileAppender for creating
>>> manifest files.
>>>
>>> What's the plan around using Parquet footer stats in manifests? They
>>> could be very useful during split generation. I saw some discussion
>>> around this in the Iceberg spec document [4] but couldn't glean whether
>>> any of it is actually implemented yet.
>>>
>>> I can work on a proposal PR for adding these, but wanted to know the
>>> current thinking around this first.
>>>
>>> *Gist for the above example:*
>>> https://gist.github.com/prodeezy/fe1b447c78c0bc9dc3be66272341d1a7
>>>
>>> Looking forward to your feedback,
>>>
>>> Cheers,
>>> -Gautam.
>>>
>>>
>>> [1] - https://github.com/apache/incubator-iceberg/blob/master/api/src/main/java/com/netflix/iceberg/Metrics.java
>>> [2] - https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java#L56
>>> [3] - https://github.com/apache/incubator-iceberg/blob/1bec13a954c29f8cd09719a0362c0b2829635c77/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java#L118
>>> [4] - https://docs.google.com/document/d/1Q-zL5lSCle6NEEdyfiYsXYzX_Q8Qf0ctMyGBKslOswA/edit#
>>>
>>
>