This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new ba25aece7e [GLUTEN-11088][CORE] Fix Spark 4.0 GlutenDynamicPartitionPruningV1SuiteAEOn (#11212)
ba25aece7e is described below

commit ba25aece7e3a9b14aef7cee9d9f71d917f1c6d30
Author: Jin Chengcheng <[email protected]>
AuthorDate: Mon Dec 1 15:19:46 2025 +0000

    [GLUTEN-11088][CORE] Fix Spark 4.0 GlutenDynamicPartitionPruningV1SuiteAEOn (#11212)
    
    Support the stream field on FileSourceScanExecTransformer.
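    
    A minimal sketch of the shim accessor this introduces (mirroring the
    hunks in SparkShims.scala and Spark40Shims.scala below); only Spark 4.0's
    FileSourceScanExec carries a stream, so the common shim defaults to None:
    
        // shims/common: Spark < 4.0 has no stream on FileSourceScanExec
        def getFileSourceScanStream(scan: FileSourceScanExec): Option[SparkDataStream] = {
          None
        }
    
        // shims/spark40: forward the scan node's stream
        override def getFileSourceScanStream(scan: FileSourceScanExec): Option[SparkDataStream] = {
          scan.stream
        }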
    
    Two FileSourceScanExecTransformer nodes failed to compare equal because
    their dataFilters differed: (cast(x#688 as double) = ReusedSubquery Subquery
    subquery#683, [id=#2324]) and (cast(x#688 as double) = Subquery subquery#683,
    [id=#2324]) should be treated as the same filter, so canonicalized
    expressions must be used to deduplicate.
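    
    The deduplication fix in FilterHandler.subtractFilters (see the hunk in
    BasicPhysicalOperatorTransformer.scala below) compares canonicalized forms
    instead of raw expressions, so a ReusedSubquery wrapper no longer defeats
    the set difference:
    
        def subtractFilters(left: Seq[Expression], right: Seq[Expression]): Seq[Expression] = {
          // Canonicalize both sides so `ReusedSubquery s` and `s` dedupe to one filter.
          val scanSet = right.map(_.canonicalized).toSet
          left.filter(f => !scanSet.contains(f.canonicalized))
        }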
    
    Also fix getPartitions for dynamically pruned FileSourceScanExec, because
    Spark 4.0 adds the new ScanFileListing class and refactors inputRDD.
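    
    The resulting version dispatch in FileSourceScanExecTransformerBase, as a
    condensed sketch of the hunk below (pre-4.0 shims stub getPartitionsSeq()
    with an empty Seq and keep the backend path):
    
        override def getPartitions: Seq[Partition] = {
          if (SparkVersionUtil.gteSpark40) {
            // Spark 4.0: plan FilePartitions via the new ScanFileListing API
            getPartitionsSeq()
          } else {
            // Spark 3.x: existing backend-specific partition planning
            BackendsApiManager.getTransformerApiInstance.genPartitionSeq(
              relation, requiredSchema, getPartitionArray, output, bucketedScan,
              optionalBucketSet, optionalNumCoalescedBuckets, disableBucketedScan,
              filterExprs())
          }
        }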
---
 .../gluten/execution/DeltaScanTransformer.scala    |   6 +
 .../gluten/extension/DeltaPostTransformRules.scala |   1 +
 .../gluten/execution/HudiScanTransformer.scala     |   6 +
 .../BasicPhysicalOperatorTransformer.scala         |   3 +-
 .../execution/FileSourceScanExecTransformer.scala  |  40 ++++---
 .../gluten/execution/ScanTransformerFactory.scala  |   1 +
 .../sql/extension/CustomerColumnarPreRules.scala   |   3 +
 .../TestFileSourceScanExecTransformer.scala        |   4 +
 .../sql/extension/CustomerColumnarPreRules.scala   |   3 +
 .../TestFileSourceScanExecTransformer.scala        |   3 +
 .../sql/extension/CustomerColumnarPreRules.scala   |   3 +
 .../TestFileSourceScanExecTransformer.scala        |   3 +
 .../sql/extension/CustomerColumnarPreRules.scala   |   3 +
 .../TestFileSourceScanExecTransformer.scala        |   3 +
 .../gluten/utils/velox/VeloxTestSettings.scala     |   2 -
 .../sql/GlutenDynamicPartitionPruningSuite.scala   |  71 +----------
 .../sql/extension/CustomerColumnarPreRules.scala   |   3 +
 .../TestFileSourceScanExecTransformer.scala        |   3 +
 .../org/apache/gluten/sql/shims/SparkShims.scala   |   5 +
 .../sql/execution/FileSourceScanExecShim.scala     |   5 +
 .../sql/execution/FileSourceScanExecShim.scala     |   5 +
 .../sql/execution/FileSourceScanExecShim.scala     |   5 +
 .../sql/execution/FileSourceScanExecShim.scala     |   5 +
 .../gluten/sql/shims/spark40/Spark40Shims.scala    |   5 +
 .../sql/execution/FileSourceScanExecShim.scala     | 130 ++++++++++++++++++++-
 25 files changed, 234 insertions(+), 87 deletions(-)

diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
index c3b88e8225..1be03dd404 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
@@ -16,11 +16,13 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
@@ -28,6 +30,7 @@ import org.apache.spark.util.collection.BitSet
 
 case class DeltaScanTransformer(
     @transient override val relation: HadoopFsRelation,
+    @transient stream: Option[SparkDataStream],
     override val output: Seq[Attribute],
     override val requiredSchema: StructType,
     override val partitionFilters: Seq[Expression],
@@ -39,6 +42,7 @@ case class DeltaScanTransformer(
     override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
+    stream,
     output,
     requiredSchema,
     partitionFilters,
@@ -66,6 +70,7 @@ case class DeltaScanTransformer(
   override def doCanonicalize(): DeltaScanTransformer = {
     DeltaScanTransformer(
       relation,
+      None,
       output.map(QueryPlan.normalizeExpressions(_, output)),
       requiredSchema,
       QueryPlan.normalizePredicates(
@@ -89,6 +94,7 @@ object DeltaScanTransformer {
   def apply(scanExec: FileSourceScanExec): DeltaScanTransformer = {
     new DeltaScanTransformer(
       scanExec.relation,
+      SparkShimLoader.getSparkShims.getFileSourceScanStream(scanExec),
       scanExec.output,
       scanExec.requiredSchema,
       scanExec.partitionFilters,
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
index 156fdf791a..da81bdf83f 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
@@ -153,6 +153,7 @@ object DeltaPostTransformRules {
       // replace tableName in schema with physicalName
       val scanExecTransformer = new DeltaScanTransformer(
         newFsRelation,
+        plan.stream,
         newOutput,
         DeltaColumnMapping.createPhysicalSchema(
           plan.requiredSchema,
diff --git a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
index b320ccef33..0c427c68cc 100644
--- a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
+++ b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
@@ -16,11 +16,13 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
@@ -28,6 +30,7 @@ import org.apache.spark.util.collection.BitSet
 
 case class HudiScanTransformer(
     @transient override val relation: HadoopFsRelation,
+    @transient stream: Option[SparkDataStream],
     override val output: Seq[Attribute],
     override val requiredSchema: StructType,
     override val partitionFilters: Seq[Expression],
@@ -39,6 +42,7 @@ case class HudiScanTransformer(
     override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
+    stream,
     output,
     requiredSchema,
     partitionFilters,
@@ -58,6 +62,7 @@ case class HudiScanTransformer(
   override def doCanonicalize(): HudiScanTransformer = {
     HudiScanTransformer(
       relation,
+      None,
       output.map(QueryPlan.normalizeExpressions(_, output)),
       requiredSchema,
       QueryPlan.normalizePredicates(
@@ -81,6 +86,7 @@ object HudiScanTransformer {
   def apply(scanExec: FileSourceScanExec): HudiScanTransformer = {
     new HudiScanTransformer(
       scanExec.relation,
+      SparkShimLoader.getSparkShims.getFileSourceScanStream(scanExec),
       scanExec.output,
       scanExec.requiredSchema,
       scanExec.partitionFilters,
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 24486efa54..93d6201c73 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -313,7 +313,8 @@ object FilterHandler extends PredicateHelper {
   }
 
   def subtractFilters(left: Seq[Expression], right: Seq[Expression]): Seq[Expression] = {
-    (left.toSet -- right.toSet).toSeq
+    val scanSet = right.map(_.canonicalized).toSet
+    left.filter(f => !scanSet.contains(f.canonicalized))
   }
 
   def combineFilters(left: Seq[Expression], right: Seq[Expression]): Seq[Expression] = {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index 1e7de78d94..b53bbb45ba 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -32,12 +32,14 @@ import org.apache.spark.sql.execution.FileSourceScanExecShim
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SparkVersionUtil
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.commons.lang3.StringUtils
 
 case class FileSourceScanExecTransformer(
     @transient override val relation: HadoopFsRelation,
+    @transient stream: Option[SparkDataStream],
     override val output: Seq[Attribute],
     override val requiredSchema: StructType,
     override val partitionFilters: Seq[Expression],
@@ -49,6 +51,7 @@ case class FileSourceScanExecTransformer(
     override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
+    stream,
     output,
     requiredSchema,
     partitionFilters,
@@ -61,6 +64,9 @@ case class FileSourceScanExecTransformer(
   override def doCanonicalize(): FileSourceScanExecTransformer = {
     FileSourceScanExecTransformer(
       relation,
+      // remove stream on canonicalization; this is needed for reused shuffle to be effective in
+      // self-join
+      None,
       output.map(QueryPlan.normalizeExpressions(_, output)),
       requiredSchema,
       QueryPlan.normalizePredicates(
@@ -81,6 +87,7 @@ case class FileSourceScanExecTransformer(
 
 abstract class FileSourceScanExecTransformerBase(
     @transient override val relation: HadoopFsRelation,
+    @transient stream: Option[SparkDataStream],
     override val output: Seq[Attribute],
     requiredSchema: StructType,
     partitionFilters: Seq[Expression],
@@ -112,18 +119,22 @@ abstract class FileSourceScanExecTransformerBase(
   override def getMetadataColumns(): Seq[AttributeReference] = metadataColumns
 
   override def getPartitions: Seq[Partition] = {
-    BackendsApiManager.getTransformerApiInstance
-      .genPartitionSeq(
-        relation,
-        requiredSchema,
-        getPartitionArray,
-        output,
-        bucketedScan,
-        optionalBucketSet,
-        optionalNumCoalescedBuckets,
-        disableBucketedScan,
-        filterExprs()
-      )
+    if (SparkVersionUtil.gteSpark40) {
+      getPartitionsSeq()
+    } else {
+      BackendsApiManager.getTransformerApiInstance
+        .genPartitionSeq(
+          relation,
+          requiredSchema,
+          getPartitionArray,
+          output,
+          bucketedScan,
+          optionalBucketSet,
+          optionalNumCoalescedBuckets,
+          disableBucketedScan,
+          filterExprs()
+        )
+    }
   }
 
   override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
@@ -205,12 +216,9 @@ abstract class FileSourceScanExecTransformerBase(
         s" $nativeFiltersString")
   }
 
-  // Required for Spark 4.0 to implement a trait method.
   // The "override" keyword is omitted to maintain compatibility with earlier 
Spark versions.
   def getStream: Option[SparkDataStream] = {
-    throw new UnsupportedOperationException(
-      "not supported on streaming"
-    )
+    stream
   }
 }
 
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
index 2a8cc91382..711f8c6908 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
@@ -27,6 +27,7 @@ object ScanTransformerFactory {
       scanExec: FileSourceScanExec): FileSourceScanExecTransformerBase = {
     FileSourceScanExecTransformer(
       scanExec.relation,
+      SparkShimLoader.getSparkShims.getFileSourceScanStream(scanExec),
       scanExec.output,
       scanExec.requiredSchema,
       scanExec.partitionFilters,
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
index 2ee1573ea0..5de35daaed 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.extension
 
+import org.apache.gluten.sql.shims.SparkShimLoader
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
@@ -26,6 +28,7 @@ case class CustomerColumnarPreRules(session: SparkSession) extends Rule[SparkPla
     case fileSourceScan: FileSourceScanExec =>
       val transformer = new TestFileSourceScanExecTransformer(
         fileSourceScan.relation,
+        SparkShimLoader.getSparkShims.getFileSourceScanStream(fileSourceScan),
         fileSourceScan.output,
         fileSourceScan.requiredSchema,
         fileSourceScan.partitionFilters,
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 2857a4a680..1127e26a42 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -24,6 +24,7 @@ import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.BitSet
@@ -31,6 +32,7 @@ import org.apache.spark.util.collection.BitSet
 /** Test for customer column rules */
 case class TestFileSourceScanExecTransformer(
     @transient override val relation: HadoopFsRelation,
+    @transient stream: Option[SparkDataStream],
     override val output: Seq[Attribute],
     override val requiredSchema: StructType,
     override val partitionFilters: Seq[Expression],
@@ -42,6 +44,7 @@ case class TestFileSourceScanExecTransformer(
     override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
+    stream,
     output,
     requiredSchema,
     partitionFilters,
@@ -71,6 +74,7 @@ case class TestFileSourceScanExecTransformer(
   override def doCanonicalize(): TestFileSourceScanExecTransformer = {
     TestFileSourceScanExecTransformer(
       relation,
+      None,
       output.map(QueryPlan.normalizeExpressions(_, output)),
       requiredSchema,
       QueryPlan.normalizePredicates(
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
index 2ee1573ea0..5de35daaed 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.extension
 
+import org.apache.gluten.sql.shims.SparkShimLoader
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
@@ -26,6 +28,7 @@ case class CustomerColumnarPreRules(session: SparkSession) extends Rule[SparkPla
     case fileSourceScan: FileSourceScanExec =>
       val transformer = new TestFileSourceScanExecTransformer(
         fileSourceScan.relation,
+        SparkShimLoader.getSparkShims.getFileSourceScanStream(fileSourceScan),
         fileSourceScan.output,
         fileSourceScan.requiredSchema,
         fileSourceScan.partitionFilters,
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 6aced7cfa6..c2a775bd7f 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -23,6 +23,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.BitSet
@@ -30,6 +31,7 @@ import org.apache.spark.util.collection.BitSet
 /** Test for customer column rules */
 case class TestFileSourceScanExecTransformer(
     @transient override val relation: HadoopFsRelation,
+    @transient stream: Option[SparkDataStream],
     override val output: Seq[Attribute],
     override val requiredSchema: StructType,
     override val partitionFilters: Seq[Expression],
@@ -41,6 +43,7 @@ case class TestFileSourceScanExecTransformer(
     override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
+    stream,
     output,
     requiredSchema,
     partitionFilters,
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
index 2ee1573ea0..5de35daaed 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.extension
 
+import org.apache.gluten.sql.shims.SparkShimLoader
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
@@ -26,6 +28,7 @@ case class CustomerColumnarPreRules(session: SparkSession) extends Rule[SparkPla
     case fileSourceScan: FileSourceScanExec =>
       val transformer = new TestFileSourceScanExecTransformer(
         fileSourceScan.relation,
+        SparkShimLoader.getSparkShims.getFileSourceScanStream(fileSourceScan),
         fileSourceScan.output,
         fileSourceScan.requiredSchema,
         fileSourceScan.partitionFilters,
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 6aced7cfa6..c2a775bd7f 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -23,6 +23,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.BitSet
@@ -30,6 +31,7 @@ import org.apache.spark.util.collection.BitSet
 /** Test for customer column rules */
 case class TestFileSourceScanExecTransformer(
     @transient override val relation: HadoopFsRelation,
+    @transient stream: Option[SparkDataStream],
     override val output: Seq[Attribute],
     override val requiredSchema: StructType,
     override val partitionFilters: Seq[Expression],
@@ -41,6 +43,7 @@ case class TestFileSourceScanExecTransformer(
     override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
+    stream,
     output,
     requiredSchema,
     partitionFilters,
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
index 2ee1573ea0..5de35daaed 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.extension
 
+import org.apache.gluten.sql.shims.SparkShimLoader
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
@@ -26,6 +28,7 @@ case class CustomerColumnarPreRules(session: SparkSession) extends Rule[SparkPla
     case fileSourceScan: FileSourceScanExec =>
       val transformer = new TestFileSourceScanExecTransformer(
         fileSourceScan.relation,
+        SparkShimLoader.getSparkShims.getFileSourceScanStream(fileSourceScan),
         fileSourceScan.output,
         fileSourceScan.requiredSchema,
         fileSourceScan.partitionFilters,
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 6aced7cfa6..c2a775bd7f 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -23,6 +23,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.BitSet
@@ -30,6 +31,7 @@ import org.apache.spark.util.collection.BitSet
 /** Test for customer column rules */
 case class TestFileSourceScanExecTransformer(
     @transient override val relation: HadoopFsRelation,
+    @transient stream: Option[SparkDataStream],
     override val output: Seq[Attribute],
     override val requiredSchema: StructType,
     override val partitionFilters: Seq[Expression],
@@ -41,6 +43,7 @@ case class TestFileSourceScanExecTransformer(
     override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
+    stream,
     output,
     requiredSchema,
     partitionFilters,
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 5fa43d50f7..19bc85dbcf 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -853,8 +853,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenDeprecatedAPISuite]
   enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOff]
   enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOn]
-    // TODO: fix in Spark-4.0
-    .exclude("join key with multiple references on the filtering plan")
   enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOnDisableScan]
   enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOffDisableScan]
   enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOff]
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
index df42c79099..dc96c09bc2 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExc
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
-import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 
 abstract class GlutenDynamicPartitionPruningSuiteBase
@@ -48,8 +47,7 @@ abstract class GlutenDynamicPartitionPruningSuiteBase
   override def testNameBlackList: Seq[String] = Seq(
     // overwritten with different plan
     "SPARK-38674: Remove useless deduplicate in SubqueryBroadcastExec",
-    "Make sure dynamic pruning works on uncorrelated queries",
-    "Subquery reuse across the whole plan"
+    "Make sure dynamic pruning works on uncorrelated queries"
   )
 
   // === Following cases override super class's cases ===
@@ -79,8 +77,7 @@ abstract class GlutenDynamicPartitionPruningSuiteBase
     }
   }
 
-  // TODO: fix in Spark-4.0
-  ignoreGluten("no partition pruning when the build side is a stream") {
+  testGluten("no partition pruning when the build side is a stream") {
     withTable("fact") {
       val input = MemoryStream[Int]
       val stream = input.toDF.select($"value".as("one"), ($"value" * 3).as("code"))
@@ -339,8 +336,7 @@ abstract class GlutenDynamicPartitionPruningV1Suite extends GlutenDynamicPartiti
   import testImplicits._
 
   /** Check the static scan metrics with and without DPP */
-  // TODO: fix in Spark-4.0
-  ignoreGluten("static scan metrics", DisableAdaptiveExecution("DPP in AQE 
must reuse broadcast")) {
+  testGluten("static scan metrics", DisableAdaptiveExecution("DPP in AQE must 
reuse broadcast")) {
     withSQLConf(
       SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
       SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
@@ -461,8 +457,7 @@ class GlutenDynamicPartitionPruningV1SuiteAEOff
 
   import testImplicits._
 
-  // TODO: fix in Spark-4.0
-  ignoreGluten(
+  testGluten(
     "override static scan metrics",
     DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
     withSQLConf(
@@ -577,64 +572,6 @@ class GlutenDynamicPartitionPruningV1SuiteAEOff
       }
     }
   }
-
-  // TODO: fix in Spark-4.0
-  ignoreGluten(
-    "Subquery reuse across the whole plan",
-    DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
-    withSQLConf(
-      SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
-      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
-      SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false"
-    ) {
-      withTable("df1", "df2") {
-        spark
-          .range(100)
-          .select(col("id"), col("id").as("k"))
-          .write
-          .partitionBy("k")
-          .format(tableFormat)
-          .mode("overwrite")
-          .saveAsTable("df1")
-
-        spark
-          .range(10)
-          .select(col("id"), col("id").as("k"))
-          .write
-          .partitionBy("k")
-          .format(tableFormat)
-          .mode("overwrite")
-          .saveAsTable("df2")
-
-        val df = sql("""
-                       |SELECT df1.id, df2.k
-                       |FROM df1 JOIN df2 ON df1.k = df2.k
-                       |WHERE df2.id < (SELECT max(id) FROM df2 WHERE id <= 2)
-                       |""".stripMargin)
-
-        checkPartitionPruningPredicate(df, true, false)
-
-        checkAnswer(df, Row(0, 0) :: Row(1, 1) :: Nil)
-
-        val plan = df.queryExecution.executedPlan
-
-        val subqueryIds = plan.collectWithSubqueries { case s: SubqueryExec => s.id }
-        val reusedSubqueryIds = plan.collectWithSubqueries {
-          case rs: ReusedSubqueryExec => rs.child.id
-        }
-
-        // By default Gluten pushes more filters than vanilla Spark.
-        //
-        // See also org.apache.gluten.execution.FilterHandler#applyFilterPushdownToScan
-        // See also DynamicPartitionPruningSuite.scala:1362
-        assert(subqueryIds.size == 3, "Whole plan subquery reusing not working correctly")
-        assert(reusedSubqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
-        assert(
-          reusedSubqueryIds.forall(subqueryIds.contains(_)),
-          "ReusedSubqueryExec should reuse an existing subquery")
-      }
-    }
-  }
 }
 
 class GlutenDynamicPartitionPruningV1SuiteAEOn
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
index 2ee1573ea0..5de35daaed 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.extension
 
+import org.apache.gluten.sql.shims.SparkShimLoader
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
@@ -26,6 +28,7 @@ case class CustomerColumnarPreRules(session: SparkSession) extends Rule[SparkPla
     case fileSourceScan: FileSourceScanExec =>
       val transformer = new TestFileSourceScanExecTransformer(
         fileSourceScan.relation,
+        SparkShimLoader.getSparkShims.getFileSourceScanStream(fileSourceScan),
         fileSourceScan.output,
         fileSourceScan.requiredSchema,
         fileSourceScan.partitionFilters,
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 644e3c5839..18c5709bf5 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -23,6 +23,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.BitSet
@@ -30,6 +31,7 @@ import org.apache.spark.util.collection.BitSet
 /** Test for customer column rules */
 case class TestFileSourceScanExecTransformer(
     @transient override val relation: HadoopFsRelation,
+    @transient val stream: Option[SparkDataStream],
     override val output: Seq[Attribute],
     override val requiredSchema: StructType,
     override val partitionFilters: Seq[Expression],
@@ -41,6 +43,7 @@ case class TestFileSourceScanExecTransformer(
     override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
+    stream,
     output,
     requiredSchema,
     partitionFilters,
diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 85916b316c..dcfc6b2104 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
@@ -341,4 +342,8 @@ trait SparkShims {
         s"output path: $descriptionPath",
       t)
   }
+
+  def getFileSourceScanStream(scan: FileSourceScanExec): Option[SparkDataStream] = {
+    None
+  }
 }
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index a15fc73430..9e61b290f2 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.metrics.GlutenTimeMetric
 
+import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, Expression, PlanExpression, Predicate}
@@ -154,6 +155,10 @@ abstract class FileSourceScanExecShim(
   def getPartitionArray: Array[PartitionDirectory] = {
     dynamicallySelectedPartitions
   }
+
+  def getPartitionsSeq(): Seq[Partition] = {
+    Seq()
+  }
 }
 
 abstract class ArrowFileSourceScanLikeShim(original: FileSourceScanExec)
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 3dbd29ef1c..240b377d45 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.metrics.GlutenTimeMetric
 
+import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, Expression, FileSourceMetadataAttribute, PlanExpression, Predicate}
@@ -163,6 +164,10 @@ abstract class FileSourceScanExecShim(
   def getPartitionArray: Array[PartitionDirectory] = {
     dynamicallySelectedPartitions
   }
+
+  def getPartitionsSeq(): Seq[Partition] = {
+    Seq()
+  }
 }
 
 abstract class ArrowFileSourceScanLikeShim(original: FileSourceScanExec)
diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 01445c170c..2ab777c5e2 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 import org.apache.gluten.metrics.GlutenTimeMetric
 import org.apache.gluten.sql.shims.SparkShimLoader
 
+import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate}
@@ -123,6 +124,10 @@ abstract class FileSourceScanExecShim(
   def getPartitionArray: Array[PartitionDirectory] = {
     dynamicallySelectedPartitions
   }
+
+  def getPartitionsSeq(): Seq[Partition] = {
+    Seq()
+  }
 }
 
 abstract class ArrowFileSourceScanLikeShim(original: FileSourceScanExec)
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 75049793d8..eb91b00f22 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.metrics.GlutenTimeMetric
 
+import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate}
@@ -120,6 +121,10 @@ abstract class FileSourceScanExecShim(
   def getPartitionArray: Array[PartitionDirectory] = {
     dynamicallySelectedPartitions
   }
+
+  def getPartitionsSeq(): Seq[Partition] = {
+    Seq()
+  }
 }
 
 abstract class ArrowFileSourceScanLikeShim(original: FileSourceScanExec)
diff --git a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index 247394ba91..66fa54bb93 100644
--- a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++ b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan}
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetFilters}
@@ -768,4 +769,8 @@ class Spark40Shims extends SparkShims {
       descriptionPath: String): Unit = {
     throw t
   }
+
+  override def getFileSourceScanStream(scan: FileSourceScanExec): Option[SparkDataStream] = {
+    scan.stream
+  }
 }
diff --git a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 73cc9ba2ce..2dd8ff3867 100644
--- a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -19,16 +19,24 @@ package org.apache.spark.sql.execution
 import org.apache.gluten.metrics.GlutenTimeMetric
 import org.apache.gluten.sql.shims.SparkShimLoader
 
+import org.apache.spark.Partition
+import org.apache.spark.internal.LogKeys.{COUNT, MAX_SPLIT_BYTES, OPEN_COST_IN_BYTES}
+import org.apache.spark.internal.MDC
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate}
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, HadoopFsRelation, PartitionDirectory}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.collection.BitSet
 
+import org.apache.hadoop.fs.Path
+
 abstract class FileSourceScanExecShim(
     @transient relation: HadoopFsRelation,
     output: Seq[Attribute],
@@ -129,6 +137,126 @@ abstract class FileSourceScanExecShim(
       relation.location.listFiles(staticPartitionFilters, staticDataFilters)
     partitionDirectories.toArray
   }
+
+  /**
+   * Create the file partitions for a bucketed read. The non-bucketed variant of this function
+   * is [[createReadPartitions]].
+   *
+   * The algorithm is pretty simple: each partition being returned should include all the files
+   * with the same bucket id from all the given Hive partitions.
+   *
+   * @param bucketSpec
+   *   the bucketing spec.
+   * @param selectedPartitions
+   *   Hive-style partitions that are part of the read.
+   */
+  private def createBucketedReadPartition(
+      bucketSpec: BucketSpec,
+      selectedPartitions: ScanFileListing): Seq[FilePartition] = {
+    logInfo(log"Planning with ${MDC(COUNT, bucketSpec.numBuckets)} buckets")
+    val partitionArray = selectedPartitions.toPartitionArray
+    val filesGroupedToBuckets = partitionArray.groupBy {
+      f =>
+        BucketingUtils
+          .getBucketId(f.toPath.getName)
+          .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.urlEncodedPath))
+    }
+
+    val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
+      val bucketSet = optionalBucketSet.get
+      filesGroupedToBuckets.filter(f => bucketSet.get(f._1))
+    } else {
+      filesGroupedToBuckets
+    }
+
+    val filePartitions = optionalNumCoalescedBuckets
+      .map {
+        numCoalescedBuckets =>
+          logInfo(log"Coalescing to ${MDC(COUNT, numCoalescedBuckets)} 
buckets")
+          val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % 
numCoalescedBuckets)
+          Seq.tabulate(numCoalescedBuckets) {
+            bucketId =>
+              val partitionedFiles = coalescedBuckets
+                .get(bucketId)
+                .map {
+                  _.values.flatten.toArray
+                }
+                .getOrElse(Array.empty)
+              FilePartition(bucketId, partitionedFiles)
+          }
+      }
+      .getOrElse {
+        Seq.tabulate(bucketSpec.numBuckets) {
+          bucketId =>
+            FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
+        }
+      }
+    filePartitions
+  }
+
+  /**
+   * Create the file partitions for a non-bucketed read. The bucketed variant of this function
+   * is [[createBucketedReadPartition]].
+   *
+   * @param selectedPartitions
+   *   Hive-style partitions that are part of the read.
+   */
+  private def createReadPartitions(selectedPartitions: ScanFileListing): Seq[FilePartition] = {
+    val openCostInBytes = relation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    val maxSplitBytes =
+      FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
+    logInfo(log"Planning scan with bin packing, max size: 
${MDC(MAX_SPLIT_BYTES, maxSplitBytes)} " +
+      log"bytes, open cost is considered as scanning ${MDC(OPEN_COST_IN_BYTES, 
openCostInBytes)} " +
+      log"bytes.")
+
+    // Filter files with bucket pruning if possible
+    val bucketingEnabled = relation.sparkSession.sessionState.conf.bucketingEnabled
+    val shouldProcess: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) if bucketingEnabled =>
+        // Do not prune the file if bucket file name is invalid
+        filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
+      case _ =>
+        _ => true
+    }
+
+    val splitFiles = selectedPartitions.filePartitionIterator
+      .flatMap {
+        partition =>
+          val ListingPartition(partitionVals, _, fileStatusIterator) = partition
+          fileStatusIterator.flatMap {
+            file =>
+              // getPath() is very expensive so we only want to call it once in this block:
+              val filePath = file.getPath
+              if (shouldProcess(filePath)) {
+                val isSplitable =
+                  relation.fileFormat.isSplitable(relation.sparkSession, relation.options, filePath)
+                PartitionedFileUtil.splitFiles(
+                  file = file,
+                  filePath = filePath,
+                  isSplitable = isSplitable,
+                  maxSplitBytes = maxSplitBytes,
+                  partitionValues = partitionVals
+                )
+              } else {
+                Seq.empty
+              }
+          }
+      }
+      .toArray
+      .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+
+    val partitions = FilePartition
+      .getFilePartitions(relation.sparkSession, splitFiles.toImmutableArraySeq, maxSplitBytes)
+    partitions
+  }
+
+  def getPartitionsSeq(): Seq[Partition] = {
+    if (bucketedScan) {
+      createBucketedReadPartition(relation.bucketSpec.get, dynamicallySelectedPartitions)
+    } else {
+      createReadPartitions(dynamicallySelectedPartitions)
+    }
+  }
 }
 
 abstract class ArrowFileSourceScanLikeShim(original: FileSourceScanExec)

