Filed PARQUET-654 for making record-level filtering optional.

Cheng

On 7/7/16 11:57 PM, Ryan Blue wrote:
Sounds like there are two problems. First, PARQUET-389 needs to be fixed. It looks like fixing it would have prevented the issues introduced by the work-around, so fixing the underlying problem is probably the right way forward.

The second issue is being able to filter row groups while skipping the record-level filtering. This should be a really easy fix in the read path, so please open an issue for it. I think you should be able to get this into the 1.9.0 release. Also, this is somewhat related to the vectorized read API we're putting together a hackathon to tackle, so you may want to monitor that effort.

rb

On Thu, Jul 7, 2016 at 7:47 AM, Cheng Lian <[email protected]> wrote:

    One of the commonly seen ETL use cases of Spark is inferring
    schemas automatically from JSON datasets and then converting
    them into Parquet. In such use cases, schema evolution support
    can be crucial: reading Parquet files with different but
    compatible schemata is quite common. Schema evolution combined
    with filter push-down can be a source of bugs, and PARQUET-389
    is an example of this kind of bug. To work around PARQUET-389,
    we made some non-trivial changes in Spark (SPARK-11955), which
    in turn led to SPARK-16371.
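
    To make the interaction concrete, here is a minimal sketch of the
    scenario, assuming two files whose schemas differ only by a
    later-added optional column (the file and column names are made
    up for illustration):

        import org.apache.parquet.filter2.predicate.FilterApi;
        import org.apache.parquet.filter2.predicate.FilterPredicate;

        public class EvolvedSchemaFilterExample {
          public static void main(String[] args) {
            // file_v1.parquet schema: message m { required int64 id; }
            // file_v2.parquet schema: message m { required int64 id; optional boolean flag; }
            //
            // A query filtering on the new column pushes this predicate down to
            // *both* files, including file_v1.parquet where "flag" does not exist:
            FilterPredicate onNewColumn =
                FilterApi.eq(FilterApi.booleanColumn("flag"), true);

            // For file_v1.parquet the outcome depends on null semantics (a missing
            // optional column reads as null, so eq(flag, true) matches nothing),
            // which is the kind of corner case PARQUET-389 deals with.
            System.out.println(onNewColumn);
          }
        }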

    From the performance perspective, I totally agree that row
    group level filtering is valuable. I think the real problem
    here is that record-level filtering is mandatory once the
    engine decides to use filter push-down. For engines with a
    vectorized Parquet reader, like Spark, Parquet's built-in
    record-level filtering is not fast enough. In fact, we observed
    that disabling filter push-down can even result in better
    performance when the data is not prepared for row group level
    filtering, because the filter predicates are then evaluated on
    the Spark side with the help of codegen. One possible
    improvement here is to make record-level filtering optional.
    That way we could benefit from both Parquet's built-in row
    group level filtering and the faster record-level filtering
    provided by upper-level engines. Of course, when record-level
    filtering is disabled, the engines themselves are responsible
    for doing the filtering.
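
    Just to illustrate what this could look like from the engine's
    side, here is a rough sketch assuming a hypothetical read option
    for turning record-level filtering off (the
    "parquet.filter.record-level.enabled" key is only an illustrative
    name, not an existing knob at the time of writing):

        import org.apache.hadoop.conf.Configuration;
        import org.apache.parquet.filter2.predicate.FilterApi;
        import org.apache.parquet.filter2.predicate.FilterPredicate;
        import org.apache.parquet.hadoop.ParquetInputFormat;

        public class PushDownWithoutRecordFilter {
          public static Configuration configure(Configuration conf) {
            // Push the predicate down so that footer stats and dictionaries can
            // still eliminate whole row groups...
            FilterPredicate pred = FilterApi.lt(FilterApi.intColumn("x"), 0);
            ParquetInputFormat.setFilterPredicate(conf, pred);

            // ...but (hypothetically) skip Parquet's record-by-record evaluation
            // and let the engine re-apply the predicate itself, e.g. via codegen.
            conf.setBoolean("parquet.filter.record-level.enabled", false);
            return conf;
          }
        }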

    Cheng



    On 7/7/16 2:43 AM, Ryan Blue wrote:
    Hi Reynold,

    Parquet uses the same predicates that are passed to the reader
    (via withFilter [1]) for both record-level and row group
    filtering. We've found that the main benefit is when they can be
    used to eliminate entire row groups.
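
    For reference, a minimal sketch of handing a single predicate to
    the reader via withFilter (the file path and column name are made
    up for illustration):

        import org.apache.hadoop.fs.Path;
        import org.apache.parquet.example.data.Group;
        import org.apache.parquet.filter2.compat.FilterCompat;
        import org.apache.parquet.filter2.predicate.FilterApi;
        import org.apache.parquet.filter2.predicate.FilterPredicate;
        import org.apache.parquet.hadoop.ParquetReader;
        import org.apache.parquet.hadoop.example.GroupReadSupport;

        public class WithFilterExample {
          public static void main(String[] args) throws Exception {
            // One predicate drives both row group pruning and record-level filtering.
            FilterPredicate pred = FilterApi.lt(FilterApi.intColumn("x"), 0);

            try (ParquetReader<Group> reader =
                     ParquetReader.builder(new GroupReadSupport(), new Path("data.parquet"))
                                  .withFilter(FilterCompat.get(pred))
                                  .build()) {
              for (Group g = reader.read(); g != null; g = reader.read()) {
                System.out.println(g);
              }
            }
          }
        }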

    What bugs have you found? I've not seen problems with the
    filtering done by Parquet, so I'm surprised that you guys have
    seen so many (presumably ones you've traced to Parquet
    push-down?) that it doesn't seem worth it.

    Both record and row group filtering use the same predicates.
    Record filtering evaluates a predicate against each assembled
    record, so it is probably slower than filtering in Spark SQL.
    It is still a win for engines like Pig that don't have
    vectorized reads and would otherwise need additional calls on
    top of the Parquet layer. Also, the 2.0 spec makes it possible
    to filter individual data pages, but this hasn't been
    implemented yet.

    In contrast to record-level, row group filtering is *very*
    valuable when data is correctly prepared. We have datasets where
    row group filtering gets us a 20-100x speedup (measured in Pig,
    Presto, and Spark) because we only need to read 1% of the data.
    This uses column-level stats from the footer and dictionaries to
    eliminate row groups that can't satisfy the query predicate. For
    example, for a column with min=5, max=26 and a predicate x < 0,
    we know that there are no matching values. Similarly, we can look
    at a dictionary and see all of the possible values and eliminate
    a row group if none of them match the predicate. Row group
    filtering works best with the data sorted within partitions by
    common query columns.
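
    As a toy sketch of that stats check (not parquet-mr's actual
    implementation), the example above reduces to:

        public class RowGroupPruneSketch {
          /** True when a row group with stats [min, max] may contain a match for "x < threshold". */
          static boolean mightMatchLessThan(long columnMin, long columnMax, long threshold) {
            return columnMin < threshold; // if even the minimum is >= threshold, skip the group
          }

          public static void main(String[] args) {
            // Column stats min=5, max=26 with predicate x < 0: no possible match.
            System.out.println(mightMatchLessThan(5, 26, 0)); // false -> row group eliminated
          }
        }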

    rb

    [1]: https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java#L190

    On Wed, Jul 6, 2016 at 11:13 AM, Reynold Xin <[email protected]> wrote:

        Among the people working on Spark there is a lot of confusion
        about what Parquet's filter pushdown actually accomplishes.
        Depending on who I talk to, I get "it filters rows one by one"
        or "it skips blocks via min/max value tracking". Can I get a
        more official response on this?

        The reason I'm asking is that we have seen so many bugs
        related to filter pushdown (either bugs in Parquet, or bugs
        in Spark's implementation of it) that we are considering just
        permanently disabling filter pushdown if the performance gain
        is not enormous.

        Let me know. Thanks.




    --
    Ryan Blue
    Software Engineer
    Netflix




--
Ryan Blue
Software Engineer
Netflix
