Hello Devs,

I am looking into leveraging Iceberg to speed up split generation and to
minimize file scans. My understanding is that Iceberg keeps key statistics,
as listed in Metrics.java [1], viz. column lower/upper bounds, null value
counts, distinct value counts, etc., and that table scanning leverages these
to skip partitions, files and row groups (in the Parquet context).
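
For concreteness, this is the kind of per-file metadata I'd expect to be able
to inspect on each scan task (a rough sketch; the accessor names are from the
DataFile interface as I understand it, and iceTable is the test table from the
gist linked at the bottom):

// Sketch: dump the column-level stats Iceberg keeps for each data file.
// All the maps below are keyed by field id.
import scala.collection.JavaConverters._

iceTable.newScan().planFiles().asScala.foreach { task =>
  val f = task.file()
  println(f.path())
  println("  recordCount     = " + f.recordCount())
  println("  nullValueCounts = " + f.nullValueCounts())
  println("  valueCounts     = " + f.valueCounts())
  println("  lowerBounds     = " + f.lowerBounds())
  println("  upperBounds     = " + f.upperBounds())
}

(As described next, in practice only recordCount comes back populated.)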

What I found is that files aren't skipped when a predicate applies only to a
subset of the table's files. Within a partition, a scan reads all files,
because manifests only record per-file record counts; the rest of the metrics
(lower/upper bounds, distinct value counts, null value counts) are null or
empty. This is because AvroFileAppender only reports record counts as metrics
[2], and it is currently the only appender supported for writing manifest
files.
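
Paraphrasing [2] from memory (so treat the exact constructor shape as an
assumption on my part), the metrics the manifest writer ends up with amount to
a Metrics object carrying nothing but the row count:

// Sketch of the metrics AvroFileAppender reports, per [2]: only the record
// count is set; column sizes, value counts and null value counts stay null.
import com.netflix.iceberg.Metrics

val manifestMetrics = new Metrics(java.lang.Long.valueOf(100L), null, null, null)
println(manifestMetrics.recordCount())      // 100
println(manifestMetrics.nullValueCounts())  // null -> nothing to prune on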


*Example:*

In the following example, iceTable was generated by iteratively adding two
files, so it has two separate Parquet files under it:

scala> iceTable.newScan().planFiles.asScala.foreach(fl => println(fl))

BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
partition_data=PartitionData{}, residual=true}
BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
partition_data=PartitionData{}, residual=true}


*Only one file contains a row with age = null:*

scala> iceDf.show()
19/02/26 13:30:46 WARN scheduler.TaskSetManager: Stage 3 contains a task of
very large size (113 KB). The maximum recommended task size is 100 KB.
+----+-------+--------------------+
| age|   name|             friends|
+----+-------+--------------------+
|  60| Kannan|      [Justin -> 19]|
|  75| Sharon|[Michael -> 30, J...|
|null|Michael|                null|
|  30|   Andy|[Josh -> 10, Bisw...|
|  19| Justin|[Kannan -> 75, Sa...|
+----+-------+--------------------+



*Running a filter on isNull("age") scans both files:*

val isNullExp = Expressions.isNull("age")
val isNullScan = iceTable.newScan().filter(isNullExp)

scala> isNullScan.planFiles.asScala.foreach(fl => println(fl))

BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
partition_data=PartitionData{}, residual=is_null(ref(name="age"))}



I would expect only one file to be planned here, since Iceberg is supposed to
track nullValueCounts per Metrics.java [1]. The same issue holds for integer
comparison filters, which also end up scanning more files than necessary.
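
For instance, a range predicate like the following (hypothetical filter on the
same table) still plans both files, even though per-file lower/upper bounds on
"age" could be used to prune any file whose range doesn't overlap the
predicate:

// Sketch: an integer comparison filter on the same table.
// Package is com.netflix.iceberg on current incubator-iceberg master.
import com.netflix.iceberg.expressions.Expressions
import scala.collection.JavaConverters._

val gtExp  = Expressions.greaterThan("age", 50)
val gtScan = iceTable.newScan().filter(gtExp)
gtScan.planFiles().asScala.foreach(println)  // currently lists every data file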

When I looked through the code, there is provision for using Parquet file
footer stats to populate manifest Metrics [3], but this is never used, since
Iceberg currently only allows AvroFileAppender for creating manifest files.
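
For what it's worth, the stats the manifests would need are already sitting in
the Parquet footers. A rough sketch with plain parquet-mr (path copied from the
example above, purely for illustration) reads them back out:

// Sketch: read column statistics straight from a Parquet footer.
// In practice these are the values ParquetWriter already collects at write
// time [3]; the ask is to carry them through into the manifest entries.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import scala.collection.JavaConverters._

val inputFile = HadoopInputFile.fromPath(
  new Path("file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet"),
  new Configuration())

val reader = ParquetFileReader.open(inputFile)
try {
  for (block <- reader.getFooter.getBlocks.asScala;
       col   <- block.getColumns.asScala) {
    val stats = col.getStatistics
    if (stats != null) {
      println(col.getPath + ": nulls=" + stats.getNumNulls +
              ", min=" + stats.minAsString + ", max=" + stats.maxAsString)
    }
  }
} finally {
  reader.close()
}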

What's the plan around using Parquet footer stats in manifests? They could be
very useful during split generation. I saw some discussion around this in the
Iceberg spec document [4], but couldn't glean whether any of it has actually
been implemented yet.

I can work on a proposal PR for adding these, but wanted to know the current
thinking around this first.


*Gist for the above example*:
https://gist.github.com/prodeezy/fe1b447c78c0bc9dc3be66272341d1a7


Looking forward to your feedback,

Cheers,
-Gautam.





[1] - https://github.com/apache/incubator-iceberg/blob/master/api/src/main/java/com/netflix/iceberg/Metrics.java
[2] - https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java#L56
[3] - https://github.com/apache/incubator-iceberg/blob/1bec13a954c29f8cd09719a0362c0b2829635c77/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java#L118
[4] - https://docs.google.com/document/d/1Q-zL5lSCle6NEEdyfiYsXYzX_Q8Qf0ctMyGBKslOswA/edit#
