This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 5644fc226 [CORE] Prior to #4893, add vanilla Spark's original scan 
source code to keep git history
5644fc226 is described below

commit 5644fc2268e56d2e629177b62808d4c3190abc61
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Mar 13 16:44:48 2024 +0800

    [CORE] Prior to #4893, add vanilla Spark's original scan source code to 
keep git history
---
 .../sql/execution/AbstractFileSourceScanExec.scala | 524 +++++++++++++++++++
 .../datasources/v2/AbstractBatchScanExec.scala     | 111 ++++
 .../sql/execution/AbstractFileSourceScanExec.scala | 569 +++++++++++++++++++++
 .../datasources/v2/AbstractBatchScanExec.scala     | 141 +++++
 .../sql/execution/AbstractFileSourceScanExec.scala | 285 +++++++++++
 .../datasources/v2/AbstractBatchScanExec.scala     | 251 +++++++++
 .../sql/execution/AbstractFileSourceScanExec.scala | 273 ++++++++++
 .../datasources/v2/AbstractBatchScanExec.scala     | 273 ++++++++++
 8 files changed, 2427 insertions(+)

diff --git 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
new file mode 100644
index 000000000..c5d17045b
--- /dev/null
+++ 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -0,0 +1,524 @@
+package org.apache.spark.sql.execution
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat 
=> ParquetSource}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.BitSet
+
+import java.util.concurrent.TimeUnit._
+import scala.collection.mutable.HashMap
+/**
+ * Physical plan node for scanning data from HadoopFsRelations.
+ *
+ * @param relation The file-based relation to scan.
+ * @param output Output attributes of the scan, including data attributes and 
partition attributes.
+ * @param requiredSchema Required schema of the underlying relation, excluding 
partition columns.
+ * @param partitionFilters Predicates to use for partition pruning.
+ * @param optionalBucketSet Bucket ids for bucket pruning.
+ * @param optionalNumCoalescedBuckets Number of coalesced buckets.
+ * @param dataFilters Filters on non-partition columns.
+ * @param tableIdentifier Identifier for the table in the metastore.
+ * @param disableBucketedScan Disable bucketed scan based on physical query 
plan, see rule
+ *                            [[DisableUnnecessaryBucketedScan]] for details.
+ */
+case class AbstractFileSourceScanExec(
+                               @transient relation: HadoopFsRelation,
+                               output: Seq[Attribute],
+                               requiredSchema: StructType,
+                               partitionFilters: Seq[Expression],
+                               optionalBucketSet: Option[BitSet],
+                               optionalNumCoalescedBuckets: Option[Int],
+                               dataFilters: Seq[Expression],
+                               tableIdentifier: Option[TableIdentifier],
+                               disableBucketedScan: Boolean = false)
+  extends DataSourceScanExec {
+
+  // Note that some vals referring the file-based relation are lazy 
intentionally
+  // so that this plan can be canonicalized on executor side too. See 
SPARK-23731.
+  override lazy val supportsColumnar: Boolean = {
+    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+  }
+
+  private lazy val needsUnsafeRowConversion: Boolean = {
+    if (relation.fileFormat.isInstanceOf[ParquetSource]) {
+      conf.parquetVectorizedReaderEnabled
+    } else {
+      false
+    }
+  }
+
+  override def vectorTypes: Option[Seq[String]] =
+    relation.fileFormat.vectorTypes(
+      requiredSchema = requiredSchema,
+      partitionSchema = relation.partitionSchema,
+      relation.sparkSession.sessionState.conf)
+
+  private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty
+
+  /**
+   * Send the driver-side metrics. Before calling this function, 
selectedPartitions has
+   * been initialized. See SPARK-26327 for more details.
+   */
+  private def sendDriverMetrics(): Unit = {
+    driverMetrics.foreach(e => metrics(e._1).add(e._2))
+    val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq)
+  }
+
+  private def isDynamicPruningFilter(e: Expression): Boolean =
+    e.find(_.isInstanceOf[PlanExpression[_]]).isDefined
+
+  @transient lazy val selectedPartitions: Array[PartitionDirectory] = {
+    val optimizerMetadataTimeNs = 
relation.location.metadataOpsTimeNs.getOrElse(0L)
+    val startTime = System.nanoTime()
+    val ret =
+      relation.location.listFiles(
+        partitionFilters.filterNot(isDynamicPruningFilter), dataFilters)
+    setFilesNumAndSizeMetric(ret, true)
+    val timeTakenMs = NANOSECONDS.toMillis(
+      (System.nanoTime() - startTime) + optimizerMetadataTimeNs)
+    driverMetrics("metadataTime") = timeTakenMs
+    ret
+  }.toArray
+
+  // We can only determine the actual partitions at runtime when a dynamic 
partition filter is
+  // present. This is because such a filter relies on information that is only 
available at run
+  // time (for instance the keys used in the other side of a join).
+  @transient private lazy val dynamicallySelectedPartitions: 
Array[PartitionDirectory] = {
+    val dynamicPartitionFilters = 
partitionFilters.filter(isDynamicPruningFilter)
+
+    if (dynamicPartitionFilters.nonEmpty) {
+      val startTime = System.nanoTime()
+      // call the file index for the files matching all filters except dynamic 
partition filters
+      val predicate = dynamicPartitionFilters.reduce(And)
+      val partitionColumns = relation.partitionSchema
+      val boundPredicate = Predicate.create(predicate.transform {
+        case a: AttributeReference =>
+          val index = partitionColumns.indexWhere(a.name == _.name)
+          BoundReference(index, partitionColumns(index).dataType, nullable = 
true)
+      }, Nil)
+      val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
+      setFilesNumAndSizeMetric(ret, false)
+      val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000
+      driverMetrics("pruningTime") = timeTakenMs
+      ret
+    } else {
+      selectedPartitions
+    }
+  }
+
+  /**
+   * [[partitionFilters]] can contain subqueries whose results are available 
only at runtime so
+   * accessing [[selectedPartitions]] should be guarded by this method during 
planning
+   */
+  private def hasPartitionsAvailableAtRunTime: Boolean = {
+    partitionFilters.exists(ExecSubqueryExpression.hasSubquery)
+  }
+
+  private def toAttribute(colName: String): Option[Attribute] =
+    output.find(_.name == colName)
+
+  // exposed for testing
+  lazy val bucketedScan: Boolean = {
+    if (relation.sparkSession.sessionState.conf.bucketingEnabled && 
relation.bucketSpec.isDefined
+      && !disableBucketedScan) {
+      val spec = relation.bucketSpec.get
+      val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+      bucketColumns.size == spec.bucketColumnNames.size
+    } else {
+      false
+    }
+  }
+
+  override lazy val (outputPartitioning, outputOrdering): (Partitioning, 
Seq[SortOrder]) = {
+    if (bucketedScan) {
+      // For bucketed columns:
+      // -----------------------
+      // `HashPartitioning` would be used only when:
+      // 1. ALL the bucketing columns are being read from the table
+      //
+      // For sorted columns:
+      // ---------------------
+      // Sort ordering should be used when ALL these criteria's match:
+      // 1. `HashPartitioning` is being used
+      // 2. A prefix (or all) of the sort columns are being read from the 
table.
+      //
+      // Sort ordering would be over the prefix subset of `sort columns` being 
read
+      // from the table.
+      // e.g.
+      // Assume (col0, col2, col3) are the columns read from the table
+      // If sort columns are (col0, col1), then sort ordering would be 
considered as (col0)
+      // If sort columns are (col1, col0), then sort ordering would be empty 
as per rule #2
+      // above
+      val spec = relation.bucketSpec.get
+      val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+      val numPartitions = 
optionalNumCoalescedBuckets.getOrElse(spec.numBuckets)
+      val partitioning = HashPartitioning(bucketColumns, numPartitions)
+      val sortColumns =
+        spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => 
x.isDefined).map(_.get)
+      val shouldCalculateSortOrder =
+        conf.getConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING) &&
+          sortColumns.nonEmpty &&
+          !hasPartitionsAvailableAtRunTime
+
+      val sortOrder = if (shouldCalculateSortOrder) {
+        // In case of bucketing, its possible to have multiple files belonging 
to the
+        // same bucket in a given relation. Each of these files are locally 
sorted
+        // but those files combined together are not globally sorted. Given 
that,
+        // the RDD partition will not be sorted even if the relation has sort 
columns set
+        // Current solution is to check if all the buckets have a single file 
in it
+
+        val files = selectedPartitions.flatMap(partition => partition.files)
+        val bucketToFilesGrouping =
+          files.map(_.getPath.getName).groupBy(file => 
BucketingUtils.getBucketId(file))
+        val singleFilePartitions = bucketToFilesGrouping.forall(p => 
p._2.length <= 1)
+
+        // TODO SPARK-24528 Sort order is currently ignored if buckets are 
coalesced.
+        if (singleFilePartitions && optionalNumCoalescedBuckets.isEmpty) {
+          // TODO Currently Spark does not support writing columns sorting in 
descending order
+          // so using Ascending order. This can be fixed in future
+          sortColumns.map(attribute => SortOrder(attribute, Ascending))
+        } else {
+          Nil
+        }
+      } else {
+        Nil
+      }
+      (partitioning, sortOrder)
+    } else {
+      (UnknownPartitioning(0), Nil)
+    }
+  }
+
+  @transient
+  private lazy val pushedDownFilters = {
+    val supportNestedPredicatePushdown = 
DataSourceUtils.supportNestedPredicatePushdown(relation)
+    dataFilters.flatMap(DataSourceStrategy.translateFilter(_, 
supportNestedPredicatePushdown))
+  }
+
+  override lazy val metadata: Map[String, String] = {
+    def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+    val location = relation.location
+    val locationDesc =
+      location.getClass.getSimpleName +
+        Utils.buildLocationMetadata(location.rootPaths, maxMetadataValueLength)
+    val metadata =
+      Map(
+        "Format" -> relation.fileFormat.toString,
+        "ReadSchema" -> requiredSchema.catalogString,
+        "Batched" -> supportsColumnar.toString,
+        "PartitionFilters" -> seqToString(partitionFilters),
+        "PushedFilters" -> seqToString(pushedDownFilters),
+        "DataFilters" -> seqToString(dataFilters),
+        "Location" -> locationDesc)
+
+    // TODO(SPARK-32986): Add bucketed scan info in explain output of 
AbstractFileSourceScanExec
+    if (bucketedScan) {
+      relation.bucketSpec.map { spec =>
+        val numSelectedBuckets = optionalBucketSet.map { b =>
+          b.cardinality()
+        } getOrElse {
+          spec.numBuckets
+        }
+        metadata + ("SelectedBucketsCount" ->
+          (s"$numSelectedBuckets out of ${spec.numBuckets}" +
+            optionalNumCoalescedBuckets.map { b => s" (Coalesced to 
$b)"}.getOrElse("")))
+      } getOrElse {
+        metadata
+      }
+    } else {
+      metadata
+    }
+  }
+
+  override def verboseStringWithOperatorId(): String = {
+    val metadataStr = metadata.toSeq.sorted.filterNot {
+      case (_, value) if (value.isEmpty || value.equals("[]")) => true
+      case (key, _) if (key.equals("DataFilters") || key.equals("Format")) => 
true
+      case (_, _) => false
+    }.map {
+      case (key, _) if (key.equals("Location")) =>
+        val location = relation.location
+        val numPaths = location.rootPaths.length
+        val abbreviatedLocation = if (numPaths <= 1) {
+          location.rootPaths.mkString("[", ", ", "]")
+        } else {
+          "[" + location.rootPaths.head + s", ... ${numPaths - 1} entries]"
+        }
+        s"$key: ${location.getClass.getSimpleName} 
${redact(abbreviatedLocation)}"
+      case (key, value) => s"$key: ${redact(value)}"
+    }
+
+    s"""
+       |$formattedNodeName
+       |${ExplainUtils.generateFieldString("Output", output)}
+       |${metadataStr.mkString("\n")}
+       |""".stripMargin
+  }
+
+  lazy val inputRDD: RDD[InternalRow] = {
+    val readFile: (PartitionedFile) => Iterator[InternalRow] =
+      relation.fileFormat.buildReaderWithPartitionValues(
+        sparkSession = relation.sparkSession,
+        dataSchema = relation.dataSchema,
+        partitionSchema = relation.partitionSchema,
+        requiredSchema = requiredSchema,
+        filters = pushedDownFilters,
+        options = relation.options,
+        hadoopConf = 
relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
+
+    val readRDD = if (bucketedScan) {
+      createBucketedReadRDD(relation.bucketSpec.get, readFile, 
dynamicallySelectedPartitions,
+        relation)
+    } else {
+      createReadRDD(readFile, dynamicallySelectedPartitions, relation)
+    }
+    sendDriverMetrics()
+    readRDD
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  /** SQL metrics generated only for scans using dynamic partition pruning. */
+  private lazy val staticMetrics = if 
(partitionFilters.exists(isDynamicPruningFilter)) {
+    Map("staticFilesNum" -> SQLMetrics.createMetric(sparkContext, "static 
number of files read"),
+      "staticFilesSize" -> SQLMetrics.createSizeMetric(sparkContext, "static 
size of files read"))
+  } else {
+    Map.empty[String, SQLMetric]
+  }
+
+  /** Helper for computing total number and size of files in selected 
partitions. */
+  private def setFilesNumAndSizeMetric(
+                                        partitions: Seq[PartitionDirectory],
+                                        static: Boolean): Unit = {
+    val filesNum = partitions.map(_.files.size.toLong).sum
+    val filesSize = partitions.map(_.files.map(_.getLen).sum).sum
+    if (!static || !partitionFilters.exists(isDynamicPruningFilter)) {
+      driverMetrics("numFiles") = filesNum
+      driverMetrics("filesSize") = filesSize
+    } else {
+      driverMetrics("staticFilesNum") = filesNum
+      driverMetrics("staticFilesSize") = filesSize
+    }
+    if (relation.partitionSchemaOption.isDefined) {
+      driverMetrics("numPartitions") = partitions.length
+    }
+  }
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+    "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files 
read"),
+    "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata 
time"),
+    "filesSize" -> SQLMetrics.createSizeMetric(sparkContext, "size of files 
read")
+  ) ++ {
+    // Tracking scan time has overhead, we can't afford to do it for each row, 
and can only do
+    // it for each batch.
+    if (supportsColumnar) {
+      Some("scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan 
time"))
+    } else {
+      None
+    }
+  } ++ {
+    if (relation.partitionSchemaOption.isDefined) {
+      Map(
+        "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of 
partitions read"),
+        "pruningTime" ->
+          SQLMetrics.createTimingMetric(sparkContext, "dynamic partition 
pruning time"))
+    } else {
+      Map.empty[String, SQLMetric]
+    }
+  } ++ staticMetrics
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    if (needsUnsafeRowConversion) {
+      inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
+        val toUnsafe = UnsafeProjection.create(schema)
+        toUnsafe.initialize(index)
+        iter.map { row =>
+          numOutputRows += 1
+          toUnsafe(row)
+        }
+      }
+    } else {
+      inputRDD.mapPartitionsInternal { iter =>
+        iter.map { row =>
+          numOutputRows += 1
+          row
+        }
+      }
+    }
+  }
+
+  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numOutputRows = longMetric("numOutputRows")
+    val scanTime = longMetric("scanTime")
+    inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches 
=>
+      new Iterator[ColumnarBatch] {
+
+        override def hasNext: Boolean = {
+          // The `FileScanRDD` returns an iterator which scans the file during 
the `hasNext` call.
+          val startNs = System.nanoTime()
+          val res = batches.hasNext
+          scanTime += NANOSECONDS.toMillis(System.nanoTime() - startNs)
+          res
+        }
+
+        override def next(): ColumnarBatch = {
+          val batch = batches.next()
+          numOutputRows += batch.numRows()
+          batch
+        }
+      }
+    }
+  }
+
+  override val nodeNamePrefix: String = "File"
+
+  /**
+   * Create an RDD for bucketed reads.
+   * The non-bucketed variant of this function is [[createReadRDD]].
+   *
+   * The algorithm is pretty simple: each RDD partition being returned should 
include all the files
+   * with the same bucket id from all the given Hive partitions.
+   *
+   * @param bucketSpec the bucketing spec.
+   * @param readFile a function to read each (part of a) file.
+   * @param selectedPartitions Hive-style partition that are part of the read.
+   * @param fsRelation [[HadoopFsRelation]] associated with the read.
+   */
+  private def createBucketedReadRDD(
+                                     bucketSpec: BucketSpec,
+                                     readFile: (PartitionedFile) => 
Iterator[InternalRow],
+                                     selectedPartitions: 
Array[PartitionDirectory],
+                                     fsRelation: HadoopFsRelation): 
RDD[InternalRow] = {
+    logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
+    val filesGroupedToBuckets =
+      selectedPartitions.flatMap { p =>
+        p.files.map { f =>
+          PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
+        }
+      }.groupBy { f =>
+        BucketingUtils
+          .getBucketId(new Path(f.filePath).getName)
+          .getOrElse(throw new IllegalStateException(s"Invalid bucket file 
${f.filePath}"))
+      }
+
+    val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
+      val bucketSet = optionalBucketSet.get
+      filesGroupedToBuckets.filter {
+        f => bucketSet.get(f._1)
+      }
+    } else {
+      filesGroupedToBuckets
+    }
+
+    val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets 
=>
+      logInfo(s"Coalescing to ${numCoalescedBuckets} buckets")
+      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % 
numCoalescedBuckets)
+      Seq.tabulate(numCoalescedBuckets) { bucketId =>
+        val partitionedFiles = coalescedBuckets.get(bucketId).map {
+          _.values.flatten.toArray
+        }.getOrElse(Array.empty)
+        FilePartition(bucketId, partitionedFiles)
+      }
+    }.getOrElse {
+      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
+        FilePartition(bucketId, 
prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
+      }
+    }
+
+    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+  }
+
+  /**
+   * Create an RDD for non-bucketed reads.
+   * The bucketed variant of this function is [[createBucketedReadRDD]].
+   *
+   * @param readFile a function to read each (part of a) file.
+   * @param selectedPartitions Hive-style partition that are part of the read.
+   * @param fsRelation [[HadoopFsRelation]] associated with the read.
+   */
+  private def createReadRDD(
+                             readFile: (PartitionedFile) => 
Iterator[InternalRow],
+                             selectedPartitions: Array[PartitionDirectory],
+                             fsRelation: HadoopFsRelation): RDD[InternalRow] = 
{
+    val openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    val maxSplitBytes =
+      FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
+    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
+      s"open cost is considered as scanning $openCostInBytes bytes.")
+
+    // Filter files with bucket pruning if possible
+    val bucketingEnabled = 
fsRelation.sparkSession.sessionState.conf.bucketingEnabled
+    val shouldProcess: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) if bucketingEnabled =>
+        // Do not prune the file if bucket file name is invalid
+        filePath => 
BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
+      case _ =>
+        _ => true
+    }
+
+    val splitFiles = selectedPartitions.flatMap { partition =>
+      partition.files.flatMap { file =>
+        // getPath() is very expensive so we only want to call it once in this 
block:
+        val filePath = file.getPath
+
+        if (shouldProcess(filePath)) {
+          val isSplitable = relation.fileFormat.isSplitable(
+            relation.sparkSession, relation.options, filePath)
+          PartitionedFileUtil.splitFiles(
+            sparkSession = relation.sparkSession,
+            file = file,
+            filePath = filePath,
+            isSplitable = isSplitable,
+            maxSplitBytes = maxSplitBytes,
+            partitionValues = partition.values
+          )
+        } else {
+          Seq.empty
+        }
+      }
+    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+
+    val partitions =
+      FilePartition.getFilePartitions(relation.sparkSession, splitFiles, 
maxSplitBytes)
+
+    new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
+  }
+
+  // Filters unused DynamicPruningExpression expressions - one which has been 
replaced
+  // with DynamicPruningExpression(Literal.TrueLiteral) during Physical 
Planning
+  private def filterUnusedDynamicPruningExpressions(
+                                                     predicates: 
Seq[Expression]): Seq[Expression] = {
+    predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
+  }
+
+  override def doCanonicalize(): AbstractFileSourceScanExec = {
+    AbstractFileSourceScanExec(
+      relation,
+      output.map(QueryPlan.normalizeExpressions(_, output)),
+      requiredSchema,
+      QueryPlan.normalizePredicates(
+        filterUnusedDynamicPruningExpressions(partitionFilters), output),
+      optionalBucketSet,
+      optionalNumCoalescedBuckets,
+      QueryPlan.normalizePredicates(dataFilters, output),
+      None,
+      disableBucketedScan)
+  }
+}
\ No newline at end of file
diff --git 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
new file mode 100644
index 000000000..42bce39ee
--- /dev/null
+++ 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import com.google.common.base.Objects
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.connector.read.{InputPartition, 
PartitionReaderFactory, Scan, SupportsRuntimeFiltering}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+/**
+ * Physical plan node for scanning a batch of data from a data source v2.
+ */
+case class AbstractBatchScanExec(
+    output: Seq[AttributeReference],
+    @transient scan: Scan,
+    runtimeFilters: Seq[Expression]) extends DataSourceV2ScanExecBase {
+
+  @transient lazy val batch = scan.toBatch
+
+  // TODO: unify the equal/hashCode implementation for all data source v2 
query plans.
+  override def equals(other: Any): Boolean = other match {
+    case other: AbstractBatchScanExec =>
+      this.batch == other.batch && this.runtimeFilters == other.runtimeFilters
+    case _ =>
+      false
+  }
+
+  override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
+
+  @transient override lazy val partitions: Seq[InputPartition] = 
batch.planInputPartitions()
+
+  @transient private lazy val filteredPartitions: Seq[InputPartition] = {
+    val dataSourceFilters = runtimeFilters.flatMap {
+      case DynamicPruningExpression(e) => 
DataSourceStrategy.translateRuntimeFilter(e)
+      case _ => None
+    }
+
+    if (dataSourceFilters.nonEmpty) {
+      val originalPartitioning = outputPartitioning
+
+      // the cast is safe as runtime filters are only assigned if the scan can 
be filtered
+      val filterableScan = scan.asInstanceOf[SupportsRuntimeFiltering]
+      filterableScan.filter(dataSourceFilters.toArray)
+
+      // call toBatch again to get filtered partitions
+      val newPartitions = scan.toBatch.planInputPartitions()
+
+      originalPartitioning match {
+        case p: DataSourcePartitioning if p.numPartitions != 
newPartitions.size =>
+          throw new SparkException(
+            "Data source must have preserved the original partitioning during 
runtime filtering; " +
+            s"reported num partitions: ${p.numPartitions}, " +
+            s"num partitions after runtime filtering: ${newPartitions.size}")
+        case _ =>
+          // no validation is needed as the data source did not report any 
specific partitioning
+      }
+
+      newPartitions
+    } else {
+      partitions
+    }
+  }
+
+  override lazy val readerFactory: PartitionReaderFactory = 
batch.createReaderFactory()
+
+  override lazy val inputRDD: RDD[InternalRow] = {
+    if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
+      // return an empty RDD with 1 partition if dynamic filtering removed the 
only split
+      sparkContext.parallelize(Array.empty[InternalRow], 1)
+    } else {
+      new DataSourceRDD(
+        sparkContext, filteredPartitions, readerFactory, supportsColumnar, 
customMetrics)
+    }
+  }
+
+  override def doCanonicalize(): AbstractBatchScanExec = {
+    this.copy(
+      output = output.map(QueryPlan.normalizeExpressions(_, output)),
+      runtimeFilters = QueryPlan.normalizePredicates(
+        runtimeFilters.filterNot(_ == 
DynamicPruningExpression(Literal.TrueLiteral)),
+        output))
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    val truncatedOutputString = truncatedString(output, "[", ", ", "]", 
maxFields)
+    val runtimeFiltersString = s"RuntimeFilters: 
${runtimeFilters.mkString("[", ",", "]")}"
+    val result = s"$nodeName$truncatedOutputString ${scan.description()} 
$runtimeFiltersString"
+    redact(result)
+  }
+}
diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
new file mode 100644
index 000000000..6375177e7
--- /dev/null
+++ 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+import java.util.concurrent.TimeUnit._
+
+import scala.collection.mutable.HashMap
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
+import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.BitSet
+
+
+/**
+ * Physical plan node for scanning data from HadoopFsRelations.
+ *
+ * @param relation The file-based relation to scan.
+ * @param output Output attributes of the scan, including data attributes and partition attributes.
+ * @param requiredSchema Required schema of the underlying relation, excluding partition columns.
+ * @param partitionFilters Predicates to use for partition pruning.
+ * @param optionalBucketSet Bucket ids for bucket pruning.
+ * @param optionalNumCoalescedBuckets Number of coalesced buckets.
+ * @param dataFilters Filters on non-partition columns.
+ * @param tableIdentifier Identifier for the table in the metastore.
+ * @param disableBucketedScan Disable bucketed scan based on physical query plan, see rule
+ *                            [[DisableUnnecessaryBucketedScan]] for details.
+ */
+case class AbstractFileSourceScanExec(
+                               @transient relation: HadoopFsRelation,
+                               output: Seq[Attribute],
+                               requiredSchema: StructType,
+                               partitionFilters: Seq[Expression],
+                               optionalBucketSet: Option[BitSet],
+                               optionalNumCoalescedBuckets: Option[Int],
+                               dataFilters: Seq[Expression],
+                               tableIdentifier: Option[TableIdentifier],
+                               disableBucketedScan: Boolean = false)
+  extends DataSourceScanExec {
+
+  lazy val metadataColumns: Seq[AttributeReference] =
+    output.collect { case FileSourceMetadataAttribute(attr) => attr }
+
+  // Note that some vals referring the file-based relation are lazy intentionally
+  // so that this plan can be canonicalized on executor side too. See SPARK-23731.
+  override lazy val supportsColumnar: Boolean = {
+    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+  }
+
+  private lazy val needsUnsafeRowConversion: Boolean = {
+    if (relation.fileFormat.isInstanceOf[ParquetSource]) {
+      conf.parquetVectorizedReaderEnabled
+    } else {
+      false
+    }
+  }
+
+  override def vectorTypes: Option[Seq[String]] =
+    relation.fileFormat.vectorTypes(
+      requiredSchema = requiredSchema,
+      partitionSchema = relation.partitionSchema,
+      relation.sparkSession.sessionState.conf).map { vectorTypes =>
+      // for column-based file format, append metadata column's vector type classes if any
+      vectorTypes ++ Seq.fill(metadataColumns.size)(classOf[ConstantColumnVector].getName)
+    }
+
+  private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty
+
+  /**
+   * Send the driver-side metrics. Before calling this function, selectedPartitions has
+   * been initialized. See SPARK-26327 for more details.
+   */
+  private def sendDriverMetrics(): Unit = {
+    driverMetrics.foreach(e => metrics(e._1).add(e._2))
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq)
+  }
+
+  private def isDynamicPruningFilter(e: Expression): Boolean =
+    e.exists(_.isInstanceOf[PlanExpression[_]])
+
+  @transient lazy val selectedPartitions: Array[PartitionDirectory] = {
+    val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
+    val startTime = System.nanoTime()
+    val ret =
+      relation.location.listFiles(
+        partitionFilters.filterNot(isDynamicPruningFilter), dataFilters)
+    setFilesNumAndSizeMetric(ret, true)
+    val timeTakenMs = NANOSECONDS.toMillis(
+      (System.nanoTime() - startTime) + optimizerMetadataTimeNs)
+    driverMetrics("metadataTime") = timeTakenMs
+    ret
+  }.toArray
+
+  // We can only determine the actual partitions at runtime when a dynamic partition filter is
+  // present. This is because such a filter relies on information that is only available at run
+  // time (for instance the keys used in the other side of a join).
+  @transient private lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = {
+    val dynamicPartitionFilters = partitionFilters.filter(isDynamicPruningFilter)
+
+    if (dynamicPartitionFilters.nonEmpty) {
+      val startTime = System.nanoTime()
+      // call the file index for the files matching all filters except dynamic partition filters
+      val predicate = dynamicPartitionFilters.reduce(And)
+      val partitionColumns = relation.partitionSchema
+      val boundPredicate = Predicate.create(predicate.transform {
+        case a: AttributeReference =>
+          val index = partitionColumns.indexWhere(a.name == _.name)
+          BoundReference(index, partitionColumns(index).dataType, nullable = true)
+      }, Nil)
+      val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
+      setFilesNumAndSizeMetric(ret, false)
+      val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000
+      driverMetrics("pruningTime") = timeTakenMs
+      ret
+    } else {
+      selectedPartitions
+    }
+  }
+
+  /**
+   * [[partitionFilters]] can contain subqueries whose results are available only at runtime so
+   * accessing [[selectedPartitions]] should be guarded by this method during planning
+   */
+  private def hasPartitionsAvailableAtRunTime: Boolean = {
+    partitionFilters.exists(ExecSubqueryExpression.hasSubquery)
+  }
+
+  private def toAttribute(colName: String): Option[Attribute] =
+    output.find(_.name == colName)
+
+  // exposed for testing
+  lazy val bucketedScan: Boolean = {
+    if (relation.sparkSession.sessionState.conf.bucketingEnabled && relation.bucketSpec.isDefined
+      && !disableBucketedScan) {
+      val spec = relation.bucketSpec.get
+      val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+      bucketColumns.size == spec.bucketColumnNames.size
+    } else {
+      false
+    }
+  }
+
+  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
+    if (bucketedScan) {
+      // For bucketed columns:
+      // -----------------------
+      // `HashPartitioning` would be used only when:
+      // 1. ALL the bucketing columns are being read from the table
+      //
+      // For sorted columns:
+      // ---------------------
+      // Sort ordering should be used when ALL these criteria's match:
+      // 1. `HashPartitioning` is being used
+      // 2. A prefix (or all) of the sort columns are being read from the table.
+      //
+      // Sort ordering would be over the prefix subset of `sort columns` being read
+      // from the table.
+      // e.g.
+      // Assume (col0, col2, col3) are the columns read from the table
+      // If sort columns are (col0, col1), then sort ordering would be considered as (col0)
+      // If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
+      // above
+      val spec = relation.bucketSpec.get
+      val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+      val numPartitions = optionalNumCoalescedBuckets.getOrElse(spec.numBuckets)
+      val partitioning = HashPartitioning(bucketColumns, numPartitions)
+      val sortColumns =
+        spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
+      val shouldCalculateSortOrder =
+        conf.getConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING) &&
+          sortColumns.nonEmpty &&
+          !hasPartitionsAvailableAtRunTime
+
+      val sortOrder = if (shouldCalculateSortOrder) {
+        // In case of bucketing, its possible to have multiple files belonging to the
+        // same bucket in a given relation. Each of these files are locally sorted
+        // but those files combined together are not globally sorted. Given that,
+        // the RDD partition will not be sorted even if the relation has sort columns set
+        // Current solution is to check if all the buckets have a single file in it
+
+        val files = selectedPartitions.flatMap(partition => partition.files)
+        val bucketToFilesGrouping =
+          files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
+        val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
+
+        // TODO SPARK-24528 Sort order is currently ignored if buckets are coalesced.
+        if (singleFilePartitions && optionalNumCoalescedBuckets.isEmpty) {
+          // TODO Currently Spark does not support writing columns sorting in descending order
+          // so using Ascending order. This can be fixed in future
+          sortColumns.map(attribute => SortOrder(attribute, Ascending))
+        } else {
+          Nil
+        }
+      } else {
+        Nil
+      }
+      (partitioning, sortOrder)
+    } else {
+      (UnknownPartitioning(0), Nil)
+    }
+  }
+
+  @transient
+  private lazy val pushedDownFilters = {
+    val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
+    // `dataFilters` should not include any metadata col filters
+    // because the metadata struct has been flatted in FileSourceStrategy
+    // and thus metadata col filters are invalid to be pushed down
+    dataFilters.filterNot(_.references.exists {
+      case FileSourceMetadataAttribute(_) => true
+      case _ => false
+    }).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
+  }
+
+  override lazy val metadata: Map[String, String] = {
+    def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+    val location = relation.location
+    val locationDesc =
+      location.getClass.getSimpleName +
+        Utils.buildLocationMetadata(location.rootPaths, maxMetadataValueLength)
+    val metadata =
+      Map(
+        "Format" -> relation.fileFormat.toString,
+        "ReadSchema" -> requiredSchema.catalogString,
+        "Batched" -> supportsColumnar.toString,
+        "PartitionFilters" -> seqToString(partitionFilters),
+        "PushedFilters" -> seqToString(pushedDownFilters),
+        "DataFilters" -> seqToString(dataFilters),
+        "Location" -> locationDesc)
+
+    relation.bucketSpec.map { spec =>
+      val bucketedKey = "Bucketed"
+      if (bucketedScan) {
+        val numSelectedBuckets = optionalBucketSet.map { b =>
+          b.cardinality()
+        } getOrElse {
+          spec.numBuckets
+        }
+        metadata ++ Map(
+          bucketedKey -> "true",
+          "SelectedBucketsCount" -> (s"$numSelectedBuckets out of ${spec.numBuckets}" +
+            optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)"}.getOrElse("")))
+      } else if (!relation.sparkSession.sessionState.conf.bucketingEnabled) {
+        metadata + (bucketedKey -> "false (disabled by configuration)")
+      } else if (disableBucketedScan) {
+        metadata + (bucketedKey -> "false (disabled by query planner)")
+      } else {
+        metadata + (bucketedKey -> "false (bucket column(s) not read)")
+      }
+    } getOrElse {
+      metadata
+    }
+  }
+
+  override def verboseStringWithOperatorId(): String = {
+    val metadataStr = metadata.toSeq.sorted.filterNot {
+      case (_, value) if (value.isEmpty || value.equals("[]")) => true
+      case (key, _) if (key.equals("DataFilters") || key.equals("Format")) => true
+      case (_, _) => false
+    }.map {
+      case (key, _) if (key.equals("Location")) =>
+        val location = relation.location
+        val numPaths = location.rootPaths.length
+        val abbreviatedLocation = if (numPaths <= 1) {
+          location.rootPaths.mkString("[", ", ", "]")
+        } else {
+          "[" + location.rootPaths.head + s", ... ${numPaths - 1} entries]"
+        }
+        s"$key: ${location.getClass.getSimpleName} ${redact(abbreviatedLocation)}"
+      case (key, value) => s"$key: ${redact(value)}"
+    }
+
+    s"""
+       |$formattedNodeName
+       |${ExplainUtils.generateFieldString("Output", output)}
+       |${metadataStr.mkString("\n")}
+       |""".stripMargin
+  }
+
+  lazy val inputRDD: RDD[InternalRow] = {
+    val readFile: (PartitionedFile) => Iterator[InternalRow] =
+      relation.fileFormat.buildReaderWithPartitionValues(
+        sparkSession = relation.sparkSession,
+        dataSchema = relation.dataSchema,
+        partitionSchema = relation.partitionSchema,
+        requiredSchema = requiredSchema,
+        filters = pushedDownFilters,
+        options = relation.options,
+        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
+
+    val readRDD = if (bucketedScan) {
+      createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
+        relation)
+    } else {
+      createReadRDD(readFile, dynamicallySelectedPartitions, relation)
+    }
+    sendDriverMetrics()
+    readRDD
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  /** SQL metrics generated only for scans using dynamic partition pruning. */
+  private lazy val staticMetrics = if (partitionFilters.exists(isDynamicPruningFilter)) {
+    Map("staticFilesNum" -> SQLMetrics.createMetric(sparkContext, "static number of files read"),
+      "staticFilesSize" -> SQLMetrics.createSizeMetric(sparkContext, "static size of files read"))
+  } else {
+    Map.empty[String, SQLMetric]
+  }
+
+  /** Helper for computing total number and size of files in selected partitions. */
+  private def setFilesNumAndSizeMetric(
+                                        partitions: Seq[PartitionDirectory],
+                                        static: Boolean): Unit = {
+    val filesNum = partitions.map(_.files.size.toLong).sum
+    val filesSize = partitions.map(_.files.map(_.getLen).sum).sum
+    if (!static || !partitionFilters.exists(isDynamicPruningFilter)) {
+      driverMetrics("numFiles") = filesNum
+      driverMetrics("filesSize") = filesSize
+    } else {
+      driverMetrics("staticFilesNum") = filesNum
+      driverMetrics("staticFilesSize") = filesSize
+    }
+    if (relation.partitionSchema.nonEmpty) {
+      driverMetrics("numPartitions") = partitions.length
+    }
+  }
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+    "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"),
+    "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"),
+    "filesSize" -> SQLMetrics.createSizeMetric(sparkContext, "size of files read")
+  ) ++ {
+    // Tracking scan time has overhead, we can't afford to do it for each row, and can only do
+    // it for each batch.
+    if (supportsColumnar) {
+      Some("scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+    } else {
+      None
+    }
+  } ++ {
+    if (relation.partitionSchema.nonEmpty) {
+      Map(
+        "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions read"),
+        "pruningTime" ->
+          SQLMetrics.createTimingMetric(sparkContext, "dynamic partition pruning time"))
+    } else {
+      Map.empty[String, SQLMetric]
+    }
+  } ++ staticMetrics
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    if (needsUnsafeRowConversion) {
+      inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
+        val toUnsafe = UnsafeProjection.create(schema)
+        toUnsafe.initialize(index)
+        iter.map { row =>
+          numOutputRows += 1
+          toUnsafe(row)
+        }
+      }
+    } else {
+      inputRDD.mapPartitionsInternal { iter =>
+        iter.map { row =>
+          numOutputRows += 1
+          row
+        }
+      }
+    }
+  }
+
+  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numOutputRows = longMetric("numOutputRows")
+    val scanTime = longMetric("scanTime")
+    inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches =>
+      new Iterator[ColumnarBatch] {
+
+        override def hasNext: Boolean = {
+          // The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call.
+          val startNs = System.nanoTime()
+          val res = batches.hasNext
+          scanTime += NANOSECONDS.toMillis(System.nanoTime() - startNs)
+          res
+        }
+
+        override def next(): ColumnarBatch = {
+          val batch = batches.next()
+          numOutputRows += batch.numRows()
+          batch
+        }
+      }
+    }
+  }
+
+  override val nodeNamePrefix: String = "File"
+
+  /**
+   * Create an RDD for bucketed reads.
+   * The non-bucketed variant of this function is [[createReadRDD]].
+   *
+   * The algorithm is pretty simple: each RDD partition being returned should include all the files
+   * with the same bucket id from all the given Hive partitions.
+   *
+   * @param bucketSpec the bucketing spec.
+   * @param readFile a function to read each (part of a) file.
+   * @param selectedPartitions Hive-style partition that are part of the read.
+   * @param fsRelation [[HadoopFsRelation]] associated with the read.
+   */
+  private def createBucketedReadRDD(
+                                     bucketSpec: BucketSpec,
+                                     readFile: (PartitionedFile) => Iterator[InternalRow],
+                                     selectedPartitions: Array[PartitionDirectory],
+                                     fsRelation: HadoopFsRelation): RDD[InternalRow] = {
+    logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
+    val filesGroupedToBuckets =
+      selectedPartitions.flatMap { p =>
+        p.files.map { f =>
+          PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
+        }
+      }.groupBy { f =>
+        BucketingUtils
+          .getBucketId(new Path(f.filePath).getName)
+          .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath))
+      }
+
+    val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
+      val bucketSet = optionalBucketSet.get
+      filesGroupedToBuckets.filter {
+        f => bucketSet.get(f._1)
+      }
+    } else {
+      filesGroupedToBuckets
+    }
+
+    val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
+      logInfo(s"Coalescing to ${numCoalescedBuckets} buckets")
+      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
+      Seq.tabulate(numCoalescedBuckets) { bucketId =>
+        val partitionedFiles = coalescedBuckets.get(bucketId).map {
+          _.values.flatten.toArray
+        }.getOrElse(Array.empty)
+        FilePartition(bucketId, partitionedFiles)
+      }
+    }.getOrElse {
+      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
+        FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
+      }
+    }
+
+    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions,
+      new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), metadataColumns)
+  }
+
+  /**
+   * Create an RDD for non-bucketed reads.
+   * The bucketed variant of this function is [[createBucketedReadRDD]].
+   *
+   * @param readFile a function to read each (part of a) file.
+   * @param selectedPartitions Hive-style partition that are part of the read.
+   * @param fsRelation [[HadoopFsRelation]] associated with the read.
+   */
+  private def createReadRDD(
+                             readFile: (PartitionedFile) => Iterator[InternalRow],
+                             selectedPartitions: Array[PartitionDirectory],
+                             fsRelation: HadoopFsRelation): RDD[InternalRow] = {
+    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    val maxSplitBytes =
+      FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
+    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+      s"open cost is considered as scanning $openCostInBytes bytes.")
+
+    // Filter files with bucket pruning if possible
+    val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled
+    val shouldProcess: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) if bucketingEnabled =>
+        // Do not prune the file if bucket file name is invalid
+        filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
+      case _ =>
+        _ => true
+    }
+
+    val splitFiles = selectedPartitions.flatMap { partition =>
+      partition.files.flatMap { file =>
+        // getPath() is very expensive so we only want to call it once in this block:
+        val filePath = file.getPath
+
+        if (shouldProcess(filePath)) {
+          val isSplitable = relation.fileFormat.isSplitable(
+            relation.sparkSession, relation.options, filePath)
+          PartitionedFileUtil.splitFiles(
+            sparkSession = relation.sparkSession,
+            file = file,
+            filePath = filePath,
+            isSplitable = isSplitable,
+            maxSplitBytes = maxSplitBytes,
+            partitionValues = partition.values
+          )
+        } else {
+          Seq.empty
+        }
+      }
+    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+
+    val partitions =
+      FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)
+
+    new FileScanRDD(fsRelation.sparkSession, readFile, partitions,
+      new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), metadataColumns)
+  }
+
+  // Filters unused DynamicPruningExpression expressions - one which has been replaced
+  // with DynamicPruningExpression(Literal.TrueLiteral) during Physical Planning
+  private def filterUnusedDynamicPruningExpressions(
+                                                     predicates: Seq[Expression]): Seq[Expression] = {
+    predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
+  }
+
+  override def doCanonicalize(): AbstractFileSourceScanExec = {
+    AbstractFileSourceScanExec(
+      relation,
+      output.map(QueryPlan.normalizeExpressions(_, output)),
+      requiredSchema,
+      QueryPlan.normalizePredicates(
+        filterUnusedDynamicPruningExpressions(partitionFilters), output),
+      optionalBucketSet,
+      optionalNumCoalescedBuckets,
+      QueryPlan.normalizePredicates(dataFilters, output),
+      None,
+      disableBucketedScan)
+  }
+}
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
new file mode 100644
index 000000000..b9a9bb47d
--- /dev/null
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, SinglePartition}
+import org.apache.spark.sql.catalyst.util.InternalRowSet
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, PartitionReaderFactory, Scan, SupportsRuntimeFiltering}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+import com.google.common.base.Objects
+
+/** Physical plan node for scanning a batch of data from a data source v2. */
+case class AbstractBatchScanExec(
+    output: Seq[AttributeReference],
+    @transient scan: Scan,
+    runtimeFilters: Seq[Expression],
+    keyGroupedPartitioning: Option[Seq[Expression]] = None)
+  extends DataSourceV2ScanExecBase {
+
+  @transient lazy val batch = scan.toBatch
+
+  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
+  override def equals(other: Any): Boolean = other match {
+    case other: AbstractBatchScanExec =>
+      this.batch == other.batch && this.runtimeFilters == other.runtimeFilters
+    case _ =>
+      false
+  }
+
+  override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
+
+  @transient override lazy val inputPartitions: Seq[InputPartition] = batch.planInputPartitions()
+
+  @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
+    val dataSourceFilters = runtimeFilters.flatMap {
+      case DynamicPruningExpression(e) => DataSourceStrategy.translateRuntimeFilter(e)
+      case _ => None
+    }
+
+    if (dataSourceFilters.nonEmpty) {
+      val originalPartitioning = outputPartitioning
+
+      // the cast is safe as runtime filters are only assigned if the scan can be filtered
+      val filterableScan = scan.asInstanceOf[SupportsRuntimeFiltering]
+      filterableScan.filter(dataSourceFilters.toArray)
+
+      // call toBatch again to get filtered partitions
+      val newPartitions = scan.toBatch.planInputPartitions()
+
+      originalPartitioning match {
+        case p: KeyGroupedPartitioning =>
+          if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
+            throw new SparkException(
+              "Data source must have preserved the original partitioning " +
+                "during runtime filtering: not all partitions implement HasPartitionKey after " +
+                "filtering")
+          }
+
+          val newRows = new InternalRowSet(p.expressions.map(_.dataType))
+          newRows ++= newPartitions.map(_.asInstanceOf[HasPartitionKey].partitionKey())
+          val oldRows = p.partitionValuesOpt.get
+
+          if (oldRows.size != newRows.size) {
+            throw new SparkException(
+              "Data source must have preserved the original partitioning " +
+                "during runtime filtering: the number of unique partition values obtained " +
+                s"through HasPartitionKey changed: before ${oldRows.size}, after ${newRows.size}")
+          }
+
+          if (!oldRows.forall(newRows.contains)) {
+            throw new SparkException(
+              "Data source must have preserved the original partitioning " +
+                "during runtime filtering: the number of unique partition values obtained " +
+                s"through HasPartitionKey remain the same but do not exactly match")
+          }
+
+          groupPartitions(newPartitions).get.map(_._2)
+
+        case _ =>
+          // no validation is needed as the data source did not report any specific partitioning
+          newPartitions.map(Seq(_))
+      }
+
+    } else {
+      partitions
+    }
+  }
+
+  override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()
+
+  override lazy val inputRDD: RDD[InternalRow] = {
+    if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
+      // return an empty RDD with 1 partition if dynamic filtering removed the only split
+      sparkContext.parallelize(Array.empty[InternalRow], 1)
+    } else {
+      new DataSourceRDD(
+        sparkContext,
+        filteredPartitions,
+        readerFactory,
+        supportsColumnar,
+        customMetrics)
+    }
+  }
+
+  override def doCanonicalize(): AbstractBatchScanExec = {
+    this.copy(
+      output = output.map(QueryPlan.normalizeExpressions(_, output)),
+      runtimeFilters = QueryPlan.normalizePredicates(
+        runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
+        output)
+    )
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)
+    val runtimeFiltersString = s"RuntimeFilters: ${runtimeFilters.mkString("[", ",", "]")}"
+    val result = s"$nodeName$truncatedOutputString ${scan.description()} $runtimeFiltersString"
+    redact(result)
+  }
+}
diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
new file mode 100644
index 000000000..ccdd73383
--- /dev/null
+++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -0,0 +1,285 @@
+package org.apache.spark.sql.execution
+
+import java.util.concurrent.TimeUnit._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow, 
TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat 
=> ParquetSource}
+import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.BitSet
+
+/**
+ * Physical plan node for scanning data from HadoopFsRelations.
+ *
+ * @param relation The file-based relation to scan.
+ * @param output Output attributes of the scan, including data attributes and 
partition attributes.
+ * @param requiredSchema Required schema of the underlying relation, excluding 
partition columns.
+ * @param partitionFilters Predicates to use for partition pruning.
+ * @param optionalBucketSet Bucket ids for bucket pruning.
+ * @param optionalNumCoalescedBuckets Number of coalesced buckets.
+ * @param dataFilters Filters on non-partition columns.
+ * @param tableIdentifier Identifier for the table in the metastore.
+ * @param disableBucketedScan Disable bucketed scan based on physical query 
plan, see rule
+ *                            [[DisableUnnecessaryBucketedScan]] for details.
+ */
case class AbstractFileSourceScanExec(
    @transient override val relation: HadoopFsRelation,
    override val output: Seq[Attribute],
    override val requiredSchema: StructType,
    override val partitionFilters: Seq[Expression],
    override val optionalBucketSet: Option[BitSet],
    override val optionalNumCoalescedBuckets: Option[Int],
    override val dataFilters: Seq[Expression],
    override val tableIdentifier: Option[TableIdentifier],
    override val disableBucketedScan: Boolean = false)
  extends FileSourceScanLike {

  // Note that some vals referring the file-based relation are lazy intentionally
  // so that this plan can be canonicalized on executor side too. See SPARK-23731.
  override lazy val supportsColumnar: Boolean = {
    val conf = relation.sparkSession.sessionState.conf
    // Only output columnar if there is WSCG to read it.
    val requiredWholeStageCodegenSettings =
      conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf, schema)
    requiredWholeStageCodegenSettings &&
      relation.fileFormat.supportBatch(relation.sparkSession, schema)
  }

  // True when the vectorized Parquet reader is enabled: its row-based output then
  // needs an UnsafeProjection in doExecute() (see usage below).
  private lazy val needsUnsafeRowConversion: Boolean = {
    if (relation.fileFormat.isInstanceOf[ParquetSource]) {
      conf.parquetVectorizedReaderEnabled
    } else {
      false
    }
  }

  // The RDD that actually scans the files. Built once (lazy); bucketed and
  // non-bucketed reads are planned by the two helper methods below. Driver
  // metrics are sent after planning completes.
  lazy val inputRDD: RDD[InternalRow] = {
    val options = relation.options +
      (FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)
    val readFile: (PartitionedFile) => Iterator[InternalRow] =
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = requiredSchema,
        filters = pushedDownFilters,
        options = options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

    val readRDD = if (bucketedScan) {
      createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
        relation)
    } else {
      createReadRDD(readFile, dynamicallySelectedPartitions, relation)
    }
    sendDriverMetrics()
    readRDD
  }

  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    inputRDD :: Nil
  }

  // Row-based execution path: counts output rows and, when the vectorized Parquet
  // reader is in use, converts every row to UnsafeRow.
  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    if (needsUnsafeRowConversion) {
      inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
        val toUnsafe = UnsafeProjection.create(schema)
        toUnsafe.initialize(index)
        iter.map { row =>
          numOutputRows += 1
          toUnsafe(row)
        }
      }
    } else {
      inputRDD.mapPartitionsInternal { iter =>
        iter.map { row =>
          numOutputRows += 1
          row
        }
      }
    }
  }

  // Columnar execution path: wraps the batch iterator to record scan time
  // (measured inside hasNext, where the underlying scan happens) and output rows.
  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val numOutputRows = longMetric("numOutputRows")
    val scanTime = longMetric("scanTime")
    inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches =>
      new Iterator[ColumnarBatch] {

        override def hasNext: Boolean = {
          // The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call.
          val startNs = System.nanoTime()
          val res = batches.hasNext
          scanTime += NANOSECONDS.toMillis(System.nanoTime() - startNs)
          res
        }

        override def next(): ColumnarBatch = {
          val batch = batches.next()
          numOutputRows += batch.numRows()
          batch
        }
      }
    }
  }

  override val nodeNamePrefix: String = "File"

  /**
   * Create an RDD for bucketed reads.
   * The non-bucketed variant of this function is [[createReadRDD]].
   *
   * The algorithm is pretty simple: each RDD partition being returned should include all the files
   * with the same bucket id from all the given Hive partitions.
   *
   * @param bucketSpec the bucketing spec.
   * @param readFile a function to read each (part of a) file.
   * @param selectedPartitions Hive-style partition that are part of the read.
   * @param fsRelation [[HadoopFsRelation]] associated with the read.
   */
  private def createBucketedReadRDD(
      bucketSpec: BucketSpec,
      readFile: (PartitionedFile) => Iterator[InternalRow],
      selectedPartitions: Array[PartitionDirectory],
      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
    logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
    val filesGroupedToBuckets =
      selectedPartitions.flatMap { p =>
        p.files.map { f =>
          PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
        }
      }.groupBy { f =>
        // A file whose name does not encode a valid bucket id is a hard error.
        BucketingUtils
          .getBucketId(f.toPath.getName)
          .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.urlEncodedPath))
      }

    // Drop buckets pruned away by the optimizer, when a bucket set was provided.
    val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
      val bucketSet = optionalBucketSet.get
      filesGroupedToBuckets.filter {
        f => bucketSet.get(f._1)
      }
    } else {
      filesGroupedToBuckets
    }

    // With bucket coalescing, several original buckets (bucketId % numCoalescedBuckets)
    // are merged into one FilePartition; otherwise one partition per bucket.
    val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
      logInfo(s"Coalescing to ${numCoalescedBuckets} buckets")
      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
      Seq.tabulate(numCoalescedBuckets) { bucketId =>
        val partitionedFiles = coalescedBuckets.get(bucketId).map {
          _.values.flatten.toArray
        }.getOrElse(Array.empty)
        FilePartition(bucketId, partitionedFiles)
      }
    }.getOrElse {
      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
        FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
      }
    }

    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions,
      new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields),
      fileConstantMetadataColumns, new FileSourceOptions(CaseInsensitiveMap(relation.options)))
  }

  /**
   * Create an RDD for non-bucketed reads.
   * The bucketed variant of this function is [[createBucketedReadRDD]].
   *
   * @param readFile a function to read each (part of a) file.
   * @param selectedPartitions Hive-style partition that are part of the read.
   * @param fsRelation [[HadoopFsRelation]] associated with the read.
   */
  private def createReadRDD(
      readFile: (PartitionedFile) => Iterator[InternalRow],
      selectedPartitions: Array[PartitionDirectory],
      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
    val maxSplitBytes =
      FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
      s"open cost is considered as scanning $openCostInBytes bytes.")

    // Filter files with bucket pruning if possible
    val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled
    val shouldProcess: Path => Boolean = optionalBucketSet match {
      case Some(bucketSet) if bucketingEnabled =>
        // Do not prune the file if bucket file name is invalid
        filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
      case _ =>
        _ => true
    }

    val splitFiles = selectedPartitions.flatMap { partition =>
      partition.files.flatMap { file =>
        // getPath() is very expensive so we only want to call it once in this block:
        val filePath = file.getPath

        if (shouldProcess(filePath)) {
          val isSplitable = relation.fileFormat.isSplitable(
            relation.sparkSession, relation.options, filePath) &&
            // SPARK-39634: Allow file splitting in combination with row index generation once
            // the fix for PARQUET-2161 is available.
            !RowIndexUtil.isNeededForSchema(requiredSchema)
          PartitionedFileUtil.splitFiles(
            sparkSession = relation.sparkSession,
            file = file,
            filePath = filePath,
            isSplitable = isSplitable,
            maxSplitBytes = maxSplitBytes,
            partitionValues = partition.values
          )
        } else {
          Seq.empty
        }
      }
    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

    val partitions =
      FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

    new FileScanRDD(fsRelation.sparkSession, readFile, partitions,
      new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields),
      fileConstantMetadataColumns, new FileSourceOptions(CaseInsensitiveMap(relation.options)))
  }

  // Filters unused DynamicPruningExpression expressions - one which has been replaced
  // with DynamicPruningExpression(Literal.TrueLiteral) during Physical Planning
  private def filterUnusedDynamicPruningExpressions(
      predicates: Seq[Expression]): Seq[Expression] = {
    predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
  }

  // Canonical form for plan equality: expressions/predicates are normalized against
  // the output and the table identifier is dropped (passed as None).
  override def doCanonicalize(): AbstractFileSourceScanExec = {
    AbstractFileSourceScanExec(
      relation,
      output.map(QueryPlan.normalizeExpressions(_, output)),
      requiredSchema,
      QueryPlan.normalizePredicates(
        filterUnusedDynamicPruningExpressions(partitionFilters), output),
      optionalBucketSet,
      optionalNumCoalescedBuckets,
      QueryPlan.normalizePredicates(dataFilters, output),
      None,
      disableBucketedScan)
  }
}
\ No newline at end of file
diff --git 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
new file mode 100644
index 000000000..603b446fe
--- /dev/null
+++ 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import com.google.common.base.Objects
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, 
Partitioning, SinglePartition}
+import org.apache.spark.sql.catalyst.util.{truncatedString, 
InternalRowComparableWrapper}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Physical plan node for scanning a batch of data from a data source v2.
+ */
case class AbstractBatchScanExec(
    output: Seq[AttributeReference],
    @transient scan: Scan,
    runtimeFilters: Seq[Expression],
    keyGroupedPartitioning: Option[Seq[Expression]] = None,
    ordering: Option[Seq[SortOrder]] = None,
    @transient table: Table,
    commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
    applyPartialClustering: Boolean = false,
    replicatePartitions: Boolean = false) extends DataSourceV2ScanExecBase {

  // Planned batch form of the scan; null-safe because `scan` is @transient and may
  // be null after deserialization.
  @transient lazy val batch = if (scan == null) null else scan.toBatch

  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
  override def equals(other: Any): Boolean = other match {
    case other: AbstractBatchScanExec =>
      this.batch != null && this.batch == other.batch &&
        this.runtimeFilters == other.runtimeFilters &&
        this.commonPartitionValues == other.commonPartitionValues &&
        this.replicatePartitions == other.replicatePartitions &&
        this.applyPartialClustering == other.applyPartialClustering
    case _ =>
      false
  }

  override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)

  @transient override lazy val inputPartitions: Seq[InputPartition] = batch.planInputPartitions()

  // Input partitions after pushing translatable runtime filters into the source.
  // When the source reported a KeyGroupedPartitioning, validates that filtering
  // preserved the partitioning (same-or-subset of partition values).
  @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
    val dataSourceFilters = runtimeFilters.flatMap {
      case DynamicPruningExpression(e) => DataSourceV2Strategy.translateRuntimeFilterV2(e)
      case _ => None
    }

    if (dataSourceFilters.nonEmpty) {
      val originalPartitioning = outputPartitioning

      // the cast is safe as runtime filters are only assigned if the scan can be filtered
      val filterableScan = scan.asInstanceOf[SupportsRuntimeV2Filtering]
      filterableScan.filter(dataSourceFilters.toArray)

      // call toBatch again to get filtered partitions
      val newPartitions = scan.toBatch.planInputPartitions()

      originalPartitioning match {
        case p: KeyGroupedPartitioning =>
          if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
            throw new SparkException("Data source must have preserved the original partitioning " +
              "during runtime filtering: not all partitions implement HasPartitionKey after " +
              "filtering")
          }
          val newPartitionValues = newPartitions.map(partition =>
            InternalRowComparableWrapper(partition.asInstanceOf[HasPartitionKey], p.expressions))
            .toSet
          val oldPartitionValues = p.partitionValues
            .map(partition => InternalRowComparableWrapper(partition, p.expressions)).toSet
          // We require the new number of partition values to be equal or less than the old number
          // of partition values here. In the case of less than, empty partitions will be added for
          // those missing values that are not present in the new input partitions.
          if (oldPartitionValues.size < newPartitionValues.size) {
            throw new SparkException("During runtime filtering, data source must either report " +
              "the same number of partition values, or a subset of partition values from the " +
              s"original. Before: ${oldPartitionValues.size} partition values. " +
              s"After: ${newPartitionValues.size} partition values")
          }

          if (!newPartitionValues.forall(oldPartitionValues.contains)) {
            throw new SparkException("During runtime filtering, data source must not report new " +
              "partition values that are not present in the original partitioning.")
          }

          groupPartitions(newPartitions).getOrElse(Seq.empty).map(_._2)

        case _ =>
          // no validation is needed as the data source did not report any specific partitioning
          newPartitions.map(Seq(_))
      }

    } else {
      partitions
    }
  }

  override def outputPartitioning: Partitioning = {
    super.outputPartitioning match {
      case k: KeyGroupedPartitioning if commonPartitionValues.isDefined =>
        // We allow duplicated partition values if
        // `spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` is true
        val newPartValues = commonPartitionValues.get.flatMap { case (partValue, numSplits) =>
          Seq.fill(numSplits)(partValue)
        }
        k.copy(numPartitions = newPartValues.length, partitionValues = newPartValues)
      case p => p
    }
  }

  // Reader factory built once (lazy) from the planned batch; used by inputRDD below.
  override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()

  // The RDD that performs the scan. For key-grouped (storage-partitioned join)
  // outputs, partitions may be grouped, replicated, and padded with empty splits so
  // that both sides of a join agree on the number of partitions and splits.
  override lazy val inputRDD: RDD[InternalRow] = {
    val rdd = if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
      // return an empty RDD with 1 partition if dynamic filtering removed the only split
      sparkContext.parallelize(Array.empty[InternalRow], 1)
    } else {
      var finalPartitions = filteredPartitions

      outputPartitioning match {
        case p: KeyGroupedPartitioning =>
          if (conf.v2BucketingPushPartValuesEnabled &&
            conf.v2BucketingPartiallyClusteredDistributionEnabled) {
            assert(filteredPartitions.forall(_.size == 1),
              "Expect partitions to be not grouped when " +
                s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
                "is enabled")

            val groupedPartitions = groupPartitions(finalPartitions.map(_.head), true)
              .getOrElse(Seq.empty)

            // This means the input partitions are not grouped by partition values. We'll need to
            // check `groupByPartitionValues` and decide whether to group and replicate splits
            // within a partition.
            if (commonPartitionValues.isDefined && applyPartialClustering) {
              // A mapping from the common partition values to how many splits the partition
              // should contain. Note this no longer maintain the partition key ordering.
              val commonPartValuesMap = commonPartitionValues
                .get
                .map(t => (InternalRowComparableWrapper(t._1, p.expressions), t._2))
                .toMap
              val nestGroupedPartitions = groupedPartitions.map {
                case (partValue, splits) =>
                  // `commonPartValuesMap` should contain the part value since it's the super set.
                  val numSplits = commonPartValuesMap
                    .get(InternalRowComparableWrapper(partValue, p.expressions))
                  assert(numSplits.isDefined, s"Partition value $partValue does not exist in " +
                    "common partition values from Spark plan")

                  val newSplits = if (replicatePartitions) {
                    // We need to also replicate partitions according to the other side of join
                    Seq.fill(numSplits.get)(splits)
                  } else {
                    // Not grouping by partition values: this could be the side with partially
                    // clustered distribution. Because of dynamic filtering, we'll need to check if
                    // the final number of splits of a partition is smaller than the original
                    // number, and fill with empty splits if so. This is necessary so that both
                    // sides of a join will have the same number of partitions & splits.
                    splits.map(Seq(_)).padTo(numSplits.get, Seq.empty)
                  }
                  (InternalRowComparableWrapper(partValue, p.expressions), newSplits)
              }

              // Now fill missing partition keys with empty partitions
              val partitionMapping = nestGroupedPartitions.toMap
              finalPartitions = commonPartitionValues.get.flatMap { case (partValue, numSplits) =>
                // Use empty partition for those partition values that are not present.
                partitionMapping.getOrElse(
                  InternalRowComparableWrapper(partValue, p.expressions),
                  Seq.fill(numSplits)(Seq.empty))
              }
            } else {
              // either `commonPartitionValues` is not defined, or it is defined but
              // `applyPartialClustering` is false.
              val partitionMapping = groupedPartitions.map { case (row, parts) =>
                InternalRowComparableWrapper(row, p.expressions) -> parts
              }.toMap

              // In case `commonPartitionValues` is not defined (e.g., SPJ is not used), there
              // could exist duplicated partition values, as partition grouping is not done
              // at the beginning and postponed to this method. It is important to use unique
              // partition values here so that grouped partitions won't get duplicated.
              finalPartitions = p.uniquePartitionValues.map { partValue =>
                // Use empty partition for those partition values that are not present
                partitionMapping.getOrElse(
                  InternalRowComparableWrapper(partValue, p.expressions), Seq.empty)
              }
            }
          } else {
            val partitionMapping = finalPartitions.map { parts =>
              val row = parts.head.asInstanceOf[HasPartitionKey].partitionKey()
              InternalRowComparableWrapper(row, p.expressions) -> parts
            }.toMap
            finalPartitions = p.partitionValues.map { partValue =>
              // Use empty partition for those partition values that are not present
              partitionMapping.getOrElse(
                InternalRowComparableWrapper(partValue, p.expressions), Seq.empty)
            }
          }

        case _ =>
      }

      new DataSourceRDD(
        sparkContext, finalPartitions, readerFactory, supportsColumnar, customMetrics)
    }
    postDriverMetrics()
    rdd
  }

  // Canonical form for plan comparison: normalizes output expressions and drops
  // runtime filters that were replaced with DynamicPruningExpression(TrueLiteral).
  override def doCanonicalize(): AbstractBatchScanExec = {
    this.copy(
      output = output.map(QueryPlan.normalizeExpressions(_, output)),
      runtimeFilters = QueryPlan.normalizePredicates(
        runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
        output))
  }

  // One-line plan description: node name, truncated output attributes, the scan's
  // own description, and the runtime filters; redacted before returning.
  override def simpleString(maxFields: Int): String = {
    val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)
    val runtimeFiltersString = s"RuntimeFilters: ${runtimeFilters.mkString("[", ",", "]")}"
    val result = s"$nodeName$truncatedOutputString ${scan.description()} $runtimeFiltersString"
    redact(result)
  }

  override def nodeName: String = {
    s"BatchScan ${table.name()}".trim
  }
}
diff --git 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
new file mode 100644
index 000000000..2afb2daa3
--- /dev/null
+++ 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -0,0 +1,273 @@
+package org.apache.spark.sql.execution
+
+import java.util.concurrent.TimeUnit._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow, 
TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat 
=> ParquetSource}
+import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.BitSet
+
+/**
+ * Physical plan node for scanning data from HadoopFsRelations.
+ *
+ * @param relation The file-based relation to scan.
+ * @param output Output attributes of the scan, including data attributes and 
partition attributes.
+ * @param requiredSchema Required schema of the underlying relation, excluding 
partition columns.
+ * @param partitionFilters Predicates to use for partition pruning.
+ * @param optionalBucketSet Bucket ids for bucket pruning.
+ * @param optionalNumCoalescedBuckets Number of coalesced buckets.
+ * @param dataFilters Filters on non-partition columns.
+ * @param tableIdentifier Identifier for the table in the metastore.
+ * @param disableBucketedScan Disable bucketed scan based on physical query 
plan, see rule
+ *                            [[DisableUnnecessaryBucketedScan]] for details.
+ */
+// NOTE(review): verbatim copy of vanilla Spark 3.5 FileSourceScanExec, added
+// as-is to preserve git history (see commit message) -- do not restyle. Unlike
+// the sibling AbstractBatchScanExec.scala, this file is missing the ASF license
+// header and a trailing newline; confirm against the project's RAT/header check.
+case class AbstractFileSourceScanExec(
+                               @transient override val relation: 
HadoopFsRelation,
+                               override val output: Seq[Attribute],
+                               override val requiredSchema: StructType,
+                               override val partitionFilters: Seq[Expression],
+                               override val optionalBucketSet: Option[BitSet],
+                               override val optionalNumCoalescedBuckets: 
Option[Int],
+                               override val dataFilters: Seq[Expression],
+                               override val tableIdentifier: 
Option[TableIdentifier],
+                               override val disableBucketedScan: Boolean = 
false)
+  extends FileSourceScanLike {
+
+  // Note that some vals referring the file-based relation are lazy 
intentionally
+  // so that this plan can be canonicalized on executor side too. See 
SPARK-23731.
+  override lazy val supportsColumnar: Boolean = {
+    val conf = relation.sparkSession.sessionState.conf
+    // Only output columnar if there is WSCG to read it.
+    val requiredWholeStageCodegenSettings =
+      conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf, 
schema)
+    requiredWholeStageCodegenSettings &&
+      relation.fileFormat.supportBatch(relation.sparkSession, schema)
+  }
+
+  // True only for the Parquet vectorized-reader path; when set, doExecute
+  // re-projects each row through UnsafeProjection before handing it upstream.
+  private lazy val needsUnsafeRowConversion: Boolean = {
+    if (relation.fileFormat.isInstanceOf[ParquetSource]) {
+      conf.parquetVectorizedReaderEnabled
+    } else {
+      false
+    }
+  }
+
+  // Builds the scan RDD (bucketed or plain) and sends driver metrics once built.
+  lazy val inputRDD: RDD[InternalRow] = {
+    val options = relation.options +
+      (FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)
+    val readFile: (PartitionedFile) => Iterator[InternalRow] =
+      relation.fileFormat.buildReaderWithPartitionValues(
+        sparkSession = relation.sparkSession,
+        dataSchema = relation.dataSchema,
+        partitionSchema = relation.partitionSchema,
+        requiredSchema = requiredSchema,
+        filters = pushedDownFilters,
+        options = options,
+        hadoopConf = 
relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
+
+    val readRDD = if (bucketedScan) {
+      createBucketedReadRDD(relation.bucketSpec.get, readFile, 
dynamicallySelectedPartitions)
+    } else {
+      createReadRDD(readFile, dynamicallySelectedPartitions)
+    }
+    sendDriverMetrics()
+    readRDD
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  // Row-based execution path: counts output rows, converting to UnsafeRow when
+  // needsUnsafeRowConversion is set (Parquet vectorized reader).
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    if (needsUnsafeRowConversion) {
+      inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
+        val toUnsafe = UnsafeProjection.create(schema)
+        toUnsafe.initialize(index)
+        iter.map { row =>
+          numOutputRows += 1
+          toUnsafe(row)
+        }
+      }
+    } else {
+      inputRDD.mapPartitionsInternal { iter =>
+        iter.map { row =>
+          numOutputRows += 1
+          row
+        }
+      }
+    }
+  }
+
+  // Columnar execution path: wraps the batch iterator to record scan time
+  // (accrued in hasNext, where FileScanRDD does the actual reading) and row counts.
+  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numOutputRows = longMetric("numOutputRows")
+    val scanTime = longMetric("scanTime")
+    inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches 
=>
+      new Iterator[ColumnarBatch] {
+
+        override def hasNext: Boolean = {
+          // The `FileScanRDD` returns an iterator which scans the file during 
the `hasNext` call.
+          val startNs = System.nanoTime()
+          val res = batches.hasNext
+          scanTime += NANOSECONDS.toMillis(System.nanoTime() - startNs)
+          res
+        }
+
+        override def next(): ColumnarBatch = {
+          val batch = batches.next()
+          numOutputRows += batch.numRows()
+          batch
+        }
+      }
+    }
+  }
+
+  override val nodeNamePrefix: String = "File"
+
+  /**
+   * Create an RDD for bucketed reads.
+   * The non-bucketed variant of this function is [[createReadRDD]].
+   *
+   * The algorithm is pretty simple: each RDD partition being returned should 
include all the files
+   * with the same bucket id from all the given Hive partitions.
+   *
+   * @param bucketSpec the bucketing spec.
+   * @param readFile a function to read each (part of a) file.
+   * @param selectedPartitions Hive-style partition that are part of the read.
+   */
+  private def createBucketedReadRDD(
+                                     bucketSpec: BucketSpec,
+                                     readFile: (PartitionedFile) => 
Iterator[InternalRow],
+                                     selectedPartitions: 
Array[PartitionDirectory]): RDD[InternalRow] = {
+    logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
+    val filesGroupedToBuckets =
+      selectedPartitions.flatMap { p =>
+        p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, p.values))
+      }.groupBy { f =>
+        BucketingUtils
+          .getBucketId(f.toPath.getName)
+          .getOrElse(throw 
QueryExecutionErrors.invalidBucketFile(f.urlEncodedPath))
+      }
+
+    // Drop whole buckets pruned away by optionalBucketSet, if provided.
+    val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
+      val bucketSet = optionalBucketSet.get
+      filesGroupedToBuckets.filter {
+        f => bucketSet.get(f._1)
+      }
+    } else {
+      filesGroupedToBuckets
+    }
+
+    val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets 
=>
+      logInfo(s"Coalescing to ${numCoalescedBuckets} buckets")
+      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % 
numCoalescedBuckets)
+      Seq.tabulate(numCoalescedBuckets) { bucketId =>
+        val partitionedFiles = coalescedBuckets.get(bucketId).map {
+          _.values.flatten.toArray
+        }.getOrElse(Array.empty)
+        FilePartition(bucketId, partitionedFiles)
+      }
+    }.getOrElse {
+      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
+        FilePartition(bucketId, 
prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
+      }
+    }
+
+    new FileScanRDD(relation.sparkSession, readFile, filePartitions,
+      new StructType(requiredSchema.fields ++ relation.partitionSchema.fields),
+      fileConstantMetadataColumns, 
relation.fileFormat.fileConstantMetadataExtractors,
+      new FileSourceOptions(CaseInsensitiveMap(relation.options)))
+  }
+
+  /**
+   * Create an RDD for non-bucketed reads.
+   * The bucketed variant of this function is [[createBucketedReadRDD]].
+   *
+   * @param readFile a function to read each (part of a) file.
+   * @param selectedPartitions Hive-style partition that are part of the read.
+   */
+  private def createReadRDD(
+                             readFile: (PartitionedFile) => 
Iterator[InternalRow],
+                             selectedPartitions: Array[PartitionDirectory]): 
RDD[InternalRow] = {
+    val openCostInBytes = 
relation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    val maxSplitBytes =
+      FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
+    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
+      s"open cost is considered as scanning $openCostInBytes bytes.")
+
+    // Filter files with bucket pruning if possible
+    val bucketingEnabled = 
relation.sparkSession.sessionState.conf.bucketingEnabled
+    val shouldProcess: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) if bucketingEnabled =>
+        // Do not prune the file if bucket file name is invalid
+        filePath => 
BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
+      case _ =>
+        _ => true
+    }
+
+    // Split surviving files (largest first) so bin packing balances partitions.
+    val splitFiles = selectedPartitions.flatMap { partition =>
+      partition.files.flatMap { file =>
+        if (shouldProcess(file.getPath)) {
+          val isSplitable = relation.fileFormat.isSplitable(
+            relation.sparkSession, relation.options, file.getPath)
+          PartitionedFileUtil.splitFiles(
+            sparkSession = relation.sparkSession,
+            file = file,
+            isSplitable = isSplitable,
+            maxSplitBytes = maxSplitBytes,
+            partitionValues = partition.values
+          )
+        } else {
+          Seq.empty
+        }
+      }
+    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+
+    val partitions =
+      FilePartition.getFilePartitions(relation.sparkSession, splitFiles, 
maxSplitBytes)
+
+    new FileScanRDD(relation.sparkSession, readFile, partitions,
+      new StructType(requiredSchema.fields ++ relation.partitionSchema.fields),
+      fileConstantMetadataColumns, 
relation.fileFormat.fileConstantMetadataExtractors,
+      new FileSourceOptions(CaseInsensitiveMap(relation.options)))
+  }
+
+  // Filters unused DynamicPruningExpression expressions - one which has been 
replaced
+  // with DynamicPruningExpression(Literal.TrueLiteral) during Physical 
Planning
+  private def filterUnusedDynamicPruningExpressions(
+                                                     predicates: 
Seq[Expression]): Seq[Expression] = {
+    predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
+  }
+
+  // Canonical form: normalized expression IDs, pruned no-op dynamic filters,
+  // and no table identifier, so equivalent scans compare equal.
+  override def doCanonicalize(): AbstractFileSourceScanExec = {
+    AbstractFileSourceScanExec(
+      relation,
+      output.map(QueryPlan.normalizeExpressions(_, output)),
+      requiredSchema,
+      QueryPlan.normalizePredicates(
+        filterUnusedDynamicPruningExpressions(partitionFilters), output),
+      optionalBucketSet,
+      optionalNumCoalescedBuckets,
+      QueryPlan.normalizePredicates(dataFilters, output),
+      None,
+      disableBucketedScan)
+  }
+}
\ No newline at end of file
diff --git 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
new file mode 100644
index 000000000..2e25f38b8
--- /dev/null
+++ 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import com.google.common.base.Objects
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, 
Partitioning, SinglePartition}
+import org.apache.spark.sql.catalyst.util.{truncatedString, 
InternalRowComparableWrapper}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Physical plan node for scanning a batch of data from a data source v2.
+ */
+// NOTE(review): verbatim copy of vanilla Spark 3.5 BatchScanExec, added as-is
+// to preserve git history (see commit message) -- do not restyle.
+case class AbstractBatchScanExec(
+                          output: Seq[AttributeReference],
+                          @transient scan: Scan,
+                          runtimeFilters: Seq[Expression],
+                          ordering: Option[Seq[SortOrder]] = None,
+                          @transient table: Table,
+                          spjParams: StoragePartitionJoinParams = 
StoragePartitionJoinParams()
+                        ) extends DataSourceV2ScanExecBase {
+
+  @transient lazy val batch: Batch = if (scan == null) null else scan.toBatch
+
+  // TODO: unify the equal/hashCode implementation for all data source v2 
query plans.
+  override def equals(other: Any): Boolean = other match {
+    case other: AbstractBatchScanExec =>
+      this.batch != null && this.batch == other.batch &&
+        this.runtimeFilters == other.runtimeFilters &&
+        this.spjParams == other.spjParams
+    case _ =>
+      false
+  }
+
+  // NOTE(review): spjParams is compared in equals but excluded here; still
+  // lawful (equal objects hash equal, as it hashes a subset of the compared
+  // fields) and matches vanilla Spark -- confirm this remains intentional.
+  override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
+
+  @transient override lazy val inputPartitions: Seq[InputPartition] = 
batch.planInputPartitions()
+
+  // Input partitions after pushing any runtime (dynamic pruning) filters down
+  // to the scan; validates that key-grouped partitioning, if any, is preserved.
+  @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
+    val dataSourceFilters = runtimeFilters.flatMap {
+      case DynamicPruningExpression(e) => 
DataSourceV2Strategy.translateRuntimeFilterV2(e)
+      case _ => None
+    }
+
+    if (dataSourceFilters.nonEmpty) {
+      val originalPartitioning = outputPartitioning
+
+      // the cast is safe as runtime filters are only assigned if the scan can 
be filtered
+      val filterableScan = scan.asInstanceOf[SupportsRuntimeV2Filtering]
+      filterableScan.filter(dataSourceFilters.toArray)
+
+      // call toBatch again to get filtered partitions
+      val newPartitions = scan.toBatch.planInputPartitions()
+
+      originalPartitioning match {
+        case p: KeyGroupedPartitioning =>
+          if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
+            throw new SparkException("Data source must have preserved the 
original partitioning " +
+              "during runtime filtering: not all partitions implement 
HasPartitionKey after " +
+              "filtering")
+          }
+          val newPartitionValues = newPartitions.map(partition =>
+            
InternalRowComparableWrapper(partition.asInstanceOf[HasPartitionKey], 
p.expressions))
+            .toSet
+          val oldPartitionValues = p.partitionValues
+            .map(partition => InternalRowComparableWrapper(partition, 
p.expressions)).toSet
+          // We require the new number of partition values to be equal or less 
than the old number
+          // of partition values here. In the case of less than, empty 
partitions will be added for
+          // those missing values that are not present in the new input 
partitions.
+          if (oldPartitionValues.size < newPartitionValues.size) {
+            throw new SparkException("During runtime filtering, data source 
must either report " +
+              "the same number of partition values, or a subset of partition 
values from the " +
+              s"original. Before: ${oldPartitionValues.size} partition values. 
" +
+              s"After: ${newPartitionValues.size} partition values")
+          }
+
+          if (!newPartitionValues.forall(oldPartitionValues.contains)) {
+            throw new SparkException("During runtime filtering, data source 
must not report new " +
+              "partition values that are not present in the original 
partitioning.")
+          }
+
+          groupPartitions(newPartitions).getOrElse(Seq.empty).map(_._2)
+
+        case _ =>
+          // no validation is needed as the data source did not report any 
specific partitioning
+          newPartitions.map(Seq(_))
+      }
+
+    } else {
+      partitions
+    }
+  }
+
+  override def outputPartitioning: Partitioning = {
+    super.outputPartitioning match {
+      case k: KeyGroupedPartitioning if 
spjParams.commonPartitionValues.isDefined =>
+        // We allow duplicated partition values if
+        // 
`spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` is true
+        val newPartValues = spjParams.commonPartitionValues.get.flatMap {
+          case (partValue, numSplits) => Seq.fill(numSplits)(partValue)
+        }
+        k.copy(numPartitions = newPartValues.length, partitionValues = 
newPartValues)
+      case p => p
+    }
+  }
+
+  override lazy val readerFactory: PartitionReaderFactory = 
batch.createReaderFactory()
+
+  // Final scan RDD: applies runtime filtering, then (for key-grouped
+  // partitioning) regroups / pads partitions so both join sides line up.
+  override lazy val inputRDD: RDD[InternalRow] = {
+    val rdd = if (filteredPartitions.isEmpty && outputPartitioning == 
SinglePartition) {
+      // return an empty RDD with 1 partition if dynamic filtering removed the 
only split
+      sparkContext.parallelize(Array.empty[InternalRow], 1)
+    } else {
+      var finalPartitions = filteredPartitions
+
+      outputPartitioning match {
+        case p: KeyGroupedPartitioning =>
+          if (conf.v2BucketingPushPartValuesEnabled &&
+            conf.v2BucketingPartiallyClusteredDistributionEnabled) {
+            assert(filteredPartitions.forall(_.size == 1),
+              "Expect partitions to be not grouped when " +
+                
s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
+                "is enabled")
+
+            val groupedPartitions = 
groupPartitions(finalPartitions.map(_.head),
+              groupSplits = true).getOrElse(Seq.empty)
+
+            // This means the input partitions are not grouped by partition 
values. We'll need to
+            // check `groupByPartitionValues` and decide whether to group and 
replicate splits
+            // within a partition.
+            if (spjParams.commonPartitionValues.isDefined &&
+              spjParams.applyPartialClustering) {
+              // A mapping from the common partition values to how many splits 
the partition
+              // should contain. Note this no longer maintain the partition 
key ordering.
+              val commonPartValuesMap = spjParams.commonPartitionValues
+                .get
+                .map(t => (InternalRowComparableWrapper(t._1, p.expressions), 
t._2))
+                .toMap
+              val nestGroupedPartitions = groupedPartitions.map {
+                case (partValue, splits) =>
+                  // `commonPartValuesMap` should contain the part value since 
it's the super set.
+                  val numSplits = commonPartValuesMap
+                    .get(InternalRowComparableWrapper(partValue, 
p.expressions))
+                  assert(numSplits.isDefined, s"Partition value $partValue 
does not exist in " +
+                    "common partition values from Spark plan")
+
+                  val newSplits = if (spjParams.replicatePartitions) {
+                    // We need to also replicate partitions according to the 
other side of join
+                    Seq.fill(numSplits.get)(splits)
+                  } else {
+                    // Not grouping by partition values: this could be the 
side with partially
+                    // clustered distribution. Because of dynamic filtering, 
we'll need to check if
+                    // the final number of splits of a partition is smaller 
than the original
+                    // number, and fill with empty splits if so. This is 
necessary so that both
+                    // sides of a join will have the same number of partitions 
& splits.
+                    splits.map(Seq(_)).padTo(numSplits.get, Seq.empty)
+                  }
+                  (InternalRowComparableWrapper(partValue, p.expressions), 
newSplits)
+              }
+
+              // Now fill missing partition keys with empty partitions
+              val partitionMapping = nestGroupedPartitions.toMap
+              finalPartitions = spjParams.commonPartitionValues.get.flatMap {
+                case (partValue, numSplits) =>
+                  // Use empty partition for those partition values that are 
not present.
+                  partitionMapping.getOrElse(
+                    InternalRowComparableWrapper(partValue, p.expressions),
+                    Seq.fill(numSplits)(Seq.empty))
+              }
+            } else {
+              // either `commonPartitionValues` is not defined, or it is 
defined but
+              // `applyPartialClustering` is false.
+              val partitionMapping = groupedPartitions.map { case (row, parts) 
=>
+                InternalRowComparableWrapper(row, p.expressions) -> parts
+              }.toMap
+
+              // In case `commonPartitionValues` is not defined (e.g., SPJ is 
not used), there
+              // could exist duplicated partition values, as partition 
grouping is not done
+              // at the beginning and postponed to this method. It is 
important to use unique
+              // partition values here so that grouped partitions won't get 
duplicated.
+              finalPartitions = p.uniquePartitionValues.map { partValue =>
+                // Use empty partition for those partition values that are not 
present
+                partitionMapping.getOrElse(
+                  InternalRowComparableWrapper(partValue, p.expressions), 
Seq.empty)
+              }
+            }
+          } else {
+            val partitionMapping = finalPartitions.map { parts =>
+              val row = parts.head.asInstanceOf[HasPartitionKey].partitionKey()
+              InternalRowComparableWrapper(row, p.expressions) -> parts
+            }.toMap
+            finalPartitions = p.partitionValues.map { partValue =>
+              // Use empty partition for those partition values that are not 
present
+              partitionMapping.getOrElse(
+                InternalRowComparableWrapper(partValue, p.expressions), 
Seq.empty)
+            }
+          }
+
+        case _ =>
+      }
+
+      new DataSourceRDD(
+        sparkContext, finalPartitions, readerFactory, supportsColumnar, 
customMetrics)
+    }
+    postDriverMetrics()
+    rdd
+  }
+
+  override def keyGroupedPartitioning: Option[Seq[Expression]] =
+    spjParams.keyGroupedPartitioning
+
+  // Canonical form: normalized expression IDs and no-op dynamic filters removed.
+  override def doCanonicalize(): AbstractBatchScanExec = {
+    this.copy(
+      output = output.map(QueryPlan.normalizeExpressions(_, output)),
+      runtimeFilters = QueryPlan.normalizePredicates(
+        runtimeFilters.filterNot(_ == 
DynamicPruningExpression(Literal.TrueLiteral)),
+        output))
+  }
+
+  // One-line plan description: truncated output, scan description and runtime
+  // filters, redacted before display.
+  override def simpleString(maxFields: Int): String = {
+    val truncatedOutputString = truncatedString(output, "[", ", ", "]", 
maxFields)
+    val runtimeFiltersString = s"RuntimeFilters: 
${runtimeFilters.mkString("[", ",", "]")}"
+    val result = s"$nodeName$truncatedOutputString ${scan.description()} 
$runtimeFiltersString"
+    redact(result)
+  }
+
+  // Node label shown in plan trees; trim handles an empty table name.
+  override def nodeName: String = {
+    s"BatchScan ${table.name()}".trim
+  }
+}
+
+// Parameters controlling storage-partitioned join (SPJ) behavior for a batch
+// scan. NOTE(review): keyGroupedPartitioning is excluded from both equals and
+// hashCode below; this matches vanilla Spark -- confirm it is intended here too.
+case class StoragePartitionJoinParams(
+                                       keyGroupedPartitioning: 
Option[Seq[Expression]] = None,
+                                       commonPartitionValues: 
Option[Seq[(InternalRow, Int)]] = None,
+                                       applyPartialClustering: Boolean = false,
+                                       replicatePartitions: Boolean = false) {
+  override def equals(other: Any): Boolean = other match {
+    case other: StoragePartitionJoinParams =>
+      this.commonPartitionValues == other.commonPartitionValues &&
+        this.replicatePartitions == other.replicatePartitions &&
+        this.applyPartialClustering == other.applyPartialClustering
+    case _ =>
+      false
+  }
+
+  // Hashes the same three fields equals compares, keeping the contract lawful.
+  override def hashCode(): Int = Objects.hashCode(
+    commonPartitionValues: Option[Seq[(InternalRow, Int)]],
+    applyPartialClustering: java.lang.Boolean,
+    replicatePartitions: java.lang.Boolean)
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to