This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 5644fc226 [CORE] Prior to #4893, add vanilla Spark's original scan
source code to keep git history
5644fc226 is described below
commit 5644fc2268e56d2e629177b62808d4c3190abc61
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Mar 13 16:44:48 2024 +0800
[CORE] Prior to #4893, add vanilla Spark's original scan source code to
keep git history
---
.../sql/execution/AbstractFileSourceScanExec.scala | 524 +++++++++++++++++++
.../datasources/v2/AbstractBatchScanExec.scala | 111 ++++
.../sql/execution/AbstractFileSourceScanExec.scala | 569 +++++++++++++++++++++
.../datasources/v2/AbstractBatchScanExec.scala | 141 +++++
.../sql/execution/AbstractFileSourceScanExec.scala | 285 +++++++++++
.../datasources/v2/AbstractBatchScanExec.scala | 251 +++++++++
.../sql/execution/AbstractFileSourceScanExec.scala | 273 ++++++++++
.../datasources/v2/AbstractBatchScanExec.scala | 273 ++++++++++
8 files changed, 2427 insertions(+)
diff --git
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
new file mode 100644
index 000000000..c5d17045b
--- /dev/null
+++
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -0,0 +1,524 @@
+package org.apache.spark.sql.execution
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning,
Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat
=> ParquetSource}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.BitSet
+
+import java.util.concurrent.TimeUnit._
+import scala.collection.mutable.HashMap
+/**
+ * Physical plan node for scanning data from HadoopFsRelations.
+ *
+ * @param relation The file-based relation to scan.
+ * @param output Output attributes of the scan, including data attributes and
partition attributes.
+ * @param requiredSchema Required schema of the underlying relation, excluding
partition columns.
+ * @param partitionFilters Predicates to use for partition pruning.
+ * @param optionalBucketSet Bucket ids for bucket pruning.
+ * @param optionalNumCoalescedBuckets Number of coalesced buckets.
+ * @param dataFilters Filters on non-partition columns.
+ * @param tableIdentifier Identifier for the table in the metastore.
+ * @param disableBucketedScan Disable bucketed scan based on physical query
plan, see rule
+ * [[DisableUnnecessaryBucketedScan]] for details.
+ */
+case class AbstractFileSourceScanExec(
+ @transient relation: HadoopFsRelation,
+ output: Seq[Attribute],
+ requiredSchema: StructType,
+ partitionFilters: Seq[Expression],
+ optionalBucketSet: Option[BitSet],
+ optionalNumCoalescedBuckets: Option[Int],
+ dataFilters: Seq[Expression],
+ tableIdentifier: Option[TableIdentifier],
+ disableBucketedScan: Boolean = false)
+ extends DataSourceScanExec {
+
+ // Note that some vals referring the file-based relation are lazy
intentionally
+ // so that this plan can be canonicalized on executor side too. See
SPARK-23731.
+ override lazy val supportsColumnar: Boolean = {
+ relation.fileFormat.supportBatch(relation.sparkSession, schema)
+ }
+
+ private lazy val needsUnsafeRowConversion: Boolean = {
+ if (relation.fileFormat.isInstanceOf[ParquetSource]) {
+ conf.parquetVectorizedReaderEnabled
+ } else {
+ false
+ }
+ }
+
+ override def vectorTypes: Option[Seq[String]] =
+ relation.fileFormat.vectorTypes(
+ requiredSchema = requiredSchema,
+ partitionSchema = relation.partitionSchema,
+ relation.sparkSession.sessionState.conf)
+
+ private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty
+
+ /**
+ * Send the driver-side metrics. Before calling this function,
selectedPartitions has
+ * been initialized. See SPARK-26327 for more details.
+ */
+ private def sendDriverMetrics(): Unit = {
+ driverMetrics.foreach(e => metrics(e._1).add(e._2))
+ val executionId =
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+ SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+ metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq)
+ }
+
+ private def isDynamicPruningFilter(e: Expression): Boolean =
+ e.find(_.isInstanceOf[PlanExpression[_]]).isDefined
+
+ @transient lazy val selectedPartitions: Array[PartitionDirectory] = {
+ val optimizerMetadataTimeNs =
relation.location.metadataOpsTimeNs.getOrElse(0L)
+ val startTime = System.nanoTime()
+ val ret =
+ relation.location.listFiles(
+ partitionFilters.filterNot(isDynamicPruningFilter), dataFilters)
+ setFilesNumAndSizeMetric(ret, true)
+ val timeTakenMs = NANOSECONDS.toMillis(
+ (System.nanoTime() - startTime) + optimizerMetadataTimeNs)
+ driverMetrics("metadataTime") = timeTakenMs
+ ret
+ }.toArray
+
+ // We can only determine the actual partitions at runtime when a dynamic
partition filter is
+ // present. This is because such a filter relies on information that is only
available at run
+ // time (for instance the keys used in the other side of a join).
+ @transient private lazy val dynamicallySelectedPartitions:
Array[PartitionDirectory] = {
+ val dynamicPartitionFilters =
partitionFilters.filter(isDynamicPruningFilter)
+
+ if (dynamicPartitionFilters.nonEmpty) {
+ val startTime = System.nanoTime()
+ // call the file index for the files matching all filters except dynamic
partition filters
+ val predicate = dynamicPartitionFilters.reduce(And)
+ val partitionColumns = relation.partitionSchema
+ val boundPredicate = Predicate.create(predicate.transform {
+ case a: AttributeReference =>
+ val index = partitionColumns.indexWhere(a.name == _.name)
+ BoundReference(index, partitionColumns(index).dataType, nullable =
true)
+ }, Nil)
+ val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
+ setFilesNumAndSizeMetric(ret, false)
+ val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000
+ driverMetrics("pruningTime") = timeTakenMs
+ ret
+ } else {
+ selectedPartitions
+ }
+ }
+
+ /**
+ * [[partitionFilters]] can contain subqueries whose results are available
only at runtime so
+ * accessing [[selectedPartitions]] should be guarded by this method during
planning
+ */
+ private def hasPartitionsAvailableAtRunTime: Boolean = {
+ partitionFilters.exists(ExecSubqueryExpression.hasSubquery)
+ }
+
+ private def toAttribute(colName: String): Option[Attribute] =
+ output.find(_.name == colName)
+
+ // exposed for testing
+ lazy val bucketedScan: Boolean = {
+ if (relation.sparkSession.sessionState.conf.bucketingEnabled &&
relation.bucketSpec.isDefined
+ && !disableBucketedScan) {
+ val spec = relation.bucketSpec.get
+ val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+ bucketColumns.size == spec.bucketColumnNames.size
+ } else {
+ false
+ }
+ }
+
+ override lazy val (outputPartitioning, outputOrdering): (Partitioning,
Seq[SortOrder]) = {
+ if (bucketedScan) {
+ // For bucketed columns:
+ // -----------------------
+ // `HashPartitioning` would be used only when:
+ // 1. ALL the bucketing columns are being read from the table
+ //
+ // For sorted columns:
+ // ---------------------
+ // Sort ordering should be used when ALL these criteria's match:
+ // 1. `HashPartitioning` is being used
+ // 2. A prefix (or all) of the sort columns are being read from the
table.
+ //
+ // Sort ordering would be over the prefix subset of `sort columns` being
read
+ // from the table.
+ // e.g.
+ // Assume (col0, col2, col3) are the columns read from the table
+ // If sort columns are (col0, col1), then sort ordering would be
considered as (col0)
+ // If sort columns are (col1, col0), then sort ordering would be empty
as per rule #2
+ // above
+ val spec = relation.bucketSpec.get
+ val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+ val numPartitions =
optionalNumCoalescedBuckets.getOrElse(spec.numBuckets)
+ val partitioning = HashPartitioning(bucketColumns, numPartitions)
+ val sortColumns =
+ spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x =>
x.isDefined).map(_.get)
+ val shouldCalculateSortOrder =
+ conf.getConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING) &&
+ sortColumns.nonEmpty &&
+ !hasPartitionsAvailableAtRunTime
+
+ val sortOrder = if (shouldCalculateSortOrder) {
+ // In case of bucketing, its possible to have multiple files belonging
to the
+ // same bucket in a given relation. Each of these files are locally
sorted
+ // but those files combined together are not globally sorted. Given
that,
+ // the RDD partition will not be sorted even if the relation has sort
columns set
+ // Current solution is to check if all the buckets have a single file
in it
+
+ val files = selectedPartitions.flatMap(partition => partition.files)
+ val bucketToFilesGrouping =
+ files.map(_.getPath.getName).groupBy(file =>
BucketingUtils.getBucketId(file))
+ val singleFilePartitions = bucketToFilesGrouping.forall(p =>
p._2.length <= 1)
+
+ // TODO SPARK-24528 Sort order is currently ignored if buckets are
coalesced.
+ if (singleFilePartitions && optionalNumCoalescedBuckets.isEmpty) {
+ // TODO Currently Spark does not support writing columns sorting in
descending order
+ // so using Ascending order. This can be fixed in future
+ sortColumns.map(attribute => SortOrder(attribute, Ascending))
+ } else {
+ Nil
+ }
+ } else {
+ Nil
+ }
+ (partitioning, sortOrder)
+ } else {
+ (UnknownPartitioning(0), Nil)
+ }
+ }
+
+ @transient
+ private lazy val pushedDownFilters = {
+ val supportNestedPredicatePushdown =
DataSourceUtils.supportNestedPredicatePushdown(relation)
+ dataFilters.flatMap(DataSourceStrategy.translateFilter(_,
supportNestedPredicatePushdown))
+ }
+
+ override lazy val metadata: Map[String, String] = {
+ def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+ val location = relation.location
+ val locationDesc =
+ location.getClass.getSimpleName +
+ Utils.buildLocationMetadata(location.rootPaths, maxMetadataValueLength)
+ val metadata =
+ Map(
+ "Format" -> relation.fileFormat.toString,
+ "ReadSchema" -> requiredSchema.catalogString,
+ "Batched" -> supportsColumnar.toString,
+ "PartitionFilters" -> seqToString(partitionFilters),
+ "PushedFilters" -> seqToString(pushedDownFilters),
+ "DataFilters" -> seqToString(dataFilters),
+ "Location" -> locationDesc)
+
+ // TODO(SPARK-32986): Add bucketed scan info in explain output of
AbstractFileSourceScanExec
+ if (bucketedScan) {
+ relation.bucketSpec.map { spec =>
+ val numSelectedBuckets = optionalBucketSet.map { b =>
+ b.cardinality()
+ } getOrElse {
+ spec.numBuckets
+ }
+ metadata + ("SelectedBucketsCount" ->
+ (s"$numSelectedBuckets out of ${spec.numBuckets}" +
+ optionalNumCoalescedBuckets.map { b => s" (Coalesced to
$b)"}.getOrElse("")))
+ } getOrElse {
+ metadata
+ }
+ } else {
+ metadata
+ }
+ }
+
+ override def verboseStringWithOperatorId(): String = {
+ val metadataStr = metadata.toSeq.sorted.filterNot {
+ case (_, value) if (value.isEmpty || value.equals("[]")) => true
+ case (key, _) if (key.equals("DataFilters") || key.equals("Format")) =>
true
+ case (_, _) => false
+ }.map {
+ case (key, _) if (key.equals("Location")) =>
+ val location = relation.location
+ val numPaths = location.rootPaths.length
+ val abbreviatedLocation = if (numPaths <= 1) {
+ location.rootPaths.mkString("[", ", ", "]")
+ } else {
+ "[" + location.rootPaths.head + s", ... ${numPaths - 1} entries]"
+ }
+ s"$key: ${location.getClass.getSimpleName}
${redact(abbreviatedLocation)}"
+ case (key, value) => s"$key: ${redact(value)}"
+ }
+
+ s"""
+ |$formattedNodeName
+ |${ExplainUtils.generateFieldString("Output", output)}
+ |${metadataStr.mkString("\n")}
+ |""".stripMargin
+ }
+
+ lazy val inputRDD: RDD[InternalRow] = {
+ val readFile: (PartitionedFile) => Iterator[InternalRow] =
+ relation.fileFormat.buildReaderWithPartitionValues(
+ sparkSession = relation.sparkSession,
+ dataSchema = relation.dataSchema,
+ partitionSchema = relation.partitionSchema,
+ requiredSchema = requiredSchema,
+ filters = pushedDownFilters,
+ options = relation.options,
+ hadoopConf =
relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
+
+ val readRDD = if (bucketedScan) {
+ createBucketedReadRDD(relation.bucketSpec.get, readFile,
dynamicallySelectedPartitions,
+ relation)
+ } else {
+ createReadRDD(readFile, dynamicallySelectedPartitions, relation)
+ }
+ sendDriverMetrics()
+ readRDD
+ }
+
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ inputRDD :: Nil
+ }
+
+ /** SQL metrics generated only for scans using dynamic partition pruning. */
+ private lazy val staticMetrics = if
(partitionFilters.exists(isDynamicPruningFilter)) {
+ Map("staticFilesNum" -> SQLMetrics.createMetric(sparkContext, "static
number of files read"),
+ "staticFilesSize" -> SQLMetrics.createSizeMetric(sparkContext, "static
size of files read"))
+ } else {
+ Map.empty[String, SQLMetric]
+ }
+
+ /** Helper for computing total number and size of files in selected
partitions. */
+ private def setFilesNumAndSizeMetric(
+ partitions: Seq[PartitionDirectory],
+ static: Boolean): Unit = {
+ val filesNum = partitions.map(_.files.size.toLong).sum
+ val filesSize = partitions.map(_.files.map(_.getLen).sum).sum
+ if (!static || !partitionFilters.exists(isDynamicPruningFilter)) {
+ driverMetrics("numFiles") = filesNum
+ driverMetrics("filesSize") = filesSize
+ } else {
+ driverMetrics("staticFilesNum") = filesNum
+ driverMetrics("staticFilesSize") = filesSize
+ }
+ if (relation.partitionSchemaOption.isDefined) {
+ driverMetrics("numPartitions") = partitions.length
+ }
+ }
+
+ override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
+ "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files
read"),
+ "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata
time"),
+ "filesSize" -> SQLMetrics.createSizeMetric(sparkContext, "size of files
read")
+ ) ++ {
+ // Tracking scan time has overhead, we can't afford to do it for each row,
and can only do
+ // it for each batch.
+ if (supportsColumnar) {
+ Some("scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan
time"))
+ } else {
+ None
+ }
+ } ++ {
+ if (relation.partitionSchemaOption.isDefined) {
+ Map(
+ "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of
partitions read"),
+ "pruningTime" ->
+ SQLMetrics.createTimingMetric(sparkContext, "dynamic partition
pruning time"))
+ } else {
+ Map.empty[String, SQLMetric]
+ }
+ } ++ staticMetrics
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ val numOutputRows = longMetric("numOutputRows")
+ if (needsUnsafeRowConversion) {
+ inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
+ val toUnsafe = UnsafeProjection.create(schema)
+ toUnsafe.initialize(index)
+ iter.map { row =>
+ numOutputRows += 1
+ toUnsafe(row)
+ }
+ }
+ } else {
+ inputRDD.mapPartitionsInternal { iter =>
+ iter.map { row =>
+ numOutputRows += 1
+ row
+ }
+ }
+ }
+ }
+
+ protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val numOutputRows = longMetric("numOutputRows")
+ val scanTime = longMetric("scanTime")
+ inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches
=>
+ new Iterator[ColumnarBatch] {
+
+ override def hasNext: Boolean = {
+ // The `FileScanRDD` returns an iterator which scans the file during
the `hasNext` call.
+ val startNs = System.nanoTime()
+ val res = batches.hasNext
+ scanTime += NANOSECONDS.toMillis(System.nanoTime() - startNs)
+ res
+ }
+
+ override def next(): ColumnarBatch = {
+ val batch = batches.next()
+ numOutputRows += batch.numRows()
+ batch
+ }
+ }
+ }
+ }
+
+ override val nodeNamePrefix: String = "File"
+
+ /**
+ * Create an RDD for bucketed reads.
+ * The non-bucketed variant of this function is [[createReadRDD]].
+ *
+ * The algorithm is pretty simple: each RDD partition being returned should
include all the files
+ * with the same bucket id from all the given Hive partitions.
+ *
+ * @param bucketSpec the bucketing spec.
+ * @param readFile a function to read each (part of a) file.
+ * @param selectedPartitions Hive-style partition that are part of the read.
+ * @param fsRelation [[HadoopFsRelation]] associated with the read.
+ */
+ private def createBucketedReadRDD(
+ bucketSpec: BucketSpec,
+ readFile: (PartitionedFile) =>
Iterator[InternalRow],
+ selectedPartitions:
Array[PartitionDirectory],
+ fsRelation: HadoopFsRelation):
RDD[InternalRow] = {
+ logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
+ val filesGroupedToBuckets =
+ selectedPartitions.flatMap { p =>
+ p.files.map { f =>
+ PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
+ }
+ }.groupBy { f =>
+ BucketingUtils
+ .getBucketId(new Path(f.filePath).getName)
+ .getOrElse(throw new IllegalStateException(s"Invalid bucket file
${f.filePath}"))
+ }
+
+ val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
+ val bucketSet = optionalBucketSet.get
+ filesGroupedToBuckets.filter {
+ f => bucketSet.get(f._1)
+ }
+ } else {
+ filesGroupedToBuckets
+ }
+
+ val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets
=>
+ logInfo(s"Coalescing to ${numCoalescedBuckets} buckets")
+ val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 %
numCoalescedBuckets)
+ Seq.tabulate(numCoalescedBuckets) { bucketId =>
+ val partitionedFiles = coalescedBuckets.get(bucketId).map {
+ _.values.flatten.toArray
+ }.getOrElse(Array.empty)
+ FilePartition(bucketId, partitionedFiles)
+ }
+ }.getOrElse {
+ Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
+ FilePartition(bucketId,
prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
+ }
+ }
+
+ new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+ }
+
+ /**
+ * Create an RDD for non-bucketed reads.
+ * The bucketed variant of this function is [[createBucketedReadRDD]].
+ *
+ * @param readFile a function to read each (part of a) file.
+ * @param selectedPartitions Hive-style partition that are part of the read.
+ * @param fsRelation [[HadoopFsRelation]] associated with the read.
+ */
+ private def createReadRDD(
+ readFile: (PartitionedFile) =>
Iterator[InternalRow],
+ selectedPartitions: Array[PartitionDirectory],
+ fsRelation: HadoopFsRelation): RDD[InternalRow] =
{
+ val openCostInBytes =
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+ val maxSplitBytes =
+ FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
+ logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes,
" +
+ s"open cost is considered as scanning $openCostInBytes bytes.")
+
+ // Filter files with bucket pruning if possible
+ val bucketingEnabled =
fsRelation.sparkSession.sessionState.conf.bucketingEnabled
+ val shouldProcess: Path => Boolean = optionalBucketSet match {
+ case Some(bucketSet) if bucketingEnabled =>
+ // Do not prune the file if bucket file name is invalid
+ filePath =>
BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
+ case _ =>
+ _ => true
+ }
+
+ val splitFiles = selectedPartitions.flatMap { partition =>
+ partition.files.flatMap { file =>
+ // getPath() is very expensive so we only want to call it once in this
block:
+ val filePath = file.getPath
+
+ if (shouldProcess(filePath)) {
+ val isSplitable = relation.fileFormat.isSplitable(
+ relation.sparkSession, relation.options, filePath)
+ PartitionedFileUtil.splitFiles(
+ sparkSession = relation.sparkSession,
+ file = file,
+ filePath = filePath,
+ isSplitable = isSplitable,
+ maxSplitBytes = maxSplitBytes,
+ partitionValues = partition.values
+ )
+ } else {
+ Seq.empty
+ }
+ }
+ }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+
+ val partitions =
+ FilePartition.getFilePartitions(relation.sparkSession, splitFiles,
maxSplitBytes)
+
+ new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
+ }
+
+ // Filters unused DynamicPruningExpression expressions - one which has been
replaced
+ // with DynamicPruningExpression(Literal.TrueLiteral) during Physical
Planning
+ private def filterUnusedDynamicPruningExpressions(
+ predicates:
Seq[Expression]): Seq[Expression] = {
+ predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
+ }
+
+ override def doCanonicalize(): AbstractFileSourceScanExec = {
+ AbstractFileSourceScanExec(
+ relation,
+ output.map(QueryPlan.normalizeExpressions(_, output)),
+ requiredSchema,
+ QueryPlan.normalizePredicates(
+ filterUnusedDynamicPruningExpressions(partitionFilters), output),
+ optionalBucketSet,
+ optionalNumCoalescedBuckets,
+ QueryPlan.normalizePredicates(dataFilters, output),
+ None,
+ disableBucketedScan)
+ }
+}
\ No newline at end of file
diff --git
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
new file mode 100644
index 000000000..42bce39ee
--- /dev/null
+++
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import com.google.common.base.Objects
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.connector.read.{InputPartition,
PartitionReaderFactory, Scan, SupportsRuntimeFiltering}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+/**
+ * Physical plan node for scanning a batch of data from a data source v2.
+ */
+case class AbstractBatchScanExec(
+ output: Seq[AttributeReference],
+ @transient scan: Scan,
+ runtimeFilters: Seq[Expression]) extends DataSourceV2ScanExecBase {
+
+ @transient lazy val batch = scan.toBatch
+
+ // TODO: unify the equal/hashCode implementation for all data source v2
query plans.
+ override def equals(other: Any): Boolean = other match {
+ case other: AbstractBatchScanExec =>
+ this.batch == other.batch && this.runtimeFilters == other.runtimeFilters
+ case _ =>
+ false
+ }
+
+ override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
+
+ @transient override lazy val partitions: Seq[InputPartition] =
batch.planInputPartitions()
+
+ @transient private lazy val filteredPartitions: Seq[InputPartition] = {
+ val dataSourceFilters = runtimeFilters.flatMap {
+ case DynamicPruningExpression(e) =>
DataSourceStrategy.translateRuntimeFilter(e)
+ case _ => None
+ }
+
+ if (dataSourceFilters.nonEmpty) {
+ val originalPartitioning = outputPartitioning
+
+ // the cast is safe as runtime filters are only assigned if the scan can
be filtered
+ val filterableScan = scan.asInstanceOf[SupportsRuntimeFiltering]
+ filterableScan.filter(dataSourceFilters.toArray)
+
+ // call toBatch again to get filtered partitions
+ val newPartitions = scan.toBatch.planInputPartitions()
+
+ originalPartitioning match {
+ case p: DataSourcePartitioning if p.numPartitions !=
newPartitions.size =>
+ throw new SparkException(
+ "Data source must have preserved the original partitioning during
runtime filtering; " +
+ s"reported num partitions: ${p.numPartitions}, " +
+ s"num partitions after runtime filtering: ${newPartitions.size}")
+ case _ =>
+ // no validation is needed as the data source did not report any
specific partitioning
+ }
+
+ newPartitions
+ } else {
+ partitions
+ }
+ }
+
+ override lazy val readerFactory: PartitionReaderFactory =
batch.createReaderFactory()
+
+ override lazy val inputRDD: RDD[InternalRow] = {
+ if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
+ // return an empty RDD with 1 partition if dynamic filtering removed the
only split
+ sparkContext.parallelize(Array.empty[InternalRow], 1)
+ } else {
+ new DataSourceRDD(
+ sparkContext, filteredPartitions, readerFactory, supportsColumnar,
customMetrics)
+ }
+ }
+
+ override def doCanonicalize(): AbstractBatchScanExec = {
+ this.copy(
+ output = output.map(QueryPlan.normalizeExpressions(_, output)),
+ runtimeFilters = QueryPlan.normalizePredicates(
+ runtimeFilters.filterNot(_ ==
DynamicPruningExpression(Literal.TrueLiteral)),
+ output))
+ }
+
+ override def simpleString(maxFields: Int): String = {
+ val truncatedOutputString = truncatedString(output, "[", ", ", "]",
maxFields)
+ val runtimeFiltersString = s"RuntimeFilters:
${runtimeFilters.mkString("[", ",", "]")}"
+ val result = s"$nodeName$truncatedOutputString ${scan.description()}
$runtimeFiltersString"
+ redact(result)
+ }
+}
diff --git
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
new file mode 100644
index 000000000..6375177e7
--- /dev/null
+++
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+import java.util.concurrent.TimeUnit._
+
+import scala.collection.mutable.HashMap
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning,
Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat
=> ParquetSource}
+import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.BitSet
+
+
/**
 * Physical plan node for scanning data from HadoopFsRelations.
 *
 * @param relation The file-based relation to scan.
 * @param output Output attributes of the scan, including data attributes and partition attributes.
 * @param requiredSchema Required schema of the underlying relation, excluding partition columns.
 * @param partitionFilters Predicates to use for partition pruning.
 * @param optionalBucketSet Bucket ids for bucket pruning.
 * @param optionalNumCoalescedBuckets Number of coalesced buckets.
 * @param dataFilters Filters on non-partition columns.
 * @param tableIdentifier Identifier for the table in the metastore.
 * @param disableBucketedScan Disable bucketed scan based on physical query plan, see rule
 *                            [[DisableUnnecessaryBucketedScan]] for details.
 */
