Filed PARQUET-654 for making record-level filtering optional.
Cheng
On 7/7/16 11:57 PM, Ryan Blue wrote:
Sounds like there are two problems. First, PARQUET-389 needs to be
fixed. It looks like fixing it would have prevented the issues
introduced in the work-around, so it is probably a good idea to fix
the underlying problem as the way forward.
The second issue is being able to filter row groups, but skip the
record-level filtering. This should be a really easy fix in the read
path, so please open an issue for it. I think you should be able to
get this into the 1.9.0 release. Also, this is somewhat related to the
vectorized read API we're putting together a hackathon to tackle, so
you may want to monitor that effort.
rb
On Thu, Jul 7, 2016 at 7:47 AM, Cheng Lian wrote:
One commonly seen ETL use case of Spark is inferring the
schema automatically from JSON datasets and then converting them into
Parquet. In such use cases, schema evolution support can be
crucial. Reading from Parquet files with different but compatible
schemata is quite common. Schema evolution combined with filter
push-down can be a source of bugs. PARQUET-389 is an example of
this kind of bug. To work around PARQUET-389, we made some
non-trivial changes in Spark (SPARK-11955), which in turn led to
SPARK-16371.
From the perspective of performance, I totally agree that row
group level filtering is valuable. I think the real problem here
is that record-level filtering is mandatory if the engine decides
to use filter push-down. For engines with a vectorized Parquet
reader, like Spark, Parquet's built-in record-level filtering is not
performant enough. In fact, we observed that disabling filter
push-down may even result in better performance when the data is
not prepared for row group level filtering, because the filter
predicates are then evaluated on the Spark side with the help of
codegen. I think one possible improvement we can make here is to make
record-level filtering optional. That way, we may benefit from
both Parquet's built-in row group level filtering and the faster
record-level filtering provided by upper-level engines. Of course,
when record-level filtering is disabled, engines themselves are
responsible for doing the filtering.
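The split proposed above can be sketched roughly as follows. This is a toy Python model, not parquet-mr or Spark code: RowGroup, scan and the record_level_filtering flag are hypothetical names chosen for illustration, and the predicate is fixed to x > lower_bound on a single integer column.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List


@dataclass
class RowGroup:
    """Hypothetical stand-in for a row group: footer stats plus the values."""
    min_value: int
    max_value: int
    values: List[int]


def scan(row_groups: Iterable[RowGroup], lower_bound: int,
         record_level_filtering: bool = True) -> Iterator[int]:
    """Yield values for the predicate x > lower_bound.

    Row groups are always pruned using min/max stats. Per-record
    evaluation happens here only when record_level_filtering is True;
    otherwise the caller must filter the returned values itself.
    """
    for rg in row_groups:
        # Row group level: skip the group if no value can satisfy x > lower_bound.
        if rg.max_value <= lower_bound:
            continue
        for v in rg.values:
            # Record level: optional in this sketch.
            if record_level_filtering and not (v > lower_bound):
                continue
            yield v


groups = [RowGroup(1, 5, [1, 3, 5]), RowGroup(4, 20, [4, 12, 20])]

# Reader does both levels of filtering.
filtered_by_reader = list(scan(groups, 10, record_level_filtering=True))

# Reader only prunes row groups; the engine applies the predicate afterwards.
pruned_only = list(scan(groups, 10, record_level_filtering=False))
filtered_by_engine = [v for v in pruned_only if v > 10]
```

With record-level filtering turned off, the first row group is still skipped, but surviving row groups are returned unfiltered, so the engine has to apply the predicate itself to get the same result.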
Cheng
On 7/7/16 2:43 AM, Ryan Blue wrote:
Hi Reynold,
Parquet uses the same predicates that are passed to the reader
(via withFilter [1]) for both record-level and row group
filtering. We've found that the main benefit is when they can be
used to eliminate entire row groups.
What bugs have you found? I've not seen problems with the
filtering done by Parquet so I'm surprised that you guys have
seen so many (presumably that you've tracked to Parquet
push-down?) that it doesn't seem worth it.
Both record and row group filtering use the same predicates.
Record filtering evaluates a predicate using an assembled record,
so it is probably slower than filtering in Spark SQL. Record
filtering in Parquet is faster for engines like Pig that don't have
vectorized reads and would need additional calls on top of the
Parquet layer to do the same filtering. Also,
the 2.0 spec makes it possible to filter individual data pages,
but this hasn't been implemented.
In contrast to record-level, row group filtering is *very*
valuable when data is correctly prepared. We have datasets where
row group filtering gets us a 20-100x speedup (measured in Pig,
Presto, and Spark) because we only need to read 1% of the data.
This uses column-level stats from the footer and dictionaries to
eliminate row groups that can't satisfy the query predicate. For
example, for a column with min=5, max=26 and a predicate x < 0,
we know that there are no matching values. Similarly, we can look
at a dictionary and see all of the possible values and eliminate
a row group if none of them match the predicate. Row group
filtering works best with the data sorted within partitions by
common query columns.
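As a rough illustration of the row group elimination described above, here is a minimal Python sketch. It is not parquet-mr code: ColumnChunkStats, may_match_less_than and may_match_equals are hypothetical names, the column is assumed to hold integers, and the dictionary is assumed to be usable only when it lists every value in the column chunk (it is None otherwise).

```python
from dataclasses import dataclass
from typing import FrozenSet, Optional


@dataclass(frozen=True)
class ColumnChunkStats:
    """Hypothetical per-row-group metadata for one integer column."""
    min_value: int
    max_value: int
    # All distinct values in the chunk, or None if no complete dictionary exists.
    dictionary: Optional[FrozenSet[int]] = None


def may_match_less_than(stats: ColumnChunkStats, bound: int) -> bool:
    """Can any value in the row group satisfy x < bound?"""
    # If even the smallest value is >= bound, nothing matches.
    return stats.min_value < bound


def may_match_equals(stats: ColumnChunkStats, value: int) -> bool:
    """Can any value in the row group satisfy x == value?"""
    # First use min/max stats from the footer.
    if value < stats.min_value or value > stats.max_value:
        return False
    # Then, if a complete dictionary is available, check membership.
    if stats.dictionary is not None:
        return value in stats.dictionary
    # Stats alone cannot rule the row group out, so it must be read.
    return True


# The example from the text: min=5, max=26 and the predicate x < 0.
stats = ColumnChunkStats(min_value=5, max_value=26)
skip_row_group = not may_match_less_than(stats, 0)

# Dictionary check: 7 lies within [5, 26] but is not one of the values.
with_dict = ColumnChunkStats(5, 26, frozenset({5, 9, 26}))
skip_by_dictionary = not may_match_equals(with_dict, 7)
```

Both checks are conservative: a True result only means the row group might contain matches and has to be read. This is also why sorting by common query columns helps, since it narrows each row group's min/max range.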
rb
[1]: https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java#L190
On Wed, Jul 6, 2016 at 11:13 AM, Reynold Xin wrote:
Among the people working on Spark there is a lot of confusion about what
Parquet's filter pushdown actually accomplishes. Depending on who I talk
to, I get "it filters rows one by one" or "it skips blocks via min/max
value tracking". Can I get a more official response on this?
The reason I'm asking is that we have seen so many bugs related to filter
pushdown (either bugs in Parquet, or bugs in Spark's implementation of it)
that we are considering just permanently disabling filter pushdown, if the
performance gain is not enormous.
Let me know. Thanks.
--
Ryan Blue
Software Engineer
Netflix