One of Spark's commonly seen ETL use cases is inferring the schema of JSON datasets
automatically and then converting them into Parquet. In use cases like this, schema
evolution support can be crucial: reading Parquet files with different but compatible
schemata is quite common. Schema evolution combined with filter push-down can be a
source of bugs; PARQUET-389 is an example of this kind of bug. To work around
PARQUET-389, we made some non-trivial changes in Spark (SPARK-11955),
which in turn led to SPARK-16371.
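For illustration, a minimal sketch of this kind of pipeline (paths are hypothetical; assumes Spark 2.x's SparkSession):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("json-to-parquet").getOrCreate()

    // Schema is inferred automatically from the JSON records.
    val events = spark.read.json("hdfs:///data/events/2016-07-06.json")

    // Repeated runs may append files with different but compatible schemata
    // as the upstream JSON evolves.
    events.write.mode("append").parquet("hdfs:///warehouse/events")

    // Reading the combined dataset back relies on Parquet schema merging.
    val merged = spark.read.option("mergeSchema", "true").parquet("hdfs:///warehouse/events")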
From the perspective of performance, I totally agree that row group
level filtering is valuable. I think the real problem here is that
record-level filtering is mandatory if the engine decides to use filter
push-down. For engines with a vectorized Parquet reader, like Spark,
Parquet's built-in record-level filtering is not performant enough.
In fact, we observed that disabling filter push-down can even result in
better performance when the data is not prepared for row group level
filtering, because the filter predicates are evaluated on the Spark side
with the help of codegen. One possible improvement here is to make
record-level filtering optional. That way, we could benefit from both
Parquet's built-in row group level filtering and the faster record-level
filtering provided by upper-level engines. Of course, when record-level
filtering is disabled, the engines themselves are responsible for doing
the filtering.
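To illustrate the trade-off (hypothetical path, assuming the SparkSession sketched above), push-down can be turned off per session, leaving the predicate entirely to Spark's codegen:

    import org.apache.spark.sql.functions.col

    // With push-down disabled, no predicate reaches parquet-mr; Spark's
    // generated code evaluates it against the decoded data instead.
    spark.conf.set("spark.sql.parquet.filterPushdown", "false")

    val df = spark.read.parquet("hdfs:///warehouse/events")
    df.filter(col("x") < 0).count()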
Cheng
On 7/7/16 2:43 AM, Ryan Blue wrote:
Hi Reynold,
Parquet uses the same predicates that are passed to the reader (via
withFilter [1]) for both record-level and row group filtering. We've
found that the main benefit is when they can be used to eliminate
entire row groups.
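Roughly, a predicate built with FilterApi is handed to the reader via withFilter, and that same object drives both levels of filtering. A Scala sketch against the example GroupReadSupport (the path is made up):

    import org.apache.hadoop.fs.Path
    import org.apache.parquet.filter2.compat.FilterCompat
    import org.apache.parquet.filter2.predicate.FilterApi
    import org.apache.parquet.hadoop.ParquetReader
    import org.apache.parquet.hadoop.example.GroupReadSupport

    // Predicate on an INT32 column "x": used to drop whole row groups via
    // stats/dictionaries and to filter individual records during assembly.
    val predicate = FilterApi.lt(FilterApi.intColumn("x"), java.lang.Integer.valueOf(0))

    val reader = ParquetReader
      .builder(new GroupReadSupport(), new Path("hdfs:///warehouse/events/part-00000.parquet"))
      .withFilter(FilterCompat.get(predicate))
      .build()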
What bugs have you found? I've not seen problems with the filtering
done by Parquet so I'm surprised that you guys have seen so many
(presumably that you've tracked to Parquet push-down?) that it doesn't
seem worth it.
Both record and row group filtering use the same predicates. Record
filtering evaluates a predicate using an assembled record, so it is
probably slower than filtering in Spark SQL. This is faster for
engines like Pig that don't have vectorized reads and would have
additional calls on top of the Parquet layer. Also, the 2.0 spec makes
it possible to filter individual data pages, but this hasn't been
implemented.
In contrast to record-level, row group filtering is *very* valuable
when data is correctly prepared. We have datasets where row group
filtering gets us a 20-100x speedup (measured in Pig, Presto, and
Spark) because we only need to read 1% of the data. This uses
column-level stats from the footer and dictionaries to eliminate row
groups that can't satisfy the query predicate. For example, for a
column with min=5, max=26 and a predicate x < 0, we know that there
are no matching values. Similarly, we can look at a dictionary and see
all of the possible values and eliminate a row group if none of them
match the predicate. Row group filtering works best with the data
sorted within partitions by common query columns.
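A rough sketch of that preparation step in Spark (column and path names are illustrative): clustering rows by the commonly queried column before writing keeps each row group's min/max range narrow enough to be skipped.

    import org.apache.spark.sql.functions.col

    // Cluster rows by the query column so each row group covers a narrow
    // min/max range and can be eliminated from the footer stats alone.
    spark.read.parquet("hdfs:///warehouse/events")
      .repartition(col("x"))
      .sortWithinPartitions(col("x"))
      .write
      .mode("overwrite")
      .parquet("hdfs:///warehouse/events_sorted")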
rb
[1]:
https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java#L190
On Wed, Jul 6, 2016 at 11:13 AM, Reynold Xin <[email protected]> wrote:
Among the people working on Spark there is a lot of confusion about what
Parquet's filter pushdown actually accomplishes. Depending on who I talk
to, I get "it filters rows one by one" or "it skips blocks via min/max
value tracking". Can I get a more official response on this?
The reason I'm asking is that we have seen so many bugs related to filter
pushdown (either bugs in Parquet, or bugs in Spark's implementation of it)
that we are considering just permanently disabling filter pushdown, if the
performance gain is not enormous.
Let me know. Thanks.
--
Ryan Blue
Software Engineer
Netflix