Hey Anton,

I wanted to circle back on the Spark PR [1] that adds support for nested
fields. I applied it locally and tested it. With this change Spark pushes
filters on structs down to Iceberg, but Iceberg's expression handling then
fails validation when binding the predicate:


Caused by: com.netflix.iceberg.exceptions.ValidationException: Cannot find field 'location.lat' in struct: struct<1: age: optional int, 2: name: optional string, 3: friends: optional map<string, int>, 4: location: optional struct<7: lat: optional double, 8: lon: optional double>>
  at com.netflix.iceberg.exceptions.ValidationException.check(ValidationException.java:42)
  at com.netflix.iceberg.expressions.UnboundPredicate.bind(UnboundPredicate.java:76)
  at com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:138)
  at com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:94)
  at com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:147)
  at com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:160)
  at com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.project(Projections.java:108)
  at com.netflix.iceberg.expressions.InclusiveManifestEvaluator.<init>(InclusiveManifestEvaluator.java:57)
  at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:153)
  at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:149)
  at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
  at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
  at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
  at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)


I think this should be handled in Iceberg, since a struct filter like
a.b.c = "blah" is a legitimate way to query in SQL. If you agree that this
is a valid assumption, I can work on a fix. Thoughts?
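
To make the idea concrete, here is a rough, self-contained Java sketch of
resolving a dotted name such as 'location.lat' one struct level at a time.
The Field class, exampleSchema() and find() are hypothetical stand-ins
written for this email; they are not Iceberg's types and not the eventual
fix. The field ids mirror the schema in the exception above, and the map
column is treated as a leaf for simplicity.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

final class NestedFieldLookup {
    // Hypothetical stand-in for a schema field (not Iceberg's NestedField).
    static final class Field {
        final int id;
        final String name;
        final List<Field> children; // empty for non-struct fields

        Field(int id, String name, Field... children) {
            this.id = id;
            this.name = name;
            this.children = Collections.unmodifiableList(Arrays.asList(children));
        }
    }

    // Mirrors the test table: age, name, friends, location{lat, lon}.
    static List<Field> exampleSchema() {
        return Arrays.asList(
            new Field(1, "age"),
            new Field(2, "name"),
            new Field(3, "friends"), // map column, treated as a leaf here
            new Field(4, "location", new Field(7, "lat"), new Field(8, "lon")));
    }

    // Walks the dotted name one segment at a time, descending into children.
    static Optional<Field> find(List<Field> topLevel, String dottedName) {
        List<Field> level = topLevel;
        Field current = null;
        for (String part : dottedName.split("\\.")) {
            Field match = null;
            for (Field candidate : level) {
                if (candidate.name.equals(part)) {
                    match = candidate;
                    break;
                }
            }
            if (match == null) {
                return Optional.empty(); // this segment does not exist at this level
            }
            current = match;
            level = match.children;
        }
        return Optional.ofNullable(current);
    }

    public static void main(String[] args) {
        List<Field> schema = exampleSchema();
        System.out.println(find(schema, "location.lat").map(f -> f.id));
        System.out.println(find(schema, "location.alt").map(f -> f.id));
    }
}
```

A lookup like this treats every dot as a struct boundary, so a top-level
column whose name itself contains a dot would be ambiguous; a real fix
would need to decide how to handle that case.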


*Test Table Schema:*
scala> iceDf.printSchema
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- friends: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = true)
 |-- location: struct (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- lon: double (nullable = true)


*Gist to recreate issue:*
https://gist.github.com/prodeezy/001cf155ff0675be7d307e9f842e1dac


Cheers,
-Gautam.



[1] -  https://github.com/apache/spark/pull/22573

On Tue, Feb 26, 2019 at 10:35 PM Anton Okolnychyi <aokolnyc...@apple.com>
wrote:

> Unfortunately, Spark doesn't push down filters for nested columns. I
> remember an effort to implement it [1], but it has not been merged.
> So, even if we have proper statistics in Iceberg, we cannot leverage them
> from Spark.
>
> [1] - https://github.com/apache/spark/pull/22573
>
>
> On 26 Feb 2019, at 16:52, Gautam <gautamkows...@gmail.com> wrote:
>
> Thanks Anton, this is very helpful!  I will apply the patch from pull#63
> and give it a shot.
>
> Re: Collecting min/max stats on nested structures
> (https://github.com/apache/incubator-iceberg/issues/78) ...
>
> We have the exact same use case for skipping files on nested field
> filters. I was intrigued by your comment on enabling stats on nested
> structures by replacing `fileSchema.asStruct().field(fieldId)` with
> `fileSchema.findField(fieldId)` in `ParquetMetrics$fromMetadata`. Have
> you had success with this? If so, I can try it out on our data as well.
>
>
>
>
>
> On Tue, Feb 26, 2019 at 8:24 PM Anton Okolnychyi <aokolnyc...@apple.com>
> wrote:
>
>> Hi Gautam,
>>
>> I believe you see this behaviour because SparkAppenderFactory is
>> configured to use ParquetWriteAdapter. It only tracks the number of records
>> and uses ParquetWriteSupport from Spark. This means that statistics are
>> not collected on writes and cannot be used on reads.
>>
>> Once [1] is merged, proper statistics will be fetched from the footer and
>> persisted in the manifests. Statistics are collected when writing data
>> files, not manifests. See [2] for more info. Also, [3] contains an example
>> that filters out files (it requires [1] to be cherry-picked locally).
>>
>> Hope that helps,
>> Anton
>>
>> [1] - https://github.com/apache/incubator-iceberg/pull/63
>> [2] -
>> https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java
>> [3] - https://github.com/apache/incubator-iceberg/pull/105
>>
>>
>> On 26 Feb 2019, at 13:58, Gautam <gautamkows...@gmail.com> wrote:
>>
>> Just to be clear, my concern is about Iceberg not skipping files.
>> Iceberg does skip row groups when scanning a file, because
>> *iceberg.parquet.ParquetReader* uses the Parquet stats in that file while
>> skipping, but none of these stats come from the manifests.
>>
>> On Tue, Feb 26, 2019 at 7:24 PM Gautam <gautamkows...@gmail.com> wrote:
>>
>>> Hello Devs,
>>>
>>> I am looking into leveraging Iceberg to speed up split generation and to
>>> minimize file scans. My understanding was that Iceberg keeps key
>>> statistics as listed in Metrics.java [1], viz. column lower/upper bounds,
>>> null value counts, distinct value counts, etc., and that table scanning
>>> leverages these to skip partitions, files and row groups (in the Parquet
>>> context).
>>>
>>> What I found is that files aren't skipped when a predicate applies only
>>> to a subset of the table's files. Within a partition, a scan reads all
>>> files because manifests only keep record counts; the rest of the metrics
>>> (lower and upper bounds, distinct value counts, null value counts) are
>>> null or empty. This is because AvroFileAppender only keeps `recordCounts`
>>> as metrics [2], and currently that is the only appender supported for
>>> writing manifest files.
>>>
>>>
>>> *Example:*
>>>
>>> In the following example, iceTable was generated by iteratively adding
>>> two files, so it has two separate Parquet files under it:
>>>
>>> scala> iceTable.newScan().planFiles.asScala.foreach(fl => println(fl))
>>>
>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
>>> partition_data=PartitionData{}, residual=true}
>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
>>> partition_data=PartitionData{}, residual=true}
>>>
>>>
>>> *Only one file contains a row with age = null:*
>>>
>>> scala> iceDf.show()
>>> 19/02/26 13:30:46 WARN scheduler.TaskSetManager: Stage 3 contains a task
>>> of very large size (113 KB). The maximum recommended task size is 100 KB.
>>> +----+-------+--------------------+
>>> | age|   name|             friends|
>>> +----+-------+--------------------+
>>> |  60| Kannan|      [Justin -> 19]|
>>> |  75| Sharon|[Michael -> 30, J...|
>>> |null|Michael|                null|
>>> |  30|   Andy|[Josh -> 10, Bisw...|
>>> |  19| Justin|[Kannan -> 75, Sa...|
>>> +----+-------+--------------------+
>>>
>>>
>>>
>>> *Running a filter on isNull(age) scans both files:*
>>>
>>> val isNullExp = Expressions.isNull("age")
>>> val isNullScan = iceTable.newScan().filter(isNullExp)
>>>
>>> scala> isNullScan.planFiles.asScala.foreach(fl => println(fl))
>>>
>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
>>> partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
>>> partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>>>
>>>
>>>
>>> I would expect only one file to be scanned, as Iceberg should track
>>> nullValueCounts as per Metrics.java [1]. The same issue holds for integer
>>> comparison filters, which also scan too many files.
>>>
>>> When I looked through the code, there is provision for using Parquet
>>> file footer stats to populate Manifest Metrics [3] but this is never used
>>> as Iceberg currently only allows AvroFileAppender for creating manifest
>>> files.
>>>
>>> What's the plan for using Parquet footer stats in manifests? They could
>>> be very useful during split generation. I saw some discussion of this
>>> in the Iceberg Spec document [4] but couldn't tell whether any of it is
>>> actually implemented yet.
>>>
>>> I can work on a proposal PR for adding these, but wanted to know the
>>> current thinking around this first.
>>>
>>>
>>> *Gist for the above example:*
>>> https://gist.github.com/prodeezy/fe1b447c78c0bc9dc3be66272341d1a7
>>>
>>>
>>> Looking forward to your feedback,
>>>
>>> Cheers,
>>> -Gautam.
>>>
>>>
>>>
>>>
>>>
>>> [1] -
>>> https://github.com/apache/incubator-iceberg/blob/master/api/src/main/java/com/netflix/iceberg/Metrics.java
>>> [2] -
>>> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java#L56
>>> [3] -
>>> https://github.com/apache/incubator-iceberg/blob/1bec13a954c29f8cd09719a0362c0b2829635c77/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java#L118
>>> [4] -
>>> https://docs.google.com/document/d/1Q-zL5lSCle6NEEdyfiYsXYzX_Q8Qf0ctMyGBKslOswA/edit#
>>>
>>>
>>
>