case class AbstractFileSourceScanExec(
    @transient relation: HadoopFsRelation,
    output: Seq[Attribute],
    requiredSchema: StructType,
    partitionFilters: Seq[Expression],
    optionalBucketSet: Option[BitSet],
    optionalNumCoalescedBuckets: Option[Int],
    dataFilters: Seq[Expression],
    tableIdentifier: Option[TableIdentifier],
    disableBucketedScan: Boolean = false)
  extends DataSourceScanExec {

  // Hidden file-metadata columns (e.g. _metadata.*) requested in the output.
  lazy val metadataColumns: Seq[AttributeReference] =
    output.collect { case FileSourceMetadataAttribute(attr) => attr }

  // Note that some vals referring the file-based relation are lazy intentionally
  // so that this plan can be canonicalized on executor side too. See SPARK-23731.
  override lazy val supportsColumnar: Boolean = {
    relation.fileFormat.supportBatch(relation.sparkSession, schema)
  }

  // When true, doExecute() wraps output rows in an UnsafeProjection (only the
  // vectorized Parquet path needs this conversion).
  private lazy val needsUnsafeRowConversion: Boolean = {
    if (relation.fileFormat.isInstanceOf[ParquetSource]) {
      conf.parquetVectorizedReaderEnabled
    } else {
      false
    }
  }

  override def vectorTypes: Option[Seq[String]] =
    relation.fileFormat.vectorTypes(
      requiredSchema = requiredSchema,
      partitionSchema = relation.partitionSchema,
      relation.sparkSession.sessionState.conf).map { vectorTypes =>
      // for column-based file format, append metadata column's vector type classes if any
      vectorTypes ++ Seq.fill(metadataColumns.size)(classOf[ConstantColumnVector].getName)
    }

  // Metrics computed on the driver (file listing / pruning); flushed by sendDriverMetrics().
  private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty

  /**
   * Send the driver-side metrics. Before calling this function, selectedPartitions has
   * been initialized. See SPARK-26327 for more details.
   */
  private def sendDriverMetrics(): Unit = {
    driverMetrics.foreach(e => metrics(e._1).add(e._2))
    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
      metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq)
  }

  // A filter is "dynamic" if it embeds a plan expression (e.g. a DPP subquery).
  private def isDynamicPruningFilter(e: Expression): Boolean =
    e.exists(_.isInstanceOf[PlanExpression[_]])

  // Partitions surviving the static (non-dynamic) partition filters; also records
  // the metadata-time metric as a side effect.
  @transient lazy val selectedPartitions: Array[PartitionDirectory] = {
    val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
    val startTime = System.nanoTime()
    val ret =
      relation.location.listFiles(
        partitionFilters.filterNot(isDynamicPruningFilter), dataFilters)
    setFilesNumAndSizeMetric(ret, true)
    val timeTakenMs = NANOSECONDS.toMillis(
      (System.nanoTime() - startTime) + optimizerMetadataTimeNs)
    driverMetrics("metadataTime") = timeTakenMs
    ret
  }.toArray

  // We can only determine the actual partitions at runtime when a dynamic partition filter is
  // present. This is because such a filter relies on information that is only available at run
  // time (for instance the keys used in the other side of a join).
  @transient private lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = {
    val dynamicPartitionFilters = partitionFilters.filter(isDynamicPruningFilter)

    if (dynamicPartitionFilters.nonEmpty) {
      val startTime = System.nanoTime()
      // call the file index for the files matching all filters except dynamic partition filters
      val predicate = dynamicPartitionFilters.reduce(And)
      val partitionColumns = relation.partitionSchema
      val boundPredicate = Predicate.create(predicate.transform {
        case a: AttributeReference =>
          val index = partitionColumns.indexWhere(a.name == _.name)
          BoundReference(index, partitionColumns(index).dataType, nullable = true)
      }, Nil)
      val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
      setFilesNumAndSizeMetric(ret, false)
      val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000
      driverMetrics("pruningTime") = timeTakenMs
      ret
    } else {
      selectedPartitions
    }
  }

  /**
   * [[partitionFilters]] can contain subqueries whose results are available only at runtime so
   * accessing [[selectedPartitions]] should be guarded by this method during planning
   */
  private def hasPartitionsAvailableAtRunTime: Boolean = {
    partitionFilters.exists(ExecSubqueryExpression.hasSubquery)
  }

  // Resolves a column name against this scan's output attributes.
  private def toAttribute(colName: String): Option[Attribute] =
    output.find(_.name == colName)

  // exposed for testing
  // A bucketed scan is only used when bucketing is enabled, the planner has not disabled
  // it, and ALL bucket columns are present in the scan output.
  lazy val bucketedScan: Boolean = {
    if (relation.sparkSession.sessionState.conf.bucketingEnabled && relation.bucketSpec.isDefined
      && !disableBucketedScan) {
      val spec = relation.bucketSpec.get
      val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
      bucketColumns.size == spec.bucketColumnNames.size
    } else {
      false
    }
  }

  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
    if (bucketedScan) {
      // For bucketed columns:
      // -----------------------
      // `HashPartitioning` would be used only when:
      // 1. ALL the bucketing columns are being read from the table
      //
      // For sorted columns:
      // ---------------------
      // Sort ordering should be used when ALL these criteria's match:
      // 1. `HashPartitioning` is being used
      // 2. A prefix (or all) of the sort columns are being read from the table.
      //
      // Sort ordering would be over the prefix subset of `sort columns` being read
      // from the table.
      // e.g.
      // Assume (col0, col2, col3) are the columns read from the table
      // If sort columns are (col0, col1), then sort ordering would be considered as (col0)
      // If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
      // above
      val spec = relation.bucketSpec.get
      val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
      val numPartitions = optionalNumCoalescedBuckets.getOrElse(spec.numBuckets)
      val partitioning = HashPartitioning(bucketColumns, numPartitions)
      val sortColumns =
        spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
      val shouldCalculateSortOrder =
        conf.getConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING) &&
          sortColumns.nonEmpty &&
          !hasPartitionsAvailableAtRunTime

      val sortOrder = if (shouldCalculateSortOrder) {
        // In case of bucketing, its possible to have multiple files belonging to the
        // same bucket in a given relation. Each of these files are locally sorted
        // but those files combined together are not globally sorted. Given that,
        // the RDD partition will not be sorted even if the relation has sort columns set
        // Current solution is to check if all the buckets have a single file in it

        val files = selectedPartitions.flatMap(partition => partition.files)
        val bucketToFilesGrouping =
          files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
        val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)

        // TODO SPARK-24528 Sort order is currently ignored if buckets are coalesced.
        if (singleFilePartitions && optionalNumCoalescedBuckets.isEmpty) {
          // TODO Currently Spark does not support writing columns sorting in descending order
          // so using Ascending order. This can be fixed in future
          sortColumns.map(attribute => SortOrder(attribute, Ascending))
        } else {
          Nil
        }
      } else {
        Nil
      }
      (partitioning, sortOrder)
    } else {
      (UnknownPartitioning(0), Nil)
    }
  }

  @transient
  private lazy val pushedDownFilters = {
    val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
    // `dataFilters` should not include any metadata col filters
    // because the metadata struct has been flatted in FileSourceStrategy
    // and thus metadata col filters are invalid to be pushed down
    dataFilters.filterNot(_.references.exists {
      case FileSourceMetadataAttribute(_) => true
      case _ => false
    }).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
  }

  // Key/value pairs rendered in EXPLAIN output for this scan.
  override lazy val metadata: Map[String, String] = {
    def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
    val location = relation.location
    val locationDesc =
      location.getClass.getSimpleName +
        Utils.buildLocationMetadata(location.rootPaths, maxMetadataValueLength)
    val metadata =
      Map(
        "Format" -> relation.fileFormat.toString,
        "ReadSchema" -> requiredSchema.catalogString,
        "Batched" -> supportsColumnar.toString,
        "PartitionFilters" -> seqToString(partitionFilters),
        "PushedFilters" -> seqToString(pushedDownFilters),
        "DataFilters" -> seqToString(dataFilters),
        "Location" -> locationDesc)

    relation.bucketSpec.map { spec =>
      val bucketedKey = "Bucketed"
      if (bucketedScan) {
        val numSelectedBuckets = optionalBucketSet.map { b =>
          b.cardinality()
        } getOrElse {
          spec.numBuckets
        }
        metadata ++ Map(
          bucketedKey -> "true",
          "SelectedBucketsCount" -> (s"$numSelectedBuckets out of ${spec.numBuckets}" +
            optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)"}.getOrElse("")))
      } else if (!relation.sparkSession.sessionState.conf.bucketingEnabled) {
        metadata + (bucketedKey -> "false (disabled by configuration)")
      } else if (disableBucketedScan) {
        metadata + (bucketedKey -> "false (disabled by query planner)")
      } else {
        metadata + (bucketedKey -> "false (bucket column(s) not read)")
      }
    } getOrElse {
      metadata
    }
  }

  override def verboseStringWithOperatorId(): String = {
    // Drop empty/irrelevant entries, abbreviate multi-path locations, and redact values.
    val metadataStr = metadata.toSeq.sorted.filterNot {
      case (_, value) if (value.isEmpty || value.equals("[]")) => true
      case (key, _) if (key.equals("DataFilters") || key.equals("Format")) => true
      case (_, _) => false
    }.map {
      case (key, _) if (key.equals("Location")) =>
        val location = relation.location
        val numPaths = location.rootPaths.length
        val abbreviatedLocation = if (numPaths <= 1) {
          location.rootPaths.mkString("[", ", ", "]")
        } else {
          "[" + location.rootPaths.head + s", ... ${numPaths - 1} entries]"
        }
        s"$key: ${location.getClass.getSimpleName} ${redact(abbreviatedLocation)}"
      case (key, value) => s"$key: ${redact(value)}"
    }

    s"""
       |$formattedNodeName
       |${ExplainUtils.generateFieldString("Output", output)}
       |${metadataStr.mkString("\n")}
       |""".stripMargin
  }

  // Builds the file-reading RDD (bucketed or not) and flushes driver-side metrics.
  lazy val inputRDD: RDD[InternalRow] = {
    val readFile: (PartitionedFile) => Iterator[InternalRow] =
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = requiredSchema,
        filters = pushedDownFilters,
        options = relation.options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

    val readRDD = if (bucketedScan) {
      createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
        relation)
    } else {
      createReadRDD(readFile, dynamicallySelectedPartitions, relation)
    }
    sendDriverMetrics()
    readRDD
  }

  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    inputRDD :: Nil
  }

  /** SQL metrics generated only for scans using dynamic partition pruning. */
  private lazy val staticMetrics = if (partitionFilters.exists(isDynamicPruningFilter)) {
    Map("staticFilesNum" -> SQLMetrics.createMetric(sparkContext, "static number of files read"),
      "staticFilesSize" -> SQLMetrics.createSizeMetric(sparkContext, "static size of files read"))
  } else {
    Map.empty[String, SQLMetric]
  }

  /** Helper for computing total number and size of files in selected partitions. */
  private def setFilesNumAndSizeMetric(
      partitions: Seq[PartitionDirectory],
      static: Boolean): Unit = {
    val filesNum = partitions.map(_.files.size.toLong).sum
    val filesSize = partitions.map(_.files.map(_.getLen).sum).sum
    if (!static || !partitionFilters.exists(isDynamicPruningFilter)) {
      driverMetrics("numFiles") = filesNum
      driverMetrics("filesSize") = filesSize
    } else {
      driverMetrics("staticFilesNum") = filesNum
      driverMetrics("staticFilesSize") = filesSize
    }
    if (relation.partitionSchema.nonEmpty) {
      driverMetrics("numPartitions") = partitions.length
    }
  }

  override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
    "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"),
    "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"),
    "filesSize" -> SQLMetrics.createSizeMetric(sparkContext, "size of files read")
  ) ++ {
    // Tracking scan time has overhead, we can't afford to do it for each row, and can only do
    // it for each batch.
    if (supportsColumnar) {
      Some("scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
    } else {
      None
    }
  } ++ {
    if (relation.partitionSchema.nonEmpty) {
      Map(
        "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions read"),
        "pruningTime" ->
          SQLMetrics.createTimingMetric(sparkContext, "dynamic partition pruning time"))
    } else {
      Map.empty[String, SQLMetric]
    }
  } ++ staticMetrics

  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    if (needsUnsafeRowConversion) {
      inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
        val toUnsafe = UnsafeProjection.create(schema)
        toUnsafe.initialize(index)
        iter.map { row =>
          numOutputRows += 1
          toUnsafe(row)
        }
      }
    } else {
      inputRDD.mapPartitionsInternal { iter =>
        iter.map { row =>
          numOutputRows += 1
          row
        }
      }
    }
  }

  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val numOutputRows = longMetric("numOutputRows")
    val scanTime = longMetric("scanTime")
    inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches =>
      new Iterator[ColumnarBatch] {

        override def hasNext: Boolean = {
          // The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call.
          val startNs = System.nanoTime()
          val res = batches.hasNext
          scanTime += NANOSECONDS.toMillis(System.nanoTime() - startNs)
          res
        }

        override def next(): ColumnarBatch = {
          val batch = batches.next()
          numOutputRows += batch.numRows()
          batch
        }
      }
    }
  }

  override val nodeNamePrefix: String = "File"

  /**
   * Create an RDD for bucketed reads.
   * The non-bucketed variant of this function is [[createReadRDD]].
   *
   * The algorithm is pretty simple: each RDD partition being returned should include all the files
   * with the same bucket id from all the given Hive partitions.
   *
   * @param bucketSpec the bucketing spec.
   * @param readFile a function to read each (part of a) file.
   * @param selectedPartitions Hive-style partition that are part of the read.
   * @param fsRelation [[HadoopFsRelation]] associated with the read.
   */
  private def createBucketedReadRDD(
      bucketSpec: BucketSpec,
      readFile: (PartitionedFile) => Iterator[InternalRow],
      selectedPartitions: Array[PartitionDirectory],
      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
    logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
    val filesGroupedToBuckets =
      selectedPartitions.flatMap { p =>
        p.files.map { f =>
          PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
        }
      }.groupBy { f =>
        BucketingUtils
          .getBucketId(new Path(f.filePath).getName)
          .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath))
      }

    val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
      val bucketSet = optionalBucketSet.get
      filesGroupedToBuckets.filter {
        f => bucketSet.get(f._1)
      }
    } else {
      filesGroupedToBuckets
    }

    val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
      logInfo(s"Coalescing to ${numCoalescedBuckets} buckets")
      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
      Seq.tabulate(numCoalescedBuckets) { bucketId =>
        val partitionedFiles = coalescedBuckets.get(bucketId).map {
          _.values.flatten.toArray
        }.getOrElse(Array.empty)
        FilePartition(bucketId, partitionedFiles)
      }
    }.getOrElse {
      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
        FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
      }
    }

    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions,
      new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), metadataColumns)
  }

  /**
   * Create an RDD for non-bucketed reads.
   * The bucketed variant of this function is [[createBucketedReadRDD]].
   *
   * @param readFile a function to read each (part of a) file.
   * @param selectedPartitions Hive-style partition that are part of the read.
   * @param fsRelation [[HadoopFsRelation]] associated with the read.
   */
  private def createReadRDD(
      readFile: (PartitionedFile) => Iterator[InternalRow],
      selectedPartitions: Array[PartitionDirectory],
      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
    val maxSplitBytes =
      FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
      s"open cost is considered as scanning $openCostInBytes bytes.")

    // Filter files with bucket pruning if possible
    val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled
    val shouldProcess: Path => Boolean = optionalBucketSet match {
      case Some(bucketSet) if bucketingEnabled =>
        // Do not prune the file if bucket file name is invalid
        filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
      case _ =>
        _ => true
    }

    val splitFiles = selectedPartitions.flatMap { partition =>
      partition.files.flatMap { file =>
        // getPath() is very expensive so we only want to call it once in this block:
        val filePath = file.getPath

        if (shouldProcess(filePath)) {
          val isSplitable = relation.fileFormat.isSplitable(
            relation.sparkSession, relation.options, filePath)
          PartitionedFileUtil.splitFiles(
            sparkSession = relation.sparkSession,
            file = file,
            filePath = filePath,
            isSplitable = isSplitable,
            maxSplitBytes = maxSplitBytes,
            partitionValues = partition.values
          )
        } else {
          Seq.empty
        }
      }
    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

    val partitions =
      FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

    new FileScanRDD(fsRelation.sparkSession, readFile, partitions,
      new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), metadataColumns)
  }

  // Filters unused DynamicPruningExpression expressions - one which has been replaced
  // with DynamicPruningExpression(Literal.TrueLiteral) during Physical Planning
  private def filterUnusedDynamicPruningExpressions(
      predicates: Seq[Expression]): Seq[Expression] = {
    predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
  }

  override def doCanonicalize(): AbstractFileSourceScanExec = {
    AbstractFileSourceScanExec(
      relation,
      output.map(QueryPlan.normalizeExpressions(_, output)),
      requiredSchema,
      QueryPlan.normalizePredicates(
        filterUnusedDynamicPruningExpressions(partitionFilters), output),
      optionalBucketSet,
      optionalNumCoalescedBuckets,
      QueryPlan.normalizePredicates(dataFilters, output),
      None,
      disableBucketedScan)
  }
}
diff --git
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
new file mode 100644
index 000000000..b9a9bb47d
--- /dev/null
+++
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning,
SinglePartition}
+import org.apache.spark.sql.catalyst.util.InternalRowSet
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition,
PartitionReaderFactory, Scan, SupportsRuntimeFiltering}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+import com.google.common.base.Objects
+
/** Physical plan node for scanning a batch of data from a data source v2. */
case class AbstractBatchScanExec(
    output: Seq[AttributeReference],
    @transient scan: Scan,
    runtimeFilters: Seq[Expression],
    keyGroupedPartitioning: Option[Seq[Expression]] = None)
  extends DataSourceV2ScanExecBase {

  // Batch-read handle derived from the scan; transient so it is re-derived after serialization.
  @transient lazy val batch = scan.toBatch

  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
  override def equals(other: Any): Boolean = other match {
    case other: AbstractBatchScanExec =>
      this.batch == other.batch && this.runtimeFilters == other.runtimeFilters
    case _ =>
      false
  }

  override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)

  @transient override lazy val inputPartitions: Seq[InputPartition] = batch.planInputPartitions()

  // Input partitions after pushing translatable runtime (dynamic pruning) filters into the
  // source; each inner Seq is one output partition group. Validates that a key-grouped
  // partitioning reported by the source is preserved across runtime filtering.
  @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
    val dataSourceFilters = runtimeFilters.flatMap {
      case DynamicPruningExpression(e) => DataSourceStrategy.translateRuntimeFilter(e)
      case _ => None
    }

    if (dataSourceFilters.nonEmpty) {
      val originalPartitioning = outputPartitioning

      // the cast is safe as runtime filters are only assigned if the scan can be filtered
      val filterableScan = scan.asInstanceOf[SupportsRuntimeFiltering]
      filterableScan.filter(dataSourceFilters.toArray)

      // call toBatch again to get filtered partitions
      val newPartitions = scan.toBatch.planInputPartitions()

      originalPartitioning match {
        case p: KeyGroupedPartitioning =>
          if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
            throw new SparkException(
              "Data source must have preserved the original partitioning " +
                "during runtime filtering: not all partitions implement HasPartitionKey after " +
                "filtering")
          }

          val newRows = new InternalRowSet(p.expressions.map(_.dataType))
          newRows ++= newPartitions.map(_.asInstanceOf[HasPartitionKey].partitionKey())
          val oldRows = p.partitionValuesOpt.get

          if (oldRows.size != newRows.size) {
            throw new SparkException(
              "Data source must have preserved the original partitioning " +
                "during runtime filtering: the number of unique partition values obtained " +
                s"through HasPartitionKey changed: before ${oldRows.size}, after ${newRows.size}")
          }

          if (!oldRows.forall(newRows.contains)) {
            throw new SparkException(
              "Data source must have preserved the original partitioning " +
                "during runtime filtering: the number of unique partition values obtained " +
                s"through HasPartitionKey remain the same but do not exactly match")
          }

          groupPartitions(newPartitions).get.map(_._2)

        case _ =>
          // no validation is needed as the data source did not report any specific partitioning
          newPartitions.map(Seq(_))
      }

    } else {
      partitions
    }
  }

  // A single reader factory is created from the batch and shared across all partitions.
  override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()

  override lazy val inputRDD: RDD[InternalRow] = {
    if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
      // return an empty RDD with 1 partition if dynamic filtering removed the only split
      sparkContext.parallelize(Array.empty[InternalRow], 1)
    } else {
      new DataSourceRDD(
        sparkContext,
        filteredPartitions,
        readerFactory,
        supportsColumnar,
        customMetrics)
    }
  }

  // Canonicalization: normalize attribute references and drop runtime filters rewritten to
  // the always-true placeholder, so semantically-equal scans canonicalize identically.
  override def doCanonicalize(): AbstractBatchScanExec = {
    this.copy(
      output = output.map(QueryPlan.normalizeExpressions(_, output)),
      runtimeFilters = QueryPlan.normalizePredicates(
        runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
        output)
    )
  }

  override def simpleString(maxFields: Int): String = {
    val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)
    val runtimeFiltersString = s"RuntimeFilters: ${runtimeFilters.mkString("[", ",", "]")}"
    val result = s"$nodeName$truncatedOutputString ${scan.description()} $runtimeFiltersString"
    redact(result)
  }
}
diff --git
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
new file mode 100644
index 000000000..ccdd73383
--- /dev/null
+++
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -0,0 +1,285 @@
+package org.apache.spark.sql.execution
+
+import java.util.concurrent.TimeUnit._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow,
TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning,
Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat
=> ParquetSource}
+import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.BitSet
+
+/**
+ * Physical plan node for scanning data from HadoopFsRelations.
+ *
+ * @param relation The file-based relation to scan.
+ * @param output Output attributes of the scan, including data attributes and
partition attributes.
+ * @param requiredSchema Required schema of the underlying relation, excluding
partition columns.
+ * @param partitionFilters Predicates to use for partition pruning.
+ * @param optionalBucketSet Bucket ids for bucket pruning.
+ * @param optionalNumCoalescedBuckets Number of coalesced buckets.
+ * @param dataFilters Filters on non-partition columns.
+ * @param tableIdentifier Identifier for the table in the metastore.
+ * @param disableBucketedScan Disable bucketed scan based on physical query
plan, see rule
+ * [[DisableUnnecessaryBucketedScan]] for details.
+ */
// NOTE(review): verbatim copy of vanilla Spark 3.4's FileSourceScanExec (renamed), added to
// preserve git history per the commit message. Keep behavior byte-compatible with upstream;
// do not "fix" logic here — changes belong in the Gluten subclass. Also note this file is
// missing the ASF license header its sibling AbstractBatchScanExec.scala carries, and has no
// trailing newline — worth fixing upstream.
case class AbstractFileSourceScanExec(
    @transient override val relation: HadoopFsRelation,
    override val output: Seq[Attribute],
    override val requiredSchema: StructType,
    override val partitionFilters: Seq[Expression],
    override val optionalBucketSet: Option[BitSet],
    override val optionalNumCoalescedBuckets: Option[Int],
    override val dataFilters: Seq[Expression],
    override val tableIdentifier: Option[TableIdentifier],
    override val disableBucketedScan: Boolean = false)
  extends FileSourceScanLike {

  // Note that some vals referring the file-based relation are lazy intentionally
  // so that this plan can be canonicalized on executor side too. See SPARK-23731.
  // Columnar output requires both whole-stage codegen to be usable for this schema
  // and the file format to support batch reads for it.
  override lazy val supportsColumnar: Boolean = {
    val conf = relation.sparkSession.sessionState.conf
    // Only output columnar if there is WSCG to read it.
    val requiredWholeStageCodegenSettings =
      conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf, schema)
    requiredWholeStageCodegenSettings &&
      relation.fileFormat.supportBatch(relation.sparkSession, schema)
  }

  // True only for Parquet with the vectorized reader enabled: in that case the row-based
  // path in doExecute must project rows to UnsafeRow before handing them downstream.
  private lazy val needsUnsafeRowConversion: Boolean = {
    if (relation.fileFormat.isInstanceOf[ParquetSource]) {
      conf.parquetVectorizedReaderEnabled
    } else {
      false
    }
  }

  // Builds the per-file reader function, then materializes either the bucketed or the
  // bin-packed FileScanRDD over the dynamically-selected partitions. Driver-side metrics
  // are flushed once the RDD is constructed.
  lazy val inputRDD: RDD[InternalRow] = {
    // Tell the file format whether we expect ColumnarBatch or InternalRow output.
    val options = relation.options +
      (FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)
    val readFile: (PartitionedFile) => Iterator[InternalRow] =
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = requiredSchema,
        filters = pushedDownFilters,
        options = options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

    val readRDD = if (bucketedScan) {
      createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
        relation)
    } else {
      createReadRDD(readFile, dynamicallySelectedPartitions, relation)
    }
    sendDriverMetrics()
    readRDD
  }

  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    inputRDD :: Nil
  }

  // Row-based execution path: counts output rows and, for vectorized Parquet, converts
  // each row to UnsafeRow (see needsUnsafeRowConversion).
  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    if (needsUnsafeRowConversion) {
      inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
        val toUnsafe = UnsafeProjection.create(schema)
        toUnsafe.initialize(index)
        iter.map { row =>
          numOutputRows += 1
          toUnsafe(row)
        }
      }
    } else {
      inputRDD.mapPartitionsInternal { iter =>
        iter.map { row =>
          numOutputRows += 1
          row
        }
      }
    }
  }

  // Columnar execution path: wraps the batch iterator to record scan time (measured around
  // hasNext, where FileScanRDD does the actual IO) and the output row count.
  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val numOutputRows = longMetric("numOutputRows")
    val scanTime = longMetric("scanTime")
    inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches =>
      new Iterator[ColumnarBatch] {

        override def hasNext: Boolean = {
          // The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call.
          val startNs = System.nanoTime()
          val res = batches.hasNext
          scanTime += NANOSECONDS.toMillis(System.nanoTime() - startNs)
          res
        }

        override def next(): ColumnarBatch = {
          val batch = batches.next()
          numOutputRows += batch.numRows()
          batch
        }
      }
    }
  }

  override val nodeNamePrefix: String = "File"

  /**
   * Create an RDD for bucketed reads.
   * The non-bucketed variant of this function is [[createReadRDD]].
   *
   * The algorithm is pretty simple: each RDD partition being returned should include all the files
   * with the same bucket id from all the given Hive partitions.
   *
   * @param bucketSpec the bucketing spec.
   * @param readFile a function to read each (part of a) file.
   * @param selectedPartitions Hive-style partition that are part of the read.
   * @param fsRelation [[HadoopFsRelation]] associated with the read.
   */
  private def createBucketedReadRDD(
      bucketSpec: BucketSpec,
      readFile: (PartitionedFile) => Iterator[InternalRow],
      selectedPartitions: Array[PartitionDirectory],
      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
    logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
    // Group every selected file by the bucket id encoded in its file name; a file whose
    // name carries no valid bucket id is a corrupt bucketed table, hence the error.
    val filesGroupedToBuckets =
      selectedPartitions.flatMap { p =>
        p.files.map { f =>
          PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
        }
      }.groupBy { f =>
        BucketingUtils
          .getBucketId(f.toPath.getName)
          .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.urlEncodedPath))
      }

    // Apply bucket pruning when the planner supplied a set of needed bucket ids.
    val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
      val bucketSet = optionalBucketSet.get
      filesGroupedToBuckets.filter {
        f => bucketSet.get(f._1)
      }
    } else {
      filesGroupedToBuckets
    }

    // Either coalesce buckets (bucketId % numCoalescedBuckets) or emit one FilePartition
    // per original bucket; missing buckets become empty partitions so the partition count
    // stays equal to the bucket count.
    val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
      logInfo(s"Coalescing to ${numCoalescedBuckets} buckets")
      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
      Seq.tabulate(numCoalescedBuckets) { bucketId =>
        val partitionedFiles = coalescedBuckets.get(bucketId).map {
          _.values.flatten.toArray
        }.getOrElse(Array.empty)
        FilePartition(bucketId, partitionedFiles)
      }
    }.getOrElse {
      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
        FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
      }
    }

    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions,
      new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields),
      fileConstantMetadataColumns, new FileSourceOptions(CaseInsensitiveMap(relation.options)))
  }

  /**
   * Create an RDD for non-bucketed reads.
   * The bucketed variant of this function is [[createBucketedReadRDD]].
   *
   * @param readFile a function to read each (part of a) file.
   * @param selectedPartitions Hive-style partition that are part of the read.
   * @param fsRelation [[HadoopFsRelation]] associated with the read.
   */
  private def createReadRDD(
      readFile: (PartitionedFile) => Iterator[InternalRow],
      selectedPartitions: Array[PartitionDirectory],
      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
    val maxSplitBytes =
      FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
      s"open cost is considered as scanning $openCostInBytes bytes.")

    // Filter files with bucket pruning if possible
    val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled
    val shouldProcess: Path => Boolean = optionalBucketSet match {
      case Some(bucketSet) if bucketingEnabled =>
        // Do not prune the file if bucket file name is invalid
        filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
      case _ =>
        _ => true
    }

    // Split each eligible file into byte ranges, then sort descending by length so the
    // bin-packing below fills partitions greedily with the largest splits first.
    val splitFiles = selectedPartitions.flatMap { partition =>
      partition.files.flatMap { file =>
        // getPath() is very expensive so we only want to call it once in this block:
        val filePath = file.getPath

        if (shouldProcess(filePath)) {
          val isSplitable = relation.fileFormat.isSplitable(
            relation.sparkSession, relation.options, filePath) &&
            // SPARK-39634: Allow file splitting in combination with row index generation once
            // the fix for PARQUET-2161 is available.
            !RowIndexUtil.isNeededForSchema(requiredSchema)
          PartitionedFileUtil.splitFiles(
            sparkSession = relation.sparkSession,
            file = file,
            filePath = filePath,
            isSplitable = isSplitable,
            maxSplitBytes = maxSplitBytes,
            partitionValues = partition.values
          )
        } else {
          Seq.empty
        }
      }
    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

    val partitions =
      FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

    new FileScanRDD(fsRelation.sparkSession, readFile, partitions,
      new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields),
      fileConstantMetadataColumns, new FileSourceOptions(CaseInsensitiveMap(relation.options)))
  }

  // Filters unused DynamicPruningExpression expressions - one which has been replaced
  // with DynamicPruningExpression(Literal.TrueLiteral) during Physical Planning
  private def filterUnusedDynamicPruningExpressions(
      predicates: Seq[Expression]): Seq[Expression] = {
    predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
  }

  // Canonical form for plan equality: normalized expressions/predicates, no-op pruning
  // filters dropped, and the table identifier cleared.
  override def doCanonicalize(): AbstractFileSourceScanExec = {
    AbstractFileSourceScanExec(
      relation,
      output.map(QueryPlan.normalizeExpressions(_, output)),
      requiredSchema,
      QueryPlan.normalizePredicates(
        filterUnusedDynamicPruningExpressions(partitionFilters), output),
      optionalBucketSet,
      optionalNumCoalescedBuckets,
      QueryPlan.normalizePredicates(dataFilters, output),
      None,
      disableBucketedScan)
  }
}
\ No newline at end of file
diff --git
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
new file mode 100644
index 000000000..603b446fe
--- /dev/null
+++
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import com.google.common.base.Objects
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning,
Partitioning, SinglePartition}
+import org.apache.spark.sql.catalyst.util.{truncatedString,
InternalRowComparableWrapper}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Physical plan node for scanning a batch of data from a data source v2.
+ */
// NOTE(review): verbatim copy of vanilla Spark 3.4's BatchScanExec (renamed), added to
// preserve git history per the commit message. Keep behavior identical to upstream; the
// storage-partitioned-join regrouping below is order- and identity-sensitive.
case class AbstractBatchScanExec(
    output: Seq[AttributeReference],
    @transient scan: Scan,
    runtimeFilters: Seq[Expression],
    keyGroupedPartitioning: Option[Seq[Expression]] = None,
    ordering: Option[Seq[SortOrder]] = None,
    @transient table: Table,
    commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
    applyPartialClustering: Boolean = false,
    replicatePartitions: Boolean = false) extends DataSourceV2ScanExecBase {

  // `scan` is @transient, so it (and therefore `batch`) can be null after deserialization.
  @transient lazy val batch = if (scan == null) null else scan.toBatch

  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
  override def equals(other: Any): Boolean = other match {
    case other: AbstractBatchScanExec =>
      this.batch != null && this.batch == other.batch &&
        this.runtimeFilters == other.runtimeFilters &&
        this.commonPartitionValues == other.commonPartitionValues &&
        this.replicatePartitions == other.replicatePartitions &&
        this.applyPartialClustering == other.applyPartialClustering
    case _ =>
      false
  }

  override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)

  @transient override lazy val inputPartitions: Seq[InputPartition] = batch.planInputPartitions()

  // Input partitions after pushing translated runtime (dynamic-pruning) filters into the
  // scan. When the scan reported KeyGroupedPartitioning, the filtered result is validated
  // against the original partition values so downstream SPJ assumptions still hold.
  @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
    val dataSourceFilters = runtimeFilters.flatMap {
      case DynamicPruningExpression(e) => DataSourceV2Strategy.translateRuntimeFilterV2(e)
      case _ => None
    }

    if (dataSourceFilters.nonEmpty) {
      val originalPartitioning = outputPartitioning

      // the cast is safe as runtime filters are only assigned if the scan can be filtered
      val filterableScan = scan.asInstanceOf[SupportsRuntimeV2Filtering]
      filterableScan.filter(dataSourceFilters.toArray)

      // call toBatch again to get filtered partitions
      val newPartitions = scan.toBatch.planInputPartitions()

      originalPartitioning match {
        case p: KeyGroupedPartitioning =>
          if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
            throw new SparkException("Data source must have preserved the original partitioning " +
              "during runtime filtering: not all partitions implement HasPartitionKey after " +
              "filtering")
          }
          val newPartitionValues = newPartitions.map(partition =>
            InternalRowComparableWrapper(partition.asInstanceOf[HasPartitionKey], p.expressions))
            .toSet
          val oldPartitionValues = p.partitionValues
            .map(partition => InternalRowComparableWrapper(partition, p.expressions)).toSet
          // We require the new number of partition values to be equal or less than the old number
          // of partition values here. In the case of less than, empty partitions will be added for
          // those missing values that are not present in the new input partitions.
          if (oldPartitionValues.size < newPartitionValues.size) {
            throw new SparkException("During runtime filtering, data source must either report " +
              "the same number of partition values, or a subset of partition values from the " +
              s"original. Before: ${oldPartitionValues.size} partition values. " +
              s"After: ${newPartitionValues.size} partition values")
          }

          if (!newPartitionValues.forall(oldPartitionValues.contains)) {
            throw new SparkException("During runtime filtering, data source must not report new " +
              "partition values that are not present in the original partitioning.")
          }

          groupPartitions(newPartitions).getOrElse(Seq.empty).map(_._2)

        case _ =>
          // no validation is needed as the data source did not report any specific partitioning
          newPartitions.map(Seq(_))
      }

    } else {
      partitions
    }
  }

  // When common partition values were negotiated for a storage-partitioned join, expand the
  // key-grouped partitioning so each partition value appears once per split.
  override def outputPartitioning: Partitioning = {
    super.outputPartitioning match {
      case k: KeyGroupedPartitioning if commonPartitionValues.isDefined =>
        // We allow duplicated partition values if
        // `spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` is true
        val newPartValues = commonPartitionValues.get.flatMap { case (partValue, numSplits) =>
          Seq.fill(numSplits)(partValue)
        }
        k.copy(numPartitions = newPartValues.length, partitionValues = newPartValues)
      case p => p
    }
  }

  override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()

  // Builds the DataSourceRDD; for key-grouped partitioning the filtered splits are
  // (re)grouped, padded with empty partitions, and possibly replicated so both sides of a
  // storage-partitioned join end up with the same number of partitions and splits.
  override lazy val inputRDD: RDD[InternalRow] = {
    val rdd = if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
      // return an empty RDD with 1 partition if dynamic filtering removed the only split
      sparkContext.parallelize(Array.empty[InternalRow], 1)
    } else {
      var finalPartitions = filteredPartitions

      outputPartitioning match {
        case p: KeyGroupedPartitioning =>
          if (conf.v2BucketingPushPartValuesEnabled &&
              conf.v2BucketingPartiallyClusteredDistributionEnabled) {
            assert(filteredPartitions.forall(_.size == 1),
              "Expect partitions to be not grouped when " +
                s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
                "is enabled")

            val groupedPartitions = groupPartitions(finalPartitions.map(_.head), true)
              .getOrElse(Seq.empty)

            // This means the input partitions are not grouped by partition values. We'll need to
            // check `groupByPartitionValues` and decide whether to group and replicate splits
            // within a partition.
            if (commonPartitionValues.isDefined && applyPartialClustering) {
              // A mapping from the common partition values to how many splits the partition
              // should contain. Note this no longer maintain the partition key ordering.
              val commonPartValuesMap = commonPartitionValues
                .get
                .map(t => (InternalRowComparableWrapper(t._1, p.expressions), t._2))
                .toMap
              val nestGroupedPartitions = groupedPartitions.map {
                case (partValue, splits) =>
                  // `commonPartValuesMap` should contain the part value since it's the super set.
                  val numSplits = commonPartValuesMap
                    .get(InternalRowComparableWrapper(partValue, p.expressions))
                  assert(numSplits.isDefined, s"Partition value $partValue does not exist in " +
                    "common partition values from Spark plan")

                  val newSplits = if (replicatePartitions) {
                    // We need to also replicate partitions according to the other side of join
                    Seq.fill(numSplits.get)(splits)
                  } else {
                    // Not grouping by partition values: this could be the side with partially
                    // clustered distribution. Because of dynamic filtering, we'll need to check if
                    // the final number of splits of a partition is smaller than the original
                    // number, and fill with empty splits if so. This is necessary so that both
                    // sides of a join will have the same number of partitions & splits.
                    splits.map(Seq(_)).padTo(numSplits.get, Seq.empty)
                  }
                  (InternalRowComparableWrapper(partValue, p.expressions), newSplits)
              }

              // Now fill missing partition keys with empty partitions
              val partitionMapping = nestGroupedPartitions.toMap
              finalPartitions = commonPartitionValues.get.flatMap { case (partValue, numSplits) =>
                // Use empty partition for those partition values that are not present.
                partitionMapping.getOrElse(
                  InternalRowComparableWrapper(partValue, p.expressions),
                  Seq.fill(numSplits)(Seq.empty))
              }
            } else {
              // either `commonPartitionValues` is not defined, or it is defined but
              // `applyPartialClustering` is false.
              val partitionMapping = groupedPartitions.map { case (row, parts) =>
                InternalRowComparableWrapper(row, p.expressions) -> parts
              }.toMap

              // In case `commonPartitionValues` is not defined (e.g., SPJ is not used), there
              // could exist duplicated partition values, as partition grouping is not done
              // at the beginning and postponed to this method. It is important to use unique
              // partition values here so that grouped partitions won't get duplicated.
              finalPartitions = p.uniquePartitionValues.map { partValue =>
                // Use empty partition for those partition values that are not present
                partitionMapping.getOrElse(
                  InternalRowComparableWrapper(partValue, p.expressions), Seq.empty)
              }
            }
          } else {
            // Partitions are already grouped: realign them to the order of the reported
            // partition values, padding missing keys with empty partitions.
            val partitionMapping = finalPartitions.map { parts =>
              val row = parts.head.asInstanceOf[HasPartitionKey].partitionKey()
              InternalRowComparableWrapper(row, p.expressions) -> parts
            }.toMap
            finalPartitions = p.partitionValues.map { partValue =>
              // Use empty partition for those partition values that are not present
              partitionMapping.getOrElse(
                InternalRowComparableWrapper(partValue, p.expressions), Seq.empty)
            }
          }

        case _ =>
      }

      new DataSourceRDD(
        sparkContext, finalPartitions, readerFactory, supportsColumnar, customMetrics)
    }
    postDriverMetrics()
    rdd
  }

  // Canonical form for plan equality: normalized output and runtime filters with no-op
  // dynamic pruning expressions removed.
  override def doCanonicalize(): AbstractBatchScanExec = {
    this.copy(
      output = output.map(QueryPlan.normalizeExpressions(_, output)),
      runtimeFilters = QueryPlan.normalizePredicates(
        runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
        output))
  }

  override def simpleString(maxFields: Int): String = {
    val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)
    val runtimeFiltersString = s"RuntimeFilters: ${runtimeFilters.mkString("[", ",", "]")}"
    val result = s"$nodeName$truncatedOutputString ${scan.description()} $runtimeFiltersString"
    redact(result)
  }

  override def nodeName: String = {
    s"BatchScan ${table.name()}".trim
  }
}
diff --git
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
new file mode 100644
index 000000000..2afb2daa3
--- /dev/null
+++
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -0,0 +1,273 @@
+package org.apache.spark.sql.execution
+
+import java.util.concurrent.TimeUnit._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow,
TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning,
Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat
=> ParquetSource}
+import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.BitSet
+
+/**
+ * Physical plan node for scanning data from HadoopFsRelations.
+ *
+ * @param relation The file-based relation to scan.
+ * @param output Output attributes of the scan, including data attributes and
partition attributes.
+ * @param requiredSchema Required schema of the underlying relation, excluding
partition columns.
+ * @param partitionFilters Predicates to use for partition pruning.
+ * @param optionalBucketSet Bucket ids for bucket pruning.
+ * @param optionalNumCoalescedBuckets Number of coalesced buckets.
+ * @param dataFilters Filters on non-partition columns.
+ * @param tableIdentifier Identifier for the table in the metastore.
+ * @param disableBucketedScan Disable bucketed scan based on physical query
plan, see rule
+ * [[DisableUnnecessaryBucketedScan]] for details.
+ */
// NOTE(review): verbatim copy of vanilla Spark 3.5's FileSourceScanExec (renamed), added to
// preserve git history per the commit message. Differs from the 3.4 shim in upstream API
// shape only (e.g. PartitionedFileUtil signatures, fileConstantMetadataExtractors). Like the
// 3.4 copy, this file lacks the ASF license header and a trailing newline — fix upstream.
case class AbstractFileSourceScanExec(
    @transient override val relation: HadoopFsRelation,
    override val output: Seq[Attribute],
    override val requiredSchema: StructType,
    override val partitionFilters: Seq[Expression],
    override val optionalBucketSet: Option[BitSet],
    override val optionalNumCoalescedBuckets: Option[Int],
    override val dataFilters: Seq[Expression],
    override val tableIdentifier: Option[TableIdentifier],
    override val disableBucketedScan: Boolean = false)
  extends FileSourceScanLike {

  // Note that some vals referring the file-based relation are lazy intentionally
  // so that this plan can be canonicalized on executor side too. See SPARK-23731.
  // Columnar output requires both whole-stage codegen to be usable for this schema
  // and the file format to support batch reads for it.
  override lazy val supportsColumnar: Boolean = {
    val conf = relation.sparkSession.sessionState.conf
    // Only output columnar if there is WSCG to read it.
    val requiredWholeStageCodegenSettings =
      conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf, schema)
    requiredWholeStageCodegenSettings &&
      relation.fileFormat.supportBatch(relation.sparkSession, schema)
  }

  // True only for Parquet with the vectorized reader enabled: in that case the row-based
  // path in doExecute must project rows to UnsafeRow before handing them downstream.
  private lazy val needsUnsafeRowConversion: Boolean = {
    if (relation.fileFormat.isInstanceOf[ParquetSource]) {
      conf.parquetVectorizedReaderEnabled
    } else {
      false
    }
  }

  // Builds the per-file reader function, then materializes either the bucketed or the
  // bin-packed FileScanRDD over the dynamically-selected partitions. Driver-side metrics
  // are flushed once the RDD is constructed.
  lazy val inputRDD: RDD[InternalRow] = {
    // Tell the file format whether we expect ColumnarBatch or InternalRow output.
    val options = relation.options +
      (FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)
    val readFile: (PartitionedFile) => Iterator[InternalRow] =
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = requiredSchema,
        filters = pushedDownFilters,
        options = options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

    val readRDD = if (bucketedScan) {
      createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions)
    } else {
      createReadRDD(readFile, dynamicallySelectedPartitions)
    }
    sendDriverMetrics()
    readRDD
  }

  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    inputRDD :: Nil
  }

  // Row-based execution path: counts output rows and, for vectorized Parquet, converts
  // each row to UnsafeRow (see needsUnsafeRowConversion).
  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    if (needsUnsafeRowConversion) {
      inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
        val toUnsafe = UnsafeProjection.create(schema)
        toUnsafe.initialize(index)
        iter.map { row =>
          numOutputRows += 1
          toUnsafe(row)
        }
      }
    } else {
      inputRDD.mapPartitionsInternal { iter =>
        iter.map { row =>
          numOutputRows += 1
          row
        }
      }
    }
  }

  // Columnar execution path: wraps the batch iterator to record scan time (measured around
  // hasNext, where FileScanRDD does the actual IO) and the output row count.
  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val numOutputRows = longMetric("numOutputRows")
    val scanTime = longMetric("scanTime")
    inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches =>
      new Iterator[ColumnarBatch] {

        override def hasNext: Boolean = {
          // The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call.
          val startNs = System.nanoTime()
          val res = batches.hasNext
          scanTime += NANOSECONDS.toMillis(System.nanoTime() - startNs)
          res
        }

        override def next(): ColumnarBatch = {
          val batch = batches.next()
          numOutputRows += batch.numRows()
          batch
        }
      }
    }
  }

  override val nodeNamePrefix: String = "File"

  /**
   * Create an RDD for bucketed reads.
   * The non-bucketed variant of this function is [[createReadRDD]].
   *
   * The algorithm is pretty simple: each RDD partition being returned should include all the files
   * with the same bucket id from all the given Hive partitions.
   *
   * @param bucketSpec the bucketing spec.
   * @param readFile a function to read each (part of a) file.
   * @param selectedPartitions Hive-style partition that are part of the read.
   */
  private def createBucketedReadRDD(
      bucketSpec: BucketSpec,
      readFile: (PartitionedFile) => Iterator[InternalRow],
      selectedPartitions: Array[PartitionDirectory]): RDD[InternalRow] = {
    logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
    // Group every selected file by the bucket id encoded in its file name; a file whose
    // name carries no valid bucket id is a corrupt bucketed table, hence the error.
    val filesGroupedToBuckets =
      selectedPartitions.flatMap { p =>
        p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, p.values))
      }.groupBy { f =>
        BucketingUtils
          .getBucketId(f.toPath.getName)
          .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.urlEncodedPath))
      }

    // Apply bucket pruning when the planner supplied a set of needed bucket ids.
    val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
      val bucketSet = optionalBucketSet.get
      filesGroupedToBuckets.filter {
        f => bucketSet.get(f._1)
      }
    } else {
      filesGroupedToBuckets
    }

    // Either coalesce buckets (bucketId % numCoalescedBuckets) or emit one FilePartition
    // per original bucket; missing buckets become empty partitions so the partition count
    // stays equal to the bucket count.
    val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
      logInfo(s"Coalescing to ${numCoalescedBuckets} buckets")
      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
      Seq.tabulate(numCoalescedBuckets) { bucketId =>
        val partitionedFiles = coalescedBuckets.get(bucketId).map {
          _.values.flatten.toArray
        }.getOrElse(Array.empty)
        FilePartition(bucketId, partitionedFiles)
      }
    }.getOrElse {
      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
        FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
      }
    }

    new FileScanRDD(relation.sparkSession, readFile, filePartitions,
      new StructType(requiredSchema.fields ++ relation.partitionSchema.fields),
      fileConstantMetadataColumns, relation.fileFormat.fileConstantMetadataExtractors,
      new FileSourceOptions(CaseInsensitiveMap(relation.options)))
  }

  /**
   * Create an RDD for non-bucketed reads.
   * The bucketed variant of this function is [[createBucketedReadRDD]].
   *
   * @param readFile a function to read each (part of a) file.
   * @param selectedPartitions Hive-style partition that are part of the read.
   */
  private def createReadRDD(
      readFile: (PartitionedFile) => Iterator[InternalRow],
      selectedPartitions: Array[PartitionDirectory]): RDD[InternalRow] = {
    val openCostInBytes = relation.sparkSession.sessionState.conf.filesOpenCostInBytes
    val maxSplitBytes =
      FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
      s"open cost is considered as scanning $openCostInBytes bytes.")

    // Filter files with bucket pruning if possible
    val bucketingEnabled = relation.sparkSession.sessionState.conf.bucketingEnabled
    val shouldProcess: Path => Boolean = optionalBucketSet match {
      case Some(bucketSet) if bucketingEnabled =>
        // Do not prune the file if bucket file name is invalid
        filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
      case _ =>
        _ => true
    }

    // Split each eligible file into byte ranges, then sort descending by length so the
    // bin-packing below fills partitions greedily with the largest splits first.
    val splitFiles = selectedPartitions.flatMap { partition =>
      partition.files.flatMap { file =>
        if (shouldProcess(file.getPath)) {
          val isSplitable = relation.fileFormat.isSplitable(
            relation.sparkSession, relation.options, file.getPath)
          PartitionedFileUtil.splitFiles(
            sparkSession = relation.sparkSession,
            file = file,
            isSplitable = isSplitable,
            maxSplitBytes = maxSplitBytes,
            partitionValues = partition.values
          )
        } else {
          Seq.empty
        }
      }
    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

    val partitions =
      FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

    new FileScanRDD(relation.sparkSession, readFile, partitions,
      new StructType(requiredSchema.fields ++ relation.partitionSchema.fields),
      fileConstantMetadataColumns, relation.fileFormat.fileConstantMetadataExtractors,
      new FileSourceOptions(CaseInsensitiveMap(relation.options)))
  }

  // Filters unused DynamicPruningExpression expressions - one which has been replaced
  // with DynamicPruningExpression(Literal.TrueLiteral) during Physical Planning
  private def filterUnusedDynamicPruningExpressions(
      predicates: Seq[Expression]): Seq[Expression] = {
    predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
  }

  // Canonical form for plan equality: normalized expressions/predicates, no-op pruning
  // filters dropped, and the table identifier cleared.
  override def doCanonicalize(): AbstractFileSourceScanExec = {
    AbstractFileSourceScanExec(
      relation,
      output.map(QueryPlan.normalizeExpressions(_, output)),
      requiredSchema,
      QueryPlan.normalizePredicates(
        filterUnusedDynamicPruningExpressions(partitionFilters), output),
      optionalBucketSet,
      optionalNumCoalescedBuckets,
      QueryPlan.normalizePredicates(dataFilters, output),
      None,
      disableBucketedScan)
  }
}
\ No newline at end of file
diff --git
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
new file mode 100644
index 000000000..2e25f38b8
--- /dev/null
+++
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import com.google.common.base.Objects
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning,
Partitioning, SinglePartition}
+import org.apache.spark.sql.catalyst.util.{truncatedString,
InternalRowComparableWrapper}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Physical plan node for scanning a batch of data from a data source v2.
+ *
+ * NOTE(review): per the shims/spark35 path and the commit message, this
+ * appears to be a verbatim copy of vanilla Spark 3.5's BatchScanExec, kept
+ * intact to preserve git history — confirm against upstream before editing.
+ */
+case class AbstractBatchScanExec(
+ output: Seq[AttributeReference],
+ @transient scan: Scan,
+ runtimeFilters: Seq[Expression],
+ ordering: Option[Seq[SortOrder]] = None,
+ @transient table: Table,
+ spjParams: StoragePartitionJoinParams =
StoragePartitionJoinParams()
+ ) extends DataSourceV2ScanExecBase {
+
+ // `scan` is @transient and may be null after deserialization; guard so
+ // `batch` is then null as well instead of throwing an NPE here.
+ @transient lazy val batch: Batch = if (scan == null) null else scan.toBatch
+
+ // TODO: unify the equal/hashCode implementation for all data source v2
query plans.
+ override def equals(other: Any): Boolean = other match {
+ case other: AbstractBatchScanExec =>
+ this.batch != null && this.batch == other.batch &&
+ this.runtimeFilters == other.runtimeFilters &&
+ this.spjParams == other.spjParams
+ case _ =>
+ false
+ }
+
+ // NOTE(review): spjParams participates in equals but is omitted here; this
+ // is still lawful (equal instances share batch/runtimeFilters) and is
+ // presumably intentional upstream.
+ override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
+
+ @transient override lazy val inputPartitions: Seq[InputPartition] =
batch.planInputPartitions()
+
+ // Input partitions after applying translated runtime (dynamic pruning)
+ // filters; for a key-grouped source, validates that filtering reported a
+ // subset of the original partition values before regrouping.
+ @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
+ val dataSourceFilters = runtimeFilters.flatMap {
+ case DynamicPruningExpression(e) =>
DataSourceV2Strategy.translateRuntimeFilterV2(e)
+ case _ => None
+ }
+
+ if (dataSourceFilters.nonEmpty) {
+ val originalPartitioning = outputPartitioning
+
+ // the cast is safe as runtime filters are only assigned if the scan can
be filtered
+ val filterableScan = scan.asInstanceOf[SupportsRuntimeV2Filtering]
+ filterableScan.filter(dataSourceFilters.toArray)
+
+ // call toBatch again to get filtered partitions
+ val newPartitions = scan.toBatch.planInputPartitions()
+
+ originalPartitioning match {
+ case p: KeyGroupedPartitioning =>
+ if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
+ throw new SparkException("Data source must have preserved the
original partitioning " +
+ "during runtime filtering: not all partitions implement
HasPartitionKey after " +
+ "filtering")
+ }
+ val newPartitionValues = newPartitions.map(partition =>
+
InternalRowComparableWrapper(partition.asInstanceOf[HasPartitionKey],
p.expressions))
+ .toSet
+ val oldPartitionValues = p.partitionValues
+ .map(partition => InternalRowComparableWrapper(partition,
p.expressions)).toSet
+ // We require the new number of partition values to be equal or less
than the old number
+ // of partition values here. In the case of less than, empty
partitions will be added for
+ // those missing values that are not present in the new input
partitions.
+ if (oldPartitionValues.size < newPartitionValues.size) {
+ throw new SparkException("During runtime filtering, data source
must either report " +
+ "the same number of partition values, or a subset of partition
values from the " +
+ s"original. Before: ${oldPartitionValues.size} partition values.
" +
+ s"After: ${newPartitionValues.size} partition values")
+ }
+
+ if (!newPartitionValues.forall(oldPartitionValues.contains)) {
+ throw new SparkException("During runtime filtering, data source
must not report new " +
+ "partition values that are not present in the original
partitioning.")
+ }
+
+ groupPartitions(newPartitions).getOrElse(Seq.empty).map(_._2)
+
+ case _ =>
+ // no validation is needed as the data source did not report any
specific partitioning
+ newPartitions.map(Seq(_))
+ }
+
+ } else {
+ partitions
+ }
+ }
+
+ // When SPJ has pushed down common partition values, expand each value to
+ // one partition per split so the reported partition count matches.
+ override def outputPartitioning: Partitioning = {
+ super.outputPartitioning match {
+ case k: KeyGroupedPartitioning if
spjParams.commonPartitionValues.isDefined =>
+ // We allow duplicated partition values if
+ //
`spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` is true
+ val newPartValues = spjParams.commonPartitionValues.get.flatMap {
+ case (partValue, numSplits) => Seq.fill(numSplits)(partValue)
+ }
+ k.copy(numPartitions = newPartValues.length, partitionValues =
newPartValues)
+ case p => p
+ }
+ }
+
+ override lazy val readerFactory: PartitionReaderFactory =
batch.createReaderFactory()
+
+ // Builds the scan RDD. For key-grouped partitioning this regroups,
+ // replicates, or pads splits with empty partitions so both sides of a
+ // storage-partitioned join line up partition-for-partition.
+ override lazy val inputRDD: RDD[InternalRow] = {
+ val rdd = if (filteredPartitions.isEmpty && outputPartitioning ==
SinglePartition) {
+ // return an empty RDD with 1 partition if dynamic filtering removed the
only split
+ sparkContext.parallelize(Array.empty[InternalRow], 1)
+ } else {
+ var finalPartitions = filteredPartitions
+
+ outputPartitioning match {
+ case p: KeyGroupedPartitioning =>
+ if (conf.v2BucketingPushPartValuesEnabled &&
+ conf.v2BucketingPartiallyClusteredDistributionEnabled) {
+ assert(filteredPartitions.forall(_.size == 1),
+ "Expect partitions to be not grouped when " +
+
s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
+ "is enabled")
+
+ val groupedPartitions =
groupPartitions(finalPartitions.map(_.head),
+ groupSplits = true).getOrElse(Seq.empty)
+
+ // This means the input partitions are not grouped by partition
values. We'll need to
+ // check `groupByPartitionValues` and decide whether to group and
replicate splits
+ // within a partition.
+ if (spjParams.commonPartitionValues.isDefined &&
+ spjParams.applyPartialClustering) {
+ // A mapping from the common partition values to how many splits
the partition
+ // should contain. Note this no longer maintain the partition
key ordering.
+ val commonPartValuesMap = spjParams.commonPartitionValues
+ .get
+ .map(t => (InternalRowComparableWrapper(t._1, p.expressions),
t._2))
+ .toMap
+ val nestGroupedPartitions = groupedPartitions.map {
+ case (partValue, splits) =>
+ // `commonPartValuesMap` should contain the part value since
it's the super set.
+ val numSplits = commonPartValuesMap
+ .get(InternalRowComparableWrapper(partValue,
p.expressions))
+ assert(numSplits.isDefined, s"Partition value $partValue
does not exist in " +
+ "common partition values from Spark plan")
+
+ val newSplits = if (spjParams.replicatePartitions) {
+ // We need to also replicate partitions according to the
other side of join
+ Seq.fill(numSplits.get)(splits)
+ } else {
+ // Not grouping by partition values: this could be the
side with partially
+ // clustered distribution. Because of dynamic filtering,
we'll need to check if
+ // the final number of splits of a partition is smaller
than the original
+ // number, and fill with empty splits if so. This is
necessary so that both
+ // sides of a join will have the same number of partitions
& splits.
+ splits.map(Seq(_)).padTo(numSplits.get, Seq.empty)
+ }
+ (InternalRowComparableWrapper(partValue, p.expressions),
newSplits)
+ }
+
+ // Now fill missing partition keys with empty partitions
+ val partitionMapping = nestGroupedPartitions.toMap
+ finalPartitions = spjParams.commonPartitionValues.get.flatMap {
+ case (partValue, numSplits) =>
+ // Use empty partition for those partition values that are
not present.
+ partitionMapping.getOrElse(
+ InternalRowComparableWrapper(partValue, p.expressions),
+ Seq.fill(numSplits)(Seq.empty))
+ }
+ } else {
+ // either `commonPartitionValues` is not defined, or it is
defined but
+ // `applyPartialClustering` is false.
+ val partitionMapping = groupedPartitions.map { case (row, parts)
=>
+ InternalRowComparableWrapper(row, p.expressions) -> parts
+ }.toMap
+
+ // In case `commonPartitionValues` is not defined (e.g., SPJ is
not used), there
+ // could exist duplicated partition values, as partition
grouping is not done
+ // at the beginning and postponed to this method. It is
important to use unique
+ // partition values here so that grouped partitions won't get
duplicated.
+ finalPartitions = p.uniquePartitionValues.map { partValue =>
+ // Use empty partition for those partition values that are not
present
+ partitionMapping.getOrElse(
+ InternalRowComparableWrapper(partValue, p.expressions),
Seq.empty)
+ }
+ }
+ } else {
+ val partitionMapping = finalPartitions.map { parts =>
+ val row = parts.head.asInstanceOf[HasPartitionKey].partitionKey()
+ InternalRowComparableWrapper(row, p.expressions) -> parts
+ }.toMap
+ finalPartitions = p.partitionValues.map { partValue =>
+ // Use empty partition for those partition values that are not
present
+ partitionMapping.getOrElse(
+ InternalRowComparableWrapper(partValue, p.expressions),
Seq.empty)
+ }
+ }
+
+ case _ =>
+ }
+
+ new DataSourceRDD(
+ sparkContext, finalPartitions, readerFactory, supportsColumnar,
customMetrics)
+ }
+ postDriverMetrics()
+ rdd
+ }
+
+ override def keyGroupedPartitioning: Option[Seq[Expression]] =
+ spjParams.keyGroupedPartitioning
+
+ // Normalizes output exprIds and drops no-op TrueLiteral runtime filters so
+ // canonical plans compare equal.
+ override def doCanonicalize(): AbstractBatchScanExec = {
+ this.copy(
+ output = output.map(QueryPlan.normalizeExpressions(_, output)),
+ runtimeFilters = QueryPlan.normalizePredicates(
+ runtimeFilters.filterNot(_ ==
DynamicPruningExpression(Literal.TrueLiteral)),
+ output))
+ }
+
+ override def simpleString(maxFields: Int): String = {
+ val truncatedOutputString = truncatedString(output, "[", ", ", "]",
maxFields)
+ val runtimeFiltersString = s"RuntimeFilters:
${runtimeFilters.mkString("[", ",", "]")}"
+ val result = s"$nodeName$truncatedOutputString ${scan.description()}
$runtimeFiltersString"
+ redact(result)
+ }
+
+ override def nodeName: String = {
+ s"BatchScan ${table.name()}".trim
+ }
+}
+
+// Parameters controlling storage-partitioned join (SPJ) behavior of the scan:
+// the join key-grouped partitioning, the common partition values shared with
+// the other join side, and flags for partial clustering / replication.
+// NOTE(review): keyGroupedPartitioning is excluded from both equals and
+// hashCode below — presumably intentional upstream; confirm before relying
+// on these params as map/set keys.
+case class StoragePartitionJoinParams(
+ keyGroupedPartitioning:
Option[Seq[Expression]] = None,
+ commonPartitionValues:
Option[Seq[(InternalRow, Int)]] = None,
+ applyPartialClustering: Boolean = false,
+ replicatePartitions: Boolean = false) {
+ override def equals(other: Any): Boolean = other match {
+ case other: StoragePartitionJoinParams =>
+ this.commonPartitionValues == other.commonPartitionValues &&
+ this.replicatePartitions == other.replicatePartitions &&
+ this.applyPartialClustering == other.applyPartialClustering
+ case _ =>
+ false
+ }
+
+ // Consistent with equals: hashes the same three fields (booleans boxed for
+ // the varargs Objects.hashCode call).
+ override def hashCode(): Int = Objects.hashCode(
+ commonPartitionValues: Option[Seq[(InternalRow, Int)]],
+ applyPartialClustering: java.lang.Boolean,
+ replicatePartitions: java.lang.Boolean)
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]