Sounds like there are two problems. First, PARQUET-389 needs to be fixed. It looks like fixing it would have prevented the issues introduced by the work-around, so fixing the underlying problem is probably the right way forward.
The second issue is being able to filter row groups but skip the record-level filtering. This should be a really easy fix in the read path, so please open an issue for it. I think you should be able to get this into the 1.9.0 release. Also, this is somewhat related to the vectorized read API we're putting together a hackathon to tackle, so you may want to monitor that effort.

rb

On Thu, Jul 7, 2016 at 7:47 AM, Cheng Lian <[email protected]> wrote:

> One of the commonly seen ETL use cases of Spark is inferring a schema
> automatically from JSON datasets and then converting them into Parquet.
> In such use cases, schema evolution support can be crucial, and reading
> Parquet files with different but compatible schemata is quite common.
> Schema evolution combined with filter push-down can be a source of bugs;
> PARQUET-389 is an example of this kind of bug. To work around
> PARQUET-389, we made some non-trivial changes in Spark (SPARK-11955),
> which in turn led to SPARK-16371.
>
> From the perspective of performance, I totally agree that row group
> level filtering is valuable. I think the real problem here is that
> record-level filtering is mandatory once the engine decides to use
> filter push-down. For engines with a vectorized Parquet reader, like
> Spark, Parquet's built-in record-level filtering is not performant
> enough. Actually, we observed that disabling filter push-down may even
> result in better performance when the data is not prepared for row group
> level filtering, because the filter predicates are then evaluated on the
> Spark side with the help of codegen. One possible improvement here is to
> make record-level filtering optional. In this way, we may benefit from
> both Parquet's built-in row group level filtering and the faster
> record-level filtering provided by upper-level engines. Of course, when
> record-level filtering is disabled, the engines themselves are
> responsible for doing the filtering.
>
> Cheng
>
>
> On 7/7/16 2:43 AM, Ryan Blue wrote:
>
> Hi Reynold,
>
> Parquet uses the same predicates that are passed to the reader (via
> withFilter [1]) for both record-level and row group filtering. We've
> found that the main benefit comes when they can be used to eliminate
> entire row groups.
>
> What bugs have you found? I've not seen problems with the filtering
> done by Parquet, so I'm surprised that you've seen so many (presumably
> tracked down to Parquet push-down?) that it doesn't seem worth it.
>
> Both record and row group filtering use the same predicates. Record
> filtering evaluates a predicate against an assembled record, so it is
> probably slower than filtering in Spark SQL; it is faster for engines
> like Pig that don't have vectorized reads and would otherwise add extra
> calls on top of the Parquet layer. Also, the 2.0 spec makes it possible
> to filter individual data pages, but this hasn't been implemented yet.
>
> In contrast to record-level filtering, row group filtering is *very*
> valuable when data is correctly prepared. We have datasets where row
> group filtering gets us a 20-100x speedup (measured in Pig, Presto, and
> Spark) because we only need to read 1% of the data. This uses
> column-level stats from the footer and dictionaries to eliminate row
> groups that can't satisfy the query predicate. For example, for a column
> with min=5, max=26 and a predicate x < 0, we know that there are no
> matching values. Similarly, we can look at a dictionary, see all of the
> possible values, and eliminate a row group if none of them match the
> predicate.
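To make the min/max check above concrete, here is a simplified,
self-contained Java sketch of the idea. The class, method, and parameter
names are invented for illustration; parquet-mr's actual check is
implemented as a visitor over the predicate and the column statistics,
not a helper like this.

    // Simplified sketch of stats-based row group elimination for a
    // predicate of the form "x < value". Hypothetical names, not
    // parquet-mr's actual API.
    public class RowGroupPruningSketch {
      // min and max are the column's stats from the row group footer.
      // Only min matters for a "<" predicate; max is shown to mirror
      // the example in the message above.
      static boolean canDropRowGroup(int min, int max, int ltValue) {
        // If even the smallest value in the row group is >= ltValue,
        // no record can satisfy x < ltValue: skip the whole group.
        return min >= ltValue;
      }

      public static void main(String[] args) {
        // The example above: min=5, max=26, predicate x < 0.
        System.out.println(canDropRowGroup(5, 26, 0)); // true: skipped
      }
    }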
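For reference, this is roughly what the withFilter entry point [1] looks
like in use. A minimal sketch: the file path is a placeholder and the
Avro binding is used only for brevity. The single filter built here
drives both the row group elimination described above and the
record-level filtering.

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.filter2.predicate.FilterApi;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.hadoop.ParquetReader;

    public class PushdownReadSketch {
      public static void main(String[] args) throws Exception {
        // The predicate from the example above: x < 0.
        FilterPredicate pred = FilterApi.lt(FilterApi.intColumn("x"), 0);

        // The same predicate is applied twice: first to skip whole row
        // groups using footer stats and dictionaries, then to filter
        // the assembled records of the row groups that survive.
        try (ParquetReader<GenericRecord> reader =
                 AvroParquetReader.<GenericRecord>builder(
                         new Path("/placeholder/path/part-00000.parquet"))
                     .withFilter(FilterCompat.get(pred))
                     .build()) {
          GenericRecord record;
          while ((record = reader.read()) != null) {
            System.out.println(record); // only records with x < 0
          }
        }
      }
    }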
> Row group filtering works best with the data sorted within partitions
> by common query columns.
>
> rb
>
> [1]:
> https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java#L190
>
> On Wed, Jul 6, 2016 at 11:13 AM, Reynold Xin <[email protected]> wrote:
>
>> Among the people working on Spark there is a lot of confusion about
>> what Parquet's filter pushdown actually accomplishes. Depending on who
>> I talk to, I get "it filters rows one by one" or "it skips blocks via
>> min/max value tracking". Can I get a more official response on this?
>>
>> The reason I'm asking is that we have seen so many bugs related to
>> filter pushdown (either bugs in Parquet, or bugs in Spark's
>> implementation of it) that we are considering just permanently
>> disabling filter pushdown, if the performance gain is not enormous.
>>
>> Let me know. Thanks.
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix


--
Ryan Blue
Software Engineer
Netflix
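For anyone who wants to measure the trade-off Reynold describes, Spark
exposes push-down as a session-level switch, spark.sql.parquet.filterPushdown.
A minimal sketch, assuming the Spark 2.0 SparkSession Java API and a
placeholder application name:

    import org.apache.spark.sql.SparkSession;

    public class TogglePushdownSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("pushdown-comparison") // placeholder name
            .getOrCreate();

        // Disable Parquet filter push-down; predicates are then
        // evaluated entirely on the Spark side with codegen, as
        // described in the thread above.
        spark.conf().set("spark.sql.parquet.filterPushdown", "false");

        // ... run the same query with the flag on and off, compare ...

        spark.stop();
      }
    }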
