This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8c800160b62 [SPARK-38999][SQL] Refactor `FileSourceScanExec`: file scan physical node
8c800160b62 is described below
commit 8c800160b62657fa5ab16a69ab694360897468d6
Author: Utkarsh <[email protected]>
AuthorDate: Mon Apr 25 15:44:20 2022 +0800
[SPARK-38999][SQL] Refactor `FileSourceScanExec`: file scan physical node
### What changes were proposed in this pull request?
This PR refactors the `FileSourceScanExec` case class into a base trait
`FileSourceScanLike`, which `FileSourceScanExec` now extends. `FileSourceScanLike`
contains shared functionality such as metrics and file listing, while
`FileSourceScanExec` contains execution-specific code.
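For illustration only, below is a minimal, self-contained Scala sketch of the trait/case-class split applied here. The names in the sketch (`ScanLikeSketch`, `ScanExecSketch`, `requiredColumns`, `listingSummary`) are placeholders invented for this example and are not the actual members in `DataSourceScanExec.scala`:

```scala
// Shared, execution-agnostic members live in a base trait
// (in the real PR: metrics and file listing in FileSourceScanLike).
trait ScanLikeSketch {
  // Abstract inputs that the concrete node must supply.
  def requiredColumns: Seq[String]

  // Shared logic implemented once in the trait.
  lazy val listingSummary: String = s"columns to read: ${requiredColumns.mkString(", ")}"
}

// The concrete physical node only wires in the parameters and the execution-specific
// code (in the real PR: FileSourceScanExec extends FileSourceScanLike).
case class ScanExecSketch(override val requiredColumns: Seq[String]) extends ScanLikeSketch {
  def execute(): Unit = println(listingSummary)
}

object ScanSketchDemo extends App {
  ScanExecSketch(Seq("id", "value")).execute()  // prints: columns to read: id, value
}
```

The intent of this split is that shared members stay on the trait, so other scan nodes can reuse the metrics and file-listing code without inheriting `FileSourceScanExec`'s execution-specific code.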
### Why are the changes needed?
Currently, the code for the `FileSourceScanExec` class, the physical node for
file scans, is complex and lengthy, making it difficult to reason about.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
This is a code refactor; existing tests should suffice.
Closes #36327 from utkarsh39/split-file-scan-node.
Authored-by: Utkarsh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/execution/DataSourceScanExec.scala | 208 ++++++++++++---------
1 file changed, 117 insertions(+), 91 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 5cf8aa91ea5..953a7db0f9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution
import java.util.concurrent.TimeUnit._
-import scala.collection.mutable.HashMap
-
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.Path
@@ -179,48 +177,34 @@ case class RowDataSourceScanExec(
}
/**
- * Physical plan node for scanning data from HadoopFsRelations.
- *
- * @param relation The file-based relation to scan.
- * @param output Output attributes of the scan, including data attributes and partition attributes.
- * @param requiredSchema Required schema of the underlying relation, excluding partition columns.
- * @param partitionFilters Predicates to use for partition pruning.
- * @param optionalBucketSet Bucket ids for bucket pruning.
- * @param optionalNumCoalescedBuckets Number of coalesced buckets.
- * @param dataFilters Filters on non-partition columns.
- * @param tableIdentifier Identifier for the table in the metastore.
- * @param disableBucketedScan Disable bucketed scan based on physical query plan, see rule
- * [[DisableUnnecessaryBucketedScan]] for details.
+ * A base trait for file scans containing file listing and metrics code.
*/
-case class FileSourceScanExec(
- @transient relation: HadoopFsRelation,
- output: Seq[Attribute],
- requiredSchema: StructType,
- partitionFilters: Seq[Expression],
- optionalBucketSet: Option[BitSet],
- optionalNumCoalescedBuckets: Option[Int],
- dataFilters: Seq[Expression],
- tableIdentifier: Option[TableIdentifier],
- disableBucketedScan: Boolean = false)
- extends DataSourceScanExec {
+trait FileSourceScanLike extends DataSourceScanExec {
+
+ // Filters on non-partition columns.
+ def dataFilters: Seq[Expression]
+ // Disable bucketed scan based on physical query plan, see rule
+ // [[DisableUnnecessaryBucketedScan]] for details.
+ def disableBucketedScan: Boolean
+ // Bucket ids for bucket pruning.
+ def optionalBucketSet: Option[BitSet]
+ // Number of coalesced buckets.
+ def optionalNumCoalescedBuckets: Option[Int]
+ // Output attributes of the scan, including data attributes and partition attributes.
+ def output: Seq[Attribute]
+ // Predicates to use for partition pruning.
+ def partitionFilters: Seq[Expression]
+ // The file-based relation to scan.
+ def relation: HadoopFsRelation
+ // Required schema of the underlying relation, excluding partition columns.
+ def requiredSchema: StructType
+ // Identifier for the table in the metastore.
+ def tableIdentifier: Option[TableIdentifier]
+
lazy val metadataColumns: Seq[AttributeReference] =
output.collect { case FileSourceMetadataAttribute(attr) => attr }
- // Note that some vals referring the file-based relation are lazy intentionally
- // so that this plan can be canonicalized on executor side too. See SPARK-23731.
- override lazy val supportsColumnar: Boolean = {
- relation.fileFormat.supportBatch(relation.sparkSession, schema)
- }
-
- private lazy val needsUnsafeRowConversion: Boolean = {
- if (relation.fileFormat.isInstanceOf[ParquetSource]) {
- conf.parquetVectorizedReaderEnabled
- } else {
- false
- }
- }
-
override def vectorTypes: Option[Seq[String]] =
relation.fileFormat.vectorTypes(
requiredSchema = requiredSchema,
@@ -230,17 +214,28 @@ case class FileSourceScanExec(
vectorTypes ++ Seq.fill(metadataColumns.size)(classOf[ConstantColumnVector].getName)
}
- private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty
+ lazy val driverMetrics = Map(
+ "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files
read"),
+ "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata
time"),
+ "filesSize" -> SQLMetrics.createSizeMetric(sparkContext, "size of files
read")
+ ) ++ {
+ if (relation.partitionSchema.nonEmpty) {
+ Map(
+ "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of
partitions read"),
+ "pruningTime" ->
+ SQLMetrics.createTimingMetric(sparkContext, "dynamic partition
pruning time"))
+ } else {
+ Map.empty[String, SQLMetric]
+ }
+ } ++ staticMetrics
/**
* Send the driver-side metrics. Before calling this function, selectedPartitions has
* been initialized. See SPARK-26327 for more details.
*/
- private def sendDriverMetrics(): Unit = {
- driverMetrics.foreach(e => metrics(e._1).add(e._2))
+ protected def sendDriverMetrics(): Unit = {
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
- metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq)
+ SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, driverMetrics.values.toSeq)
}
private def isDynamicPruningFilter(e: Expression): Boolean =
@@ -255,14 +250,14 @@ case class FileSourceScanExec(
setFilesNumAndSizeMetric(ret, true)
val timeTakenMs = NANOSECONDS.toMillis(
(System.nanoTime() - startTime) + optimizerMetadataTimeNs)
- driverMetrics("metadataTime") = timeTakenMs
+ driverMetrics("metadataTime").set(timeTakenMs)
ret
}.toArray
// We can only determine the actual partitions at runtime when a dynamic partition filter is
// present. This is because such a filter relies on information that is only available at run
// time (for instance the keys used in the other side of a join).
- @transient private lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = {
+ @transient protected lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = {
val dynamicPartitionFilters = partitionFilters.filter(isDynamicPruningFilter)
if (dynamicPartitionFilters.nonEmpty) {
@@ -278,7 +273,7 @@ case class FileSourceScanExec(
val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
setFilesNumAndSizeMetric(ret, false)
val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000
- driverMetrics("pruningTime") = timeTakenMs
+ driverMetrics("pruningTime").set(timeTakenMs)
ret
} else {
selectedPartitions
@@ -369,7 +364,7 @@ case class FileSourceScanExec(
}
@transient
- private lazy val pushedDownFilters = {
+ protected lazy val pushedDownFilters = {
val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
// `dataFilters` should not include any metadata col filters
// because the metadata struct has been flatted in FileSourceStrategy
@@ -445,33 +440,10 @@ case class FileSourceScanExec(
|""".stripMargin
}
- lazy val inputRDD: RDD[InternalRow] = {
- val readFile: (PartitionedFile) => Iterator[InternalRow] =
- relation.fileFormat.buildReaderWithPartitionValues(
- sparkSession = relation.sparkSession,
- dataSchema = relation.dataSchema,
- partitionSchema = relation.partitionSchema,
- requiredSchema = requiredSchema,
- filters = pushedDownFilters,
- options = relation.options,
- hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
-
- val readRDD = if (bucketedScan) {
- createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
- relation)
- } else {
- createReadRDD(readFile, dynamicallySelectedPartitions, relation)
- }
- sendDriverMetrics()
- readRDD
- }
-
- override def inputRDDs(): Seq[RDD[InternalRow]] = {
- inputRDD :: Nil
- }
+ override def metrics: Map[String, SQLMetric] = scanMetrics
/** SQL metrics generated only for scans using dynamic partition pruning. */
- private lazy val staticMetrics = if (partitionFilters.exists(isDynamicPruningFilter)) {
+ protected lazy val staticMetrics = if (partitionFilters.exists(isDynamicPruningFilter)) {
Map("staticFilesNum" -> SQLMetrics.createMetric(sparkContext, "static
number of files read"),
"staticFilesSize" -> SQLMetrics.createSizeMetric(sparkContext, "static
size of files read"))
} else {
@@ -485,22 +457,19 @@ case class FileSourceScanExec(
val filesNum = partitions.map(_.files.size.toLong).sum
val filesSize = partitions.map(_.files.map(_.getLen).sum).sum
if (!static || !partitionFilters.exists(isDynamicPruningFilter)) {
- driverMetrics("numFiles") = filesNum
- driverMetrics("filesSize") = filesSize
+ driverMetrics("numFiles").set(filesNum)
+ driverMetrics("filesSize").set(filesSize)
} else {
- driverMetrics("staticFilesNum") = filesNum
- driverMetrics("staticFilesSize") = filesSize
+ driverMetrics("staticFilesNum").set(filesNum)
+ driverMetrics("staticFilesSize").set(filesSize)
}
if (relation.partitionSchema.nonEmpty) {
- driverMetrics("numPartitions") = partitions.length
+ driverMetrics("numPartitions").set(partitions.length)
}
}
- override lazy val metrics = Map(
- "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
- "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files
read"),
- "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata
time"),
- "filesSize" -> SQLMetrics.createSizeMetric(sparkContext, "size of files
read")
+ private lazy val scanMetrics = Map(
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows")
) ++ {
// Tracking scan time has overhead, we can't afford to do it for each row, and can only do
// it for each batch.
@@ -509,16 +478,73 @@ case class FileSourceScanExec(
} else {
None
}
- } ++ {
- if (relation.partitionSchema.nonEmpty) {
- Map(
- "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of
partitions read"),
- "pruningTime" ->
- SQLMetrics.createTimingMetric(sparkContext, "dynamic partition
pruning time"))
+ } ++ driverMetrics
+}
+
+/**
+ * Physical plan node for scanning data from HadoopFsRelations.
+ *
+ * @param relation The file-based relation to scan.
+ * @param output Output attributes of the scan, including data attributes and partition attributes.
+ * @param requiredSchema Required schema of the underlying relation, excluding partition columns.
+ * @param partitionFilters Predicates to use for partition pruning.
+ * @param optionalBucketSet Bucket ids for bucket pruning.
+ * @param optionalNumCoalescedBuckets Number of coalesced buckets.
+ * @param dataFilters Filters on non-partition columns.
+ * @param tableIdentifier Identifier for the table in the metastore.
+ * @param disableBucketedScan Disable bucketed scan based on physical query plan, see rule
+ * [[DisableUnnecessaryBucketedScan]] for details.
+ */
+case class FileSourceScanExec(
+ @transient override val relation: HadoopFsRelation,
+ override val output: Seq[Attribute],
+ override val requiredSchema: StructType,
+ override val partitionFilters: Seq[Expression],
+ override val optionalBucketSet: Option[BitSet],
+ override val optionalNumCoalescedBuckets: Option[Int],
+ override val dataFilters: Seq[Expression],
+ override val tableIdentifier: Option[TableIdentifier],
+ override val disableBucketedScan: Boolean = false)
+ extends FileSourceScanLike {
+
+ // Note that some vals referring the file-based relation are lazy intentionally
+ // so that this plan can be canonicalized on executor side too. See SPARK-23731.
+ override lazy val supportsColumnar: Boolean = {
+ relation.fileFormat.supportBatch(relation.sparkSession, schema)
+ }
+
+ private lazy val needsUnsafeRowConversion: Boolean = {
+ if (relation.fileFormat.isInstanceOf[ParquetSource]) {
+ conf.parquetVectorizedReaderEnabled
} else {
- Map.empty[String, SQLMetric]
+ false
}
- } ++ staticMetrics
+ }
+
+ lazy val inputRDD: RDD[InternalRow] = {
+ val readFile: (PartitionedFile) => Iterator[InternalRow] =
+ relation.fileFormat.buildReaderWithPartitionValues(
+ sparkSession = relation.sparkSession,
+ dataSchema = relation.dataSchema,
+ partitionSchema = relation.partitionSchema,
+ requiredSchema = requiredSchema,
+ filters = pushedDownFilters,
+ options = relation.options,
+ hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
+
+ val readRDD = if (bucketedScan) {
+ createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
+ relation)
+ } else {
+ createReadRDD(readFile, dynamicallySelectedPartitions, relation)
+ }
+ sendDriverMetrics()
+ readRDD
+ }
+
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ inputRDD :: Nil
+ }
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]