One commonly seen ETL use case of Spark is inferring a schema automatically from JSON datasets and then converting them to Parquet. In such use cases, schema evolution support can be crucial, and reading Parquet files with different but compatible schemata is quite common. Schema evolution combined with filter push-down can be a source of bugs, and PARQUET-389 is one example. To work around PARQUET-389, we made some non-trivial changes in Spark (SPARK-11955), which in turn led to SPARK-16371.
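
To make the general failure mode concrete, here is a minimal sketch, not a reproduction of PARQUET-389 itself. It assumes a predicate on a column that exists in the merged schema but is absent from an older file. The names `naive_may_match`, `safe_may_match`, and the `(min, max)` stats dictionaries are hypothetical and are not Parquet or Spark APIs.

```python
from typing import Dict, Tuple

# Hypothetical per-file stats: column name -> (min, max).
Stats = Dict[str, Tuple[int, int]]


def naive_may_match(stats: Stats, column: str, value: int) -> bool:
    """Assumes every file contains the column; raises KeyError otherwise."""
    lo, hi = stats[column]
    return lo <= value <= hi


def safe_may_match(stats: Stats, column: str, value: int) -> bool:
    """Treats a column missing from this file as all-null."""
    if column not in stats:
        # Every value reads as null here, so `column == value` cannot match.
        return False
    lo, hi = stats[column]
    return lo <= value <= hi


old_file: Stats = {"id": (1, 100)}                   # written before "score" existed
new_file: Stats = {"id": (101, 200), "score": (0, 50)}

print(safe_may_match(old_file, "score", 10))
print(safe_may_match(new_file, "score", 10))
```

The naive version fails on `old_file` because the pushed-down predicate names a column that the file's own schema does not have.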

From a performance perspective, I totally agree that row group level filtering is valuable. I think the real problem here is that record-level filtering is mandatory once the engine decides to use filter push-down. For engines with a vectorized Parquet reader, like Spark, Parquet's built-in record-level filtering is not performant enough. In fact, we observed that disabling filter push-down may even result in better performance when the data is not prepared for row group level filtering, because the filter predicates are then evaluated on the Spark side with the help of codegen. I think one possible improvement we can make here is to make record-level filtering optional. That way, we may benefit from both Parquet's built-in row group level filtering and the faster record-level filtering provided by upper-level engines. Of course, when record-level filtering is disabled, the engines themselves are responsible for doing the filtering.
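
A rough sketch of what I mean, in plain Python rather than any real Parquet or Spark API. The `scan` function, its `record_level_filtering` flag, and the dictionary layout of a row group are all hypothetical, chosen only to illustrate the proposal.

```python
from typing import Dict, Iterator, List

Row = Dict[str, int]


def scan(row_groups: List[dict], column: str, lower_bound: int,
         record_level_filtering: bool = True) -> Iterator[Row]:
    """Yield rows for the predicate `row[column] > lower_bound`."""
    for group in row_groups:
        # Row group level filtering: always applied, uses only the max stat.
        if group["max"][column] <= lower_bound:
            continue
        for row in group["rows"]:
            # Record-level filtering: optional in this sketch.
            if record_level_filtering and not row[column] > lower_bound:
                continue
            yield row


row_groups = [
    {"max": {"x": 9}, "rows": [{"x": 3}, {"x": 9}]},
    {"max": {"x": 40}, "rows": [{"x": 7}, {"x": 40}]},
]

# Reader does everything (today's behavior in this sketch).
filtered_by_reader = list(scan(row_groups, "x", 10))

# Reader only prunes row groups; the engine applies the predicate itself.
pruned_only = list(scan(row_groups, "x", 10, record_level_filtering=False))
filtered_by_engine = [row for row in pruned_only if row["x"] > 10]

print(filtered_by_reader == filtered_by_engine)
```

With the flag off, the first row group is still skipped entirely, but the surviving rows reach the engine unfiltered, so the engine must evaluate the predicate on them.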

Cheng



On 7/7/16 2:43 AM, Ryan Blue wrote:
Hi Reynold,

Parquet uses the same predicates that are passed to the reader (via withFilter [1]) for both record-level and row group filtering. We've found that the main benefit is when they can be used to eliminate entire row groups.

What bugs have you found? I've not seen problems with the filtering done by Parquet so I'm surprised that you guys have seen so many (presumably that you've tracked to Parquet push-down?) that it doesn't seem worth it.

Both record and row group filtering use the same predicates. Record filtering evaluates a predicate against an assembled record, so it is probably slower than filtering in Spark SQL. It is faster, however, for engines like Pig that don't have vectorized reads and would otherwise need additional calls on top of the Parquet layer. Also, the 2.0 spec makes it possible to filter individual data pages, but this hasn't been implemented.

In contrast to record-level, row group filtering is *very* valuable when data is correctly prepared. We have datasets where row group filtering gets us a 20-100x speedup (measured in Pig, Presto, and Spark) because we only need to read 1% of the data. This uses column-level stats from the footer and dictionaries to eliminate row groups that can't satisfy the query predicate. For example, for a column with min=5, max=26 and a predicate x < 0, we know that there are no matching values. Similarly, we can look at a dictionary and see all of the possible values and eliminate a row group if none of them match the predicate. Row group filtering works best with the data sorted within partitions by common query columns.
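
The two checks described above can be sketched in a few lines of plain Python. This is only an illustration under simplifying assumptions (integer columns, no nulls), and `RowGroupStats`, `may_contain_less_than`, and `may_contain_equal` are hypothetical names, not parquet-mr classes or methods.

```python
from dataclasses import dataclass
from typing import FrozenSet, Optional


@dataclass(frozen=True)
class RowGroupStats:
    """Hypothetical stand-in for one column's metadata in one row group."""
    min_value: int
    max_value: int
    # None when the column chunk is not fully dictionary-encoded.
    dictionary: Optional[FrozenSet[int]] = None


def may_contain_less_than(stats: RowGroupStats, bound: int) -> bool:
    """False only when no value in the row group can satisfy x < bound."""
    return stats.min_value < bound


def may_contain_equal(stats: RowGroupStats, value: int) -> bool:
    """False only when no value in the row group can satisfy x == value."""
    if value < stats.min_value or value > stats.max_value:
        return False
    if stats.dictionary is not None:
        # The dictionary lists every distinct value in the row group.
        return value in stats.dictionary
    return True


stats = RowGroupStats(min_value=5, max_value=26, dictionary=frozenset({5, 9, 26}))

print(may_contain_less_than(stats, 0))  # False: min=5, so x < 0 cannot match
print(may_contain_equal(stats, 9))      # True: 9 is in the dictionary
print(may_contain_equal(stats, 10))     # False: in [5, 26] but not in the dictionary
```

A result of `False` means the whole row group can be skipped; `True` only means it has to be read, not that a match is guaranteed.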

rb

[1]: https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java#L190

On Wed, Jul 6, 2016 at 11:13 AM, Reynold Xin wrote:

    Among the people working on Spark there is a lot of confusion about
    what Parquet's filter pushdown actually accomplishes. Depending on who
    I talk to, I get "it filters rows one by one" or "it skips blocks via
    min/max value tracking". Can I get a more official response on this?

    The reason I'm asking is that we have seen so many bugs related to
    filter pushdown (either bugs in Parquet, or bugs in Spark's
    implementation of it) that we are considering just permanently
    disabling filter pushdown, if the performance gain is not enormous.

    Let me know. Thanks.




--
Ryan Blue
Software Engineer
Netflix
