My current prototype does require a small Spark changeset; here is the
diff. As I mentioned above, I probably want to modify it so that it does not
read all footers on the driver when a summary file is totally absent; the
remaining files could fall back to executor-side pruning and uniform split
planning.
---
.../sql/execution/DataSourceScanExec.scala | 56 ++++++++++++++++---
.../parquet/ParquetFileFormat.scala | 13 +++++
.../datasources/parquet/ParquetFilters.scala | 2 +-
3 files changed, 62 insertions(+), 9 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 7dc9faeac7..9400dc976c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -17,10 +17,16 @@
package org.apache.spark.sql.execution
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus,
Path}
+import org.apache.parquet.filter2.compat.{FilterCompat, RowGroupFilter}
+import org.apache.parquet.hadoop.{Footer, ParquetFileReader}
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
+import org.apache.parquet.schema.MessageType
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
@@ -321,6 +327,7 @@ case class FileSourceScanExec(
private lazy val inputRDD: RDD[InternalRow] = {
// Update metrics for taking effect in both code generation node and
normal node.
updateDriverMetrics()
+ val hadoopConf =
relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
@@ -329,13 +336,13 @@ case class FileSourceScanExec(
requiredSchema = requiredSchema,
filters = pushedDownFilters,
options = relation.options,
- hadoopConf =
relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
+ hadoopConf = hadoopConf)
relation.bucketSpec match {
case Some(bucketing) if
relation.sparkSession.sessionState.conf.bucketingEnabled =>
createBucketedReadRDD(bucketing, readFile, selectedPartitions,
relation)
case _ =>
- createNonBucketedReadRDD(readFile, selectedPartitions, relation)
+ createNonBucketedReadRDD(readFile, hadoopConf, selectedPartitions,
relation)
}
}
@@ -453,6 +460,7 @@ case class FileSourceScanExec(
*/
private def createNonBucketedReadRDD(
readFile: (PartitionedFile) => Iterator[InternalRow],
+ hadoopConf: Configuration,
selectedPartitions: Array[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
val defaultMaxSplitBytes =
@@ -466,17 +474,49 @@ case class FileSourceScanExec(
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes
bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")
+ val footersMap: Map[Path, ParquetMetadata] =
+ if (fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
pushedDownFilters.nonEmpty) {
+ // pushedDownFilters
+ val splitFiles: java.util.Collection[FileStatus] =
selectedPartitions.flatMap { partition =>
+ partition.files }.toSeq.asJavaCollection
+ val footers: java.util.List[Footer] =
+ ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
+ hadoopConf, splitFiles, false)
+ footers.asScala.map(footer => footer.getFile ->
footer.getParquetMetadata).toMap
+ } else {
+ Map()
+ }
val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
val blockLocations = getBlockLocations(file)
if (fsRelation.fileFormat.isSplitable(
fsRelation.sparkSession, fsRelation.options, file.getPath)) {
- (0L until file.getLen by maxSplitBytes).map { offset =>
- val remaining = file.getLen - offset
- val size = if (remaining > maxSplitBytes) maxSplitBytes else
remaining
- val hosts = getBlockHosts(blockLocations, offset, size)
- PartitionedFile(
- partition.values, file.getPath.toUri.toString, offset, size,
hosts)
+ if (fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
+ pushedDownFilters.nonEmpty) {
+ val footer = footersMap(file.getPath)
+ val fileSchema: MessageType =
footer.getFileMetaData().getSchema();
+ val filter =
+
ParquetSource.makeParquetFilters(fsRelation.sparkSession.sessionState.conf)
+ .createFilter(fileSchema, pushedDownFilters(1) /* TODO and
these together */).get
+
+ val blocks = RowGroupFilter.filterRowGroups(
+ FilterCompat.get(filter), footer.getBlocks(), fileSchema)
+ (blocks.asScala
+ .map(block =>
block.getColumns.asScala.head.getStartingPos)).map { offset =>
+ val remaining = file.getLen - offset
+ val size = if (remaining > maxSplitBytes) maxSplitBytes else
remaining
+ val hosts = getBlockHosts(blockLocations, offset, size)
+ PartitionedFile(
+ partition.values, file.getPath.toUri.toString, offset,
size, hosts)
+ }
+ } else {
+ (0L until file.getLen by maxSplitBytes).map { offset =>
+ val remaining = file.getLen - offset
+ val size = if (remaining > maxSplitBytes) maxSplitBytes else
remaining
+ val hosts = getBlockHosts(blockLocations, offset, size)
+ PartitionedFile(
+ partition.values, file.getPath.toUri.toString, offset,
size, hosts)
+ }
}
} else {
val hosts = getBlockHosts(blockLocations, 0, file.getLen)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 16cd570901..247a4b05f4 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -473,6 +473,19 @@ class ParquetFileFormat
}
object ParquetFileFormat extends Logging {
+
+ def makeParquetFilters(sqlConf: SQLConf): ParquetFilters = {
+ val pushDownDate = sqlConf.parquetFilterPushDownDate
+ val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+ val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+ val pushDownStringStartWith =
sqlConf.parquetFilterPushDownStringStartWith
+ val pushDownInFilterThreshold =
sqlConf.parquetFilterPushDownInFilterThreshold
+ val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+
+ new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
+ pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+ }
+
private[parquet] def readSchema(
footers: Seq[Footer], sparkSession: SparkSession):
Option[StructType] = {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 7e420d36f4..8a44331629 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -40,7 +40,7 @@ import org.apache.spark.unsafe.types.UTF8String
/**
* Some utility function to convert Spark data source filters to Parquet
filters.
*/
-private[parquet] class ParquetFilters(
+class ParquetFilters(
pushDownDate: Boolean,
pushDownTimestamp: Boolean,
pushDownDecimal: Boolean,
--
2.23.0
On Wed, Sep 30, 2020, 5:29 PM Ryan Blue <[email protected]> wrote:
> I agree that using column-level metrics is a great way to avoid doing
> extra work. I didn't think that Parquet used column metrics from the
> metadata file or directly from footers for job planning though?
>
> That's another thing that we've built into the newer formats. Iceberg will
> prune unnecessary data files using lower/upper bounds, null counts, etc.
>
> On Wed, Sep 30, 2020 at 3:19 PM Jason Altekruse <[email protected]>
> wrote:
>
>> I'm not skipping row group metadata, I am trying to accomplish driver
>> side pruning, I need to read row group information to get the column stats.
>> We are managing the schema elsewhere.
>>
>> On engines replacing files, this is why I had proposed possibly adding
>> length and/or last modified time to the summary files.
>>
>> I do understand that there are useful benefits to pushing as much as
>> possible to the executors, and I agree with this change, but it has
>> side-effects. Now for a table with thousands of files spark spins up
>> thousands of tasks, if most of these can just be pruned out based on
>> metadata, this can happen much faster with a consolidated list of all of
>> the metadata rather than having all of the coordination overhead of the
>> small tasks.
>>
>> I actually would like to have a design that would "fall back" from
>> driver side pruning to executor side pruning and uniform split planning
>> for any files whose footers are missing from the summary file, but I
>> thought that might add extra complexity to the discussion.
>>
>> Jason Altekruse
>>
>>
>> On Wed, Sep 30, 2020 at 2:53 PM Ryan Blue <[email protected]> wrote:
>>
>>> I went back and looked at the code a bit. Looks like the deprecation
>>> also had to do with MR job split planning.
>>>
>>> The main reason summary files were useful (I think) was that split
>>> planning was done using the footers. This was extremely slow when reading
>>> individual files, even when parallelizing on the driver. But, there were
>>> correctness issues introduced by the metadata files because engines could
>>> replace the data files easily because they used the same names, like
>>> `part-r-00000`. To avoid the planning time, we switched to using FileSplit
>>> based on uniform split sizes, just like other formats. That removed the
>>> need for metadata files for split planning, and I think that's when we
>>> deprecated them.
>>>
>>> When not reading the row group information from the metadata file, it is
>>> assumed that all of the other metadata is the same
>>> <https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L180-L184>
>>> because the metadata is used for all of the requested files. This means
>>> that for the use case of tracking file schemas, Parquet would do the wrong
>>> thing.
>>>
>>> So the metadata files don't work quite right for table metadata and are
>>> no longer used or recommended for planning splits. Seems reasonable to
>>> deprecate them to me.
>>>
>>> For the table-level metadata, like schema, I would highly recommend
>>> tracking that centrally in a table abstraction. If you're using standard
>>> Parquet tables in Spark, there are severe limitations to what you can do
>>> safely. What's worse, there is no validation for a path-based table that
>>> you're not producing data with an incompatible schema or making an
>>> incompatible change. So you can write two different types to the same
>>> column name or break your table by renaming a column. This is one of the
>>> reasons why we wanted to introduce better table metadata, so tables can
>>> behave like real SQL tables.
>>>
>>> rb
>>>
>>> On Wed, Sep 30, 2020 at 9:38 AM Jason Altekruse <
>>> [email protected]> wrote:
>>>
>>>> Hey Ryan,
>>>>
>>>> Thanks for the response, I do not want to push the parquet community to
>>>> keep around features that will cause users headaches, but I am still
>>>> looking for the best solution to the problem I am facing.
>>>>
>>>> One thing I could use some clarity on, given what I have seen in
>>>> various tools, I am actually not sure that there is a significant risk of
>>>> wrong results with one possible small recommendation in relation to these
>>>> files.
>>>>
>>>> I was not assuming that directories were immutable, and the method
>>>> included in parquet-mr that I referenced,
>>>> readAllFootersInParallelUsingSummaryFiles, also goes against the notion
>>>> that this is a hard requirement. It specifically takes in a file listing,
>>>> which needs to be provided by some external source, either the FS directly,
>>>> or some transactional system like delta in my case and actually uses the
>>>> metadata summary file just as a cache for footers, with explicit code to
>>>> fall back to read any footers missing from the summary directly from the FS
>>>> itself.
>>>>
>>>> It sounds like some projects in the past have used this file to avoid
>>>> doing an FS listing, which I absolutely agree isn't safe to do and will
>>>> cause problems when people copy in new files to a directory. Can we just
>>>> document that this practice is bad? And possibly just deprecate any code
>>>> that reads the summary file without this kind of fallback and an
>>>> expectation that callers pass in a list of files they expect to get footers
>>>> for?
>>>>
>>>> I also don't know if I have ever seen a project take advantage of the
>>>> fact that you can technically directly append to a parquet file by reading
>>>> in the previous footer, appending new row groups and writing out a whole
>>>> new footer with the new metadata combined with the old, leaving dead bytes
>>>> in the file where the old footer sat. I do remember discussing this
>>>> possibility with Julien at some point, but I don't know if parquet-mr or
>>>> any other projects actually have written code to do this. If this is done,
>>>> it would provide another way for the summary file to become stale, and this
>>>> would not be detectable with just knowing the filename, the summary would
>>>> need to contain file length info.
>>>>
>>>> There is also the possibility that parquet files could be deleted and
>>>> rewritten in the same filenames, but this isn't common in any hadoop/spark
>>>> ecosystem projects I know of, they all generate unique filenames using
>>>> application IDs or GUIDs.
>>>>
>>>> Jason Altekruse
>>>>
>>>> On Tue, Sep 29, 2020 at 8:26 PM Ryan Blue <[email protected]>
>>>> wrote:
>>>>
>>>>> I don't remember deprecating it, but I've always recommended against
>>>>> using
>>>>> it because of the assumptions it requires.
>>>>>
>>>>> Those assumptions are routinely violated by processing engines and
>>>>> users
>>>>> that expect to be able to drop files into directories and see the
>>>>> results
>>>>> in their table. Since this was a feature without guard rails or
>>>>> documentation to explain how to safely use it, I think it is a good
>>>>> idea to
>>>>> steer people away from it and deprecate it unless someone wants to
>>>>> address
>>>>> those concerns. Now, I think there are much better alternatives (thanks
>>>>> Jacques!) so I probably wouldn't recommend spending time on bringing
>>>>> this
>>>>> up to date and making it marginally safer.
>>>>>
>>>>> On Tue, Sep 29, 2020 at 11:38 AM Julien Le Dem <[email protected]
>>>>> >
>>>>> wrote:
>>>>>
>>>>> > Hi Jason,
>>>>> > Thank you for bringing this up.
>>>>> > A correctness issue would only come up when more parquet files are
>>>>> > added to the same folder or files are modified. Historically folders
>>>>> have
>>>>> > been considered immutable and the summary file reflects the
>>>>> metadata for
>>>>> > all the files in the folder. The summary file contains the names of
>>>>> the
>>>>> > files it is for, so extra files in the folder can also be detected
>>>>> and
>>>>> > dealt with at read time without correctness issues.
>>>>> > Like you mentioned the read path allows for those files to not be
>>>>> present.
>>>>> > I think a better solution than deprecating would be to have a switch
>>>>> > allowing turning off those summary files when one wants to be able
>>>>> to not
>>>>> > respect the immutable folder contract.
>>>>> > Projects like Iceberg can elect to not produce them and allow
>>>>> modifying
>>>>> > and adding more files to the same folder without creating correctness
>>>>> > problems.
>>>>> > I would be in favor of removing those Deprecated annotations and
>>>>> document
>>>>> > the use of a switch to optionally not produce the summary files when
>>>>> > electing to modify folders.
>>>>> > I'm curious to hear from Ryan about this who did the change in the
>>>>> first
>>>>> > place.
>>>>> > Best,
>>>>> > Julien
>>>>> >
>>>>> > On Fri, Sep 25, 2020 at 3:06 PM Jason Altekruse <
>>>>> [email protected]>
>>>>> > wrote:
>>>>> >
>>>>> >> Hi Jacques,
>>>>> >>
>>>>> >> It's good to hear from you, thanks for the pointer to Iceberg. I am
>>>>> aware
>>>>> >> of it as well as other similar projects, including Delta Lake,
>>>>> which my
>>>>> >> team is already using. Unfortunately even with Delta, there is only
>>>>> a
>>>>> >> placeholder in the project currently where they will be tracking
>>>>> file
>>>>> >> level
>>>>> >> statistics at some point in the future, we are also evaluating the
>>>>> >> possibility of implementing this in delta itself. While it and
>>>>> Iceberg
>>>>> >> aren't quite the same architecturally, I think there is enough
>>>>> overlap
>>>>> >> that
>>>>> >> it might be a bit awkward to use the two in conjunction with one
>>>>> another.
>>>>> >>
>>>>> >> From my testing so far, it appears that delta pretty easily can
>>>>> operate
>>>>> >> alongside these older metadata summary files without the two
>>>>> fighting with
>>>>> >> each other. Delta is responsible for maintaining a transactionally
>>>>> >> consistent list of files, and this file can coexist in the
>>>>> directory just
>>>>> >> to allow efficient pruning on the driver side on a best effort
>>>>> basis, as
>>>>> >> it
>>>>> >> can gracefully fall back to the FS if it is missing a newer file.
>>>>> >>
>>>>> >> We are somewhat nervous about depending on something that is marked
>>>>> >> deprecated, but as it is so close to a "just works" state for our
>>>>> needs, I
>>>>> >> was hoping to confirm with the community if there were other risks
>>>>> I was
>>>>> >> missing.
>>>>> >>
>>>>> >> Jason Altekruse
>>>>> >>
>>>>> >> On Wed, Sep 23, 2020 at 6:29 PM Jacques Nadeau <[email protected]>
>>>>> >> wrote:
>>>>> >>
>>>>> >> > Hey Jason,
>>>>> >> >
>>>>> >> > I'd suggest you look at Apache Iceberg. It is a much more mature
>>>>> way of
>>>>> >> > handling metadata efficiency issues and provides a substantial
>>>>> superset
>>>>> >> of
>>>>> >> > functionality over the old metadata cache files.
>>>>> >> >
>>>>> >> > On Wed, Sep 23, 2020 at 4:16 PM Jason Altekruse <
>>>>> >> [email protected]>
>>>>> >> > wrote:
>>>>> >> >
>>>>> >> > > Hello again,
>>>>> >> > >
>>>>> >> > > I took a look through the mail archives and found a little more
>>>>> >> > information
>>>>> >> > > in this and a few other threads.
>>>>> >> > >
>>>>> >> > >
>>>>> >> > >
>>>>> >> >
>>>>> >>
>>>>> http://mail-archives.apache.org/mod_mbox//parquet-dev/201707.mbox/%3CCAO4re1k8-bZZZWBRuLCnm1V7AoJE1fdunSuBn%2BecRuFGPgcXnA%40mail.gmail.com%3E
>>>>> >> > >
>>>>> >> > > While I do understand the benefits for federating out the
>>>>> reading of
>>>>> >> > > footers for the sake of not worrying about synchronization
>>>>> between the
>>>>> >> > > cached metadata and any changes to the files on disk, it does
>>>>> appear
>>>>> >> > there
>>>>> >> > > is still a use case that isn't solved well with this design,
>>>>> needle
>>>>> >> in a
>>>>> >> > > haystack selective filter queries, where the data is sorted by
>>>>> the
>>>>> >> filter
>>>>> >> > > column. For example in the tests I ran with queries against
>>>>> lots of
>>>>> >> > parquet
>>>>> >> > > files where the vast majority are pruned by a bunch of small
>>>>> tasks, it
>>>>> >> > > takes 33 seconds vs just 1-2 seconds with driver side pruning
>>>>> using
>>>>> >> the
>>>>> >> > > summary file (requires a small spark changeset).
>>>>> >> > >
>>>>> >> > > In our use case we are never going to be replacing contents of
>>>>> >> existing
>>>>> >> > > parquet files (with a delete and rewrite on HDFS) or appending
>>>>> new row
>>>>> >> > > groups onto existing files. In that case I don't believe we
>>>>> should
>>>>> >> > > experience any correctness problems, but I wanted to confirm if
>>>>> there
>>>>> >> is
>>>>> >> > > something I am missing. I am
>>>>> >> > > using readAllFootersInParallelUsingSummaryFiles which does fall
>>>>> back
>>>>> >> to
>>>>> >> > > read individual footers if they are missing from the summary
>>>>> file.
>>>>> >> > >
>>>>> >> > > I am also curious if a solution to the correctness problems
>>>>> could be
>>>>> >> to
>>>>> >> > > include a file length and/or last modified time into the
>>>>> summary file,
>>>>> >> > > which could confirm against FS metadata that the files on disk
>>>>> are
>>>>> >> still
>>>>> >> > in
>>>>> >> > > sync with the metadata summary relatively quickly. Would it be
>>>>> >> possible
>>>>> >> > to
>>>>> >> > > consider avoiding this deprecation if I was to work on an
>>>>> update to
>>>>> >> > > implement this?
>>>>> >> > >
>>>>> >> > > - Jason Altekruse
>>>>> >> > >
>>>>> >> > >
>>>>> >> > > On Tue, Sep 15, 2020 at 8:52 PM Jason Altekruse <
>>>>> >> > [email protected]>
>>>>> >> > > wrote:
>>>>> >> > >
>>>>> >> > > > Hello all,
>>>>> >> > > >
>>>>> >> > > > I have been working on optimizing reads in spark to avoid
>>>>> spinning
>>>>> >> up
>>>>> >> > > lots
>>>>> >> > > > of short lived tasks that just perform row group pruning in
>>>>> >> selective
>>>>> >> > > > filter queries.
>>>>> >> > > >
>>>>> >> > > > My high level question is why metadata summary files were
>>>>> marked
>>>>> >> > > > deprecated in this Parquet changeset? There isn't much
>>>>> explanation
>>>>> >> > given
>>>>> >> > > > or a description of what should be used instead.
>>>>> >> > > > https://github.com/apache/parquet-mr/pull/429
>>>>> >> > > >
>>>>> >> > > > There are other members of the broader parquet community that
>>>>> are
>>>>> >> also
>>>>> >> > > > confused by this deprecation, see this discussion in an arrow
>>>>> PR.
>>>>> >> > > > https://github.com/apache/arrow/pull/4166
>>>>> >> > > >
>>>>> >> > > > In the course of making my small prototype I got an extra
>>>>> >> performance
>>>>> >> > > > boost by making spark write out metadata summary files,
>>>>> rather than
>>>>> >> > > having
>>>>> >> > > > to read all footers on the driver. This effect would be even
>>>>> more
>>>>> >> > > > pronounced on a completely remote storage system like S3.
>>>>> Writing
>>>>> >> these
>>>>> >> > > > summary files was disabled by default in SPARK-15719, because
>>>>> of the
>>>>> >> > > > performance impact of appending a small number of new files
>>>>> to an
>>>>> >> > > existing
>>>>> >> > > > dataset with many files.
>>>>> >> > > >
>>>>> >> > > > https://issues.apache.org/jira/browse/SPARK-15719
>>>>> >> > > >
>>>>> >> > > > This spark JIRA does make decent points considering how spark
>>>>> >> operates
>>>>> >> > > > today, but I think that there is a performance optimization
>>>>> >> opportunity
>>>>> >> > > > that is missed because the row group pruning is deferred to a
>>>>> bunch
>>>>> >> of
>>>>> >> > > > separate short lived tasks rather than done upfront,
>>>>> currently spark
>>>>> >> > only
>>>>> >> > > > uses footers on the driver for schema merging.
>>>>> >> > > >
>>>>> >> > > > Thanks for the help!
>>>>> >> > > > Jason Altekruse
>>>>> >> > > >
>>>>> >> > >
>>>>> >> >
>>>>> >>
>>>>> >
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>