My current prototype does require a small Spark changeset. As I mentioned above, I would probably want to modify this so that the driver does not read all footers when the summary file is entirely absent; any files not covered by the summary could instead fall back to executor-side pruning and uniform split planning, roughly as sketched below.
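To make that fall-back concrete, here is a minimal sketch of the split planning I have in mind, assuming the scope and imports of createNonBucketedReadRDD from the diff. readSummaryFooters, planRowGroupSplits and planUniformSplits are hypothetical helpers used only for illustration; they do not exist in Spark or in the diff.

    // Sketch only. readSummaryFooters returns an empty map when no summary file
    // is present; planRowGroupSplits and planUniformSplits are placeholders for
    // the two planning paths and would each return a Seq[PartitionedFile].
    val summaryFooters: Map[Path, ParquetMetadata] =
      readSummaryFooters(hadoopConf, selectedPartitions)

    val splits = selectedPartitions.flatMap { partition =>
      partition.files.flatMap { file =>
        summaryFooters.get(file.getPath) match {
          case Some(footer) =>
            // Footer available from the summary: prune row groups on the driver
            // and emit one PartitionedFile per surviving row group.
            planRowGroupSplits(partition, file, footer, pushedDownFilters)
          case None =>
            // No summary entry for this file: keep today's behaviour and plan
            // uniform splits, leaving row group pruning to the executors.
            planUniformSplits(partition, file, maxSplitBytes)
        }
      }
    }

With that caveat, here is the diff: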
---
 .../sql/execution/DataSourceScanExec.scala    | 56 ++++++++++++++++---
 .../parquet/ParquetFileFormat.scala           | 13 +++++
 .../datasources/parquet/ParquetFilters.scala  |  2 +-
 3 files changed, 62 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 7dc9faeac7..9400dc976c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -17,10 +17,16 @@
 package org.apache.spark.sql.execution
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
+import org.apache.parquet.filter2.compat.{FilterCompat, RowGroupFilter}
+import org.apache.parquet.hadoop.{Footer, ParquetFileReader}
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
+import org.apache.parquet.schema.MessageType
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
@@ -321,6 +327,7 @@ case class FileSourceScanExec(
   private lazy val inputRDD: RDD[InternalRow] = {
     // Update metrics for taking effect in both code generation node and normal node.
     updateDriverMetrics()
+    val hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -329,13 +336,13 @@ case class FileSourceScanExec(
         requiredSchema = requiredSchema,
         filters = pushedDownFilters,
         options = relation.options,
-        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
+        hadoopConf = hadoopConf)
 
     relation.bucketSpec match {
       case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
         createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
       case _ =>
-        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
+        createNonBucketedReadRDD(readFile, hadoopConf, selectedPartitions, relation)
     }
   }
@@ -453,6 +460,7 @@ case class FileSourceScanExec(
    */
   private def createNonBucketedReadRDD(
       readFile: (PartitionedFile) => Iterator[InternalRow],
+      hadoopConf: Configuration,
       selectedPartitions: Array[PartitionDirectory],
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes =
@@ -466,17 +474,49 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
+    val footersMap: Map[Path, ParquetMetadata] =
+      if (fsRelation.fileFormat.isInstanceOf[ParquetSource] && pushedDownFilters.nonEmpty) {
+        // pushedDownFilters
+        val splitFiles: java.util.Collection[FileStatus] = selectedPartitions.flatMap { partition =>
+          partition.files }.toSeq.asJavaCollection
+        val footers: java.util.List[Footer] =
+          ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
+            hadoopConf, splitFiles, false)
+        footers.asScala.map(footer => footer.getFile -> footer.getParquetMetadata).toMap
+      } else {
+        Map()
+      }
     val splitFiles = selectedPartitions.flatMap { partition =>
       partition.files.flatMap { file =>
         val blockLocations = getBlockLocations(file)
         if (fsRelation.fileFormat.isSplitable(
            fsRelation.sparkSession, fsRelation.options, file.getPath)) {
-          (0L until file.getLen by maxSplitBytes).map { offset =>
-            val remaining = file.getLen - offset
-            val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
-            val hosts = getBlockHosts(blockLocations, offset, size)
-            PartitionedFile(
-              partition.values, file.getPath.toUri.toString, offset, size, hosts)
+          if (fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
+              pushedDownFilters.nonEmpty) {
+            val footer = footersMap(file.getPath)
+            val fileSchema: MessageType = footer.getFileMetaData().getSchema();
+            val filter =
+              ParquetSource.makeParquetFilters(fsRelation.sparkSession.sessionState.conf)
+                .createFilter(fileSchema, pushedDownFilters(1) /* TODO and these together */).get
+
+            val blocks = RowGroupFilter.filterRowGroups(
+              FilterCompat.get(filter), footer.getBlocks(), fileSchema)
+            (blocks.asScala
+              .map(block => block.getColumns.asScala.head.getStartingPos)).map { offset =>
+              val remaining = file.getLen - offset
+              val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
+              val hosts = getBlockHosts(blockLocations, offset, size)
+              PartitionedFile(
+                partition.values, file.getPath.toUri.toString, offset, size, hosts)
+            }
+          } else {
+            (0L until file.getLen by maxSplitBytes).map { offset =>
+              val remaining = file.getLen - offset
+              val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
+              val hosts = getBlockHosts(blockLocations, offset, size)
+              PartitionedFile(
+                partition.values, file.getPath.toUri.toString, offset, size, hosts)
+            }
+          }
         } else {
           val hosts = getBlockHosts(blockLocations, 0, file.getLen)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 16cd570901..247a4b05f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -473,6 +473,19 @@ class ParquetFileFormat
 }
 
 object ParquetFileFormat extends Logging {
+
+  def makeParquetFilters(sqlConf: SQLConf): ParquetFilters = {
+    val pushDownDate = sqlConf.parquetFilterPushDownDate
+    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+    val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
+    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+
+    new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
+      pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+  }
+
   private[parquet] def readSchema(
       footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 7e420d36f4..8a44331629 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -40,7 +40,7 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * Some utility function to convert Spark data source filters to Parquet filters.
  */
-private[parquet] class ParquetFilters(
+class ParquetFilters(
     pushDownDate: Boolean,
     pushDownTimestamp: Boolean,
     pushDownDecimal: Boolean,
-- 
2.23.0

On Wed, Sep 30, 2020, 5:29 PM Ryan Blue <rb...@netflix.com> wrote:

> I agree that using column-level metrics is a great way to avoid doing extra work. I didn't think that Parquet used column metrics from the metadata file or directly from footers for job planning though?
>
> That's another thing that we've built into the newer formats. Iceberg will prune unnecessary data files using lower/upper bounds, null counts, etc.
>
> On Wed, Sep 30, 2020 at 3:19 PM Jason Altekruse <altekruseja...@gmail.com> wrote:
>
>> I'm not skipping row group metadata, I am trying to accomplish driver side pruning, I need to read row group information to get the column stats. We are managing the schema elsewhere.
>>
>> On engines replacing files, this is why I had proposed possibly adding length and/or last modified time to the summary files.
>>
>> I do understand that there are useful benefits to pushing as much as possible to the executors, and I agree with this change, but it has side-effects. Now for a table with thousands of files spark spins up thousands of tasks, if most of these can just be pruned out based on metadata, this can happen much faster with a consolidated list of all of the metadata rather than having all of the coordination overhead of the small tasks.
>>
>> I actually would like to have a design that would do the "fall-back" using the driver side pruning and uniform split planning for any footers missing from the summary file, but I thought that might add extra complexity to the discussion.
>>
>> Jason Altekruse
>>
>> On Wed, Sep 30, 2020 at 2:53 PM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> I went back and looked at the code a bit. Looks like the deprecation also had to do with MR job split planning.
>>>
>>> The main reason summary files were useful (I think) was that split planning was done using the footers. This was extremely slow when reading individual files, even when parallelizing on the driver. But, there were correctness issues introduced by the metadata files because engines could replace the data files easily because they used the same names, like `part-r-00000`. To avoid the planning time, we switched to using FileSplit based on uniform split sizes, just like other formats. That removed the need for metadata files for split planning, and I think that's when we deprecated them.
>>>
>>> When not reading the row group information from the metadata file, it is assumed that all of the other metadata is the same <https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L180-L184> because the metadata is used for all of the requested files. This means that for the use case of tracking file schemas, Parquet would do the wrong thing.
>>>
>>> So the metadata files don't work quite right for table metadata and are no longer used or recommended for planning splits. Seems reasonable to deprecate them to me.
>>>
>>> For the table-level metadata, like schema, I would highly recommend tracking that centrally in a table abstraction. If you're using standard Parquet tables in Spark, there are severe limitations to what you can do safely.
>>> What's worse, there is no validation for a path-based table that you're not producing data with an incompatible schema or making an incompatible change. So you can write two different types to the same column name or break your table by renaming a column. This is one of the reasons why we wanted to introduce better table metadata, so tables can behave like real SQL tables.
>>>
>>> rb
>>>
>>> On Wed, Sep 30, 2020 at 9:38 AM Jason Altekruse <altekruseja...@gmail.com> wrote:
>>>
>>>> Hey Ryan,
>>>>
>>>> Thanks for the response, I do not want to push the parquet community to keep around features that will cause users headaches, but I am still looking for the best solution to the problem I am facing.
>>>>
>>>> One thing I could use some clarity on, given what I have seen in various tools, I am actually not sure that there is a significant risk of wrong results with one possible small recommendation in relation to these files.
>>>>
>>>> I was not assuming that directories were immutable, and the method included in parquet-mr that I referenced, readAllFootersInParallelUsingSummaryFiles, also goes against the notion that this is a hard requirement. It specifically takes in a file listing, which needs to be provided by some external source, either the FS directly, or some transactional system like delta in my case, and actually uses the metadata summary file just as a cache for footers, with explicit code to fall back to read any footers missing from the summary directly from the FS itself.
>>>>
>>>> It sounds like some projects in the past have used this file to avoid doing an FS listing, which I absolutely agree isn't safe to do and will cause problems when people copy in new files to a directory. Can we just document that this practice is bad? And possibly just deprecate any code that reads the summary file without this kind of fallback and an expectation that callers pass in a list of files they expect to get footers for?
>>>>
>>>> I also don't know if I have ever seen a project take advantage of the fact that you can technically directly append to a parquet file by reading in the previous footer, appending new row groups and writing out a whole new footer with the new metadata combined with the old, leaving dead bytes in the file where the old footer sat. I do remember discussing this possibility with Julien at some point, but I don't know if parquet-mr or any other projects actually have written code to do this. If this is done, it would provide another way for the summary file to become stale, and this would not be detectable with just knowing the filename; the summary would need to contain file length info.
>>>>
>>>> There is also the possibility that parquet files could be deleted and rewritten in the same filenames, but this isn't common in any hadoop/spark ecosystem projects I know of, they all generate unique filenames using application IDs or GUIDs.
>>>>
>>>> Jason Altekruse
>>>>
>>>> On Tue, Sep 29, 2020 at 8:26 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
>>>>
>>>>> I don't remember deprecating it, but I've always recommended against using it because of the assumptions it requires.
>>>>>
>>>>> Those assumptions are routinely violated by processing engines and users that expect to be able to drop files into directories and see the results in their table.
>>>>> Since this was a feature without guard rails or documentation to explain how to safely use it, I think it is a good idea to steer people away from it and deprecate it unless someone wants to address those concerns. Now, I think there are much better alternatives (thanks Jacques!) so I probably wouldn't recommend spending time on bringing this up to date and making it marginally safer.
>>>>>
>>>>> On Tue, Sep 29, 2020 at 11:38 AM Julien Le Dem <julien.le...@gmail.com> wrote:
>>>>>
>>>>> > Hi Jason,
>>>>> > Thank you for bringing this up.
>>>>> > A correctness issue would only come up when more parquet files are added to the same folder or files are modified. Historically folders have been considered immutable and the summary file reflects the metadata for all the files in the folder. The summary file contains the names of the files it is for, so extra files in the folder can also be detected and dealt with at read time without correctness issues.
>>>>> > Like you mentioned, the read path allows for those files to not be present.
>>>>> > I think a better solution than deprecating would be to have a switch allowing turning off those summary files when one wants to be able to not respect the immutable folder contract.
>>>>> > Projects like Iceberg can elect to not produce them and allow modifying and adding more files to the same folder without creating correctness problems.
>>>>> > I would be in favor of removing those Deprecated annotations and documenting the use of a switch to optionally not produce the summary files when electing to modify folders.
>>>>> > I'm curious to hear from Ryan about this, who did the change in the first place.
>>>>> > Best,
>>>>> > Julien
>>>>> >
>>>>> > On Fri, Sep 25, 2020 at 3:06 PM Jason Altekruse <altekruseja...@gmail.com> wrote:
>>>>> >
>>>>> >> Hi Jacques,
>>>>> >>
>>>>> >> It's good to hear from you, thanks for the pointer to Iceberg. I am aware of it as well as other similar projects, including Delta Lake, which my team is already using. Unfortunately even with Delta, there is only a placeholder in the project currently where they will be tracking file level statistics at some point in the future; we are also evaluating the possibility of implementing this in delta itself. While it and Iceberg aren't quite the same architecturally, I think there is enough overlap that it might be a bit awkward to use the two in conjunction with one another.
>>>>> >>
>>>>> >> From my testing so far, it appears that delta pretty easily can operate alongside these older metadata summary files without the two fighting with each other. Delta is responsible for maintaining a transactionally consistent list of files, and this file can coexist in the directory just to allow efficient pruning on the driver side on a best effort basis, as it can gracefully fall back to the FS if it is missing a newer file.
>>>>> >> We are somewhat nervous about depending on something that is marked deprecated, but as it is so close to a "just works" state for our needs, I was hoping to confirm with the community if there were other risks I was missing.
>>>>> >>
>>>>> >> Jason Altekruse
>>>>> >>
>>>>> >> On Wed, Sep 23, 2020 at 6:29 PM Jacques Nadeau <jacq...@apache.org> wrote:
>>>>> >>
>>>>> >> > Hey Jason,
>>>>> >> >
>>>>> >> > I'd suggest you look at Apache Iceberg. It is a much more mature way of handling metadata efficiency issues and provides a substantial superset of functionality over the old metadata cache files.
>>>>> >> >
>>>>> >> > On Wed, Sep 23, 2020 at 4:16 PM Jason Altekruse <altekruseja...@gmail.com> wrote:
>>>>> >> >
>>>>> >> > > Hello again,
>>>>> >> > >
>>>>> >> > > I took a look through the mail archives and found a little more information in this and a few other threads.
>>>>> >> > >
>>>>> >> > > http://mail-archives.apache.org/mod_mbox//parquet-dev/201707.mbox/%3CCAO4re1k8-bZZZWBRuLCnm1V7AoJE1fdunSuBn%2BecRuFGPgcXnA%40mail.gmail.com%3E
>>>>> >> > >
>>>>> >> > > While I do understand the benefits of federating out the reading of footers for the sake of not worrying about synchronization between the cached metadata and any changes to the files on disk, it does appear there is still a use case that isn't solved well with this design: needle-in-a-haystack selective filter queries, where the data is sorted by the filter column. For example, in the tests I ran with queries against lots of parquet files where the vast majority are pruned by a bunch of small tasks, it takes 33 seconds vs just 1-2 seconds with driver side pruning using the summary file (requires a small spark changeset).
>>>>> >> > >
>>>>> >> > > In our use case we are never going to be replacing contents of existing parquet files (with a delete and rewrite on HDFS) or appending new row groups onto existing files. In that case I don't believe we should experience any correctness problems, but I wanted to confirm if there is something I am missing. I am using readAllFootersInParallelUsingSummaryFiles, which does fall back to read individual footers if they are missing from the summary file.
>>>>> >> > >
>>>>> >> > > I am also curious if a solution to the correctness problems could be to include a file length and/or last modified time into the summary file, which could confirm against FS metadata that the files on disk are still in sync with the metadata summary relatively quickly. Would it be possible to consider avoiding this deprecation if I was to work on an update to implement this?
>>>>> >> > > - Jason Altekruse
>>>>> >> > >
>>>>> >> > > On Tue, Sep 15, 2020 at 8:52 PM Jason Altekruse <altekruseja...@gmail.com> wrote:
>>>>> >> > >
>>>>> >> > > > Hello all,
>>>>> >> > > >
>>>>> >> > > > I have been working on optimizing reads in spark to avoid spinning up lots of short lived tasks that just perform row group pruning in selective filter queries.
>>>>> >> > > >
>>>>> >> > > > My high level question is why metadata summary files were marked deprecated in this Parquet changeset? There isn't much explanation given or a description of what should be used instead.
>>>>> >> > > > https://github.com/apache/parquet-mr/pull/429
>>>>> >> > > >
>>>>> >> > > > There are other members of the broader parquet community that are also confused by this deprecation, see this discussion in an arrow PR.
>>>>> >> > > > https://github.com/apache/arrow/pull/4166
>>>>> >> > > >
>>>>> >> > > > In the course of making my small prototype I got an extra performance boost by making spark write out metadata summary files, rather than having to read all footers on the driver. This effect would be even more pronounced on a completely remote storage system like S3. Writing these summary files was disabled by default in SPARK-15719, because of the performance impact of appending a small number of new files to an existing dataset with many files.
>>>>> >> > > >
>>>>> >> > > > https://issues.apache.org/jira/browse/SPARK-15719
>>>>> >> > > >
>>>>> >> > > > This spark JIRA does make decent points considering how spark operates today, but I think that there is a performance optimization opportunity that is missed because the row group pruning is deferred to a bunch of separate short lived tasks rather than done upfront, currently spark only uses footers on the driver for schema merging.
>>>>> >> > > >
>>>>> >> > > > Thanks for the help!
>>>>> >> > > > Jason Altekruse
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>
> --
> Ryan Blue
> Software Engineer
> Netflix