This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new ba25aece7e [GLUTEN-11088][CORE] Fix Spark 4.0 GlutenDynamicPartitionPruningV1SuiteAEOn (#11212)
ba25aece7e is described below
commit ba25aece7e3a9b14aef7cee9d9f71d917f1c6d30
Author: Jin Chengcheng <[email protected]>
AuthorDate: Mon Dec 1 15:19:46 2025 +0000
[GLUTEN-11088][CORE] Fix Spark 4.0 GlutenDynamicPartitionPruningV1SuiteAEOn (#11212)
Support the stream field in FileSourceScanExecTransformer.

FileSourceScanExecTransformer instances compared unequal because their dataFilters
differed only textually: (cast(x#688 as double) = ReusedSubquery Subquery
subquery#683, [id=#2324]) and (cast(x#688 as double) = Subquery subquery#683,
[id=#2324]) denote the same predicate, so filters must be deduplicated using their
canonicalized expressions.
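
For reference, the core of that change is FilterHandler.subtractFilters (full
context in the diff below), shown here with explanatory comments added:

    // Canonicalized forms strip cosmetic differences such as a Subquery vs. its
    // ReusedSubquery wrapper, so equivalent predicates compare equal and are
    // dropped from the remaining-filter list.
    def subtractFilters(left: Seq[Expression], right: Seq[Expression]): Seq[Expression] = {
      val scanSet = right.map(_.canonicalized).toSet
      left.filter(f => !scanSet.contains(f.canonicalized))
    }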
Also fix getPartitions for dynamic pruning in FileSourceScanExec: Spark 4.0 adds
the new ScanFileListing class and refactors the inputRDD path, so the partition
listing must be rebuilt against the new API.
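
The resulting version gate in FileSourceScanExecTransformerBase, condensed from
the diff below; on Spark 4.0+ the shim-provided getPartitionsSeq() walks the new
ScanFileListing, while earlier versions keep the backend's genPartitionSeq path:

    override def getPartitions: Seq[Partition] = {
      if (SparkVersionUtil.gteSpark40) {
        // The Spark 4.0 shim builds FilePartitions from ScanFileListing itself.
        getPartitionsSeq()
      } else {
        BackendsApiManager.getTransformerApiInstance.genPartitionSeq(
          relation, requiredSchema, getPartitionArray, output, bucketedScan,
          optionalBucketSet, optionalNumCoalescedBuckets, disableBucketedScan,
          filterExprs())
      }
    }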
---
.../gluten/execution/DeltaScanTransformer.scala | 6 +
.../gluten/extension/DeltaPostTransformRules.scala | 1 +
.../gluten/execution/HudiScanTransformer.scala | 6 +
.../BasicPhysicalOperatorTransformer.scala | 3 +-
.../execution/FileSourceScanExecTransformer.scala | 40 ++++---
.../gluten/execution/ScanTransformerFactory.scala | 1 +
.../sql/extension/CustomerColumnarPreRules.scala | 3 +
.../TestFileSourceScanExecTransformer.scala | 4 +
.../sql/extension/CustomerColumnarPreRules.scala | 3 +
.../TestFileSourceScanExecTransformer.scala | 3 +
.../sql/extension/CustomerColumnarPreRules.scala | 3 +
.../TestFileSourceScanExecTransformer.scala | 3 +
.../sql/extension/CustomerColumnarPreRules.scala | 3 +
.../TestFileSourceScanExecTransformer.scala | 3 +
.../gluten/utils/velox/VeloxTestSettings.scala | 2 -
.../sql/GlutenDynamicPartitionPruningSuite.scala | 71 +----------
.../sql/extension/CustomerColumnarPreRules.scala | 3 +
.../TestFileSourceScanExecTransformer.scala | 3 +
.../org/apache/gluten/sql/shims/SparkShims.scala | 5 +
.../sql/execution/FileSourceScanExecShim.scala | 5 +
.../sql/execution/FileSourceScanExecShim.scala | 5 +
.../sql/execution/FileSourceScanExecShim.scala | 5 +
.../sql/execution/FileSourceScanExecShim.scala | 5 +
.../gluten/sql/shims/spark40/Spark40Shims.scala | 5 +
.../sql/execution/FileSourceScanExecShim.scala | 130 ++++++++++++++++++++-
25 files changed, 234 insertions(+), 87 deletions(-)
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
index c3b88e8225..1be03dd404 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
@@ -16,11 +16,13 @@
*/
package org.apache.gluten.execution
+import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.types.StructType
@@ -28,6 +30,7 @@ import org.apache.spark.util.collection.BitSet
case class DeltaScanTransformer(
@transient override val relation: HadoopFsRelation,
+ @transient stream: Option[SparkDataStream],
override val output: Seq[Attribute],
override val requiredSchema: StructType,
override val partitionFilters: Seq[Expression],
@@ -39,6 +42,7 @@ case class DeltaScanTransformer(
override val pushDownFilters: Option[Seq[Expression]] = None)
extends FileSourceScanExecTransformerBase(
relation,
+ stream,
output,
requiredSchema,
partitionFilters,
@@ -66,6 +70,7 @@ case class DeltaScanTransformer(
override def doCanonicalize(): DeltaScanTransformer = {
DeltaScanTransformer(
relation,
+ None,
output.map(QueryPlan.normalizeExpressions(_, output)),
requiredSchema,
QueryPlan.normalizePredicates(
@@ -89,6 +94,7 @@ object DeltaScanTransformer {
def apply(scanExec: FileSourceScanExec): DeltaScanTransformer = {
new DeltaScanTransformer(
scanExec.relation,
+ SparkShimLoader.getSparkShims.getFileSourceScanStream(scanExec),
scanExec.output,
scanExec.requiredSchema,
scanExec.partitionFilters,
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
index 156fdf791a..da81bdf83f 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
@@ -153,6 +153,7 @@ object DeltaPostTransformRules {
// replace tableName in schema with physicalName
val scanExecTransformer = new DeltaScanTransformer(
newFsRelation,
+ plan.stream,
newOutput,
DeltaColumnMapping.createPhysicalSchema(
plan.requiredSchema,
diff --git a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
index b320ccef33..0c427c68cc 100644
--- a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
+++ b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
@@ -16,11 +16,13 @@
*/
package org.apache.gluten.execution
+import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.types.StructType
@@ -28,6 +30,7 @@ import org.apache.spark.util.collection.BitSet
case class HudiScanTransformer(
@transient override val relation: HadoopFsRelation,
+ @transient stream: Option[SparkDataStream],
override val output: Seq[Attribute],
override val requiredSchema: StructType,
override val partitionFilters: Seq[Expression],
@@ -39,6 +42,7 @@ case class HudiScanTransformer(
override val pushDownFilters: Option[Seq[Expression]] = None)
extends FileSourceScanExecTransformerBase(
relation,
+ stream,
output,
requiredSchema,
partitionFilters,
@@ -58,6 +62,7 @@ case class HudiScanTransformer(
override def doCanonicalize(): HudiScanTransformer = {
HudiScanTransformer(
relation,
+ None,
output.map(QueryPlan.normalizeExpressions(_, output)),
requiredSchema,
QueryPlan.normalizePredicates(
@@ -81,6 +86,7 @@ object HudiScanTransformer {
def apply(scanExec: FileSourceScanExec): HudiScanTransformer = {
new HudiScanTransformer(
scanExec.relation,
+ SparkShimLoader.getSparkShims.getFileSourceScanStream(scanExec),
scanExec.output,
scanExec.requiredSchema,
scanExec.partitionFilters,
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 24486efa54..93d6201c73 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -313,7 +313,8 @@ object FilterHandler extends PredicateHelper {
}
def subtractFilters(left: Seq[Expression], right: Seq[Expression]): Seq[Expression] = {
- (left.toSet -- right.toSet).toSeq
+ val scanSet = right.map(_.canonicalized).toSet
+ left.filter(f => !scanSet.contains(f.canonicalized))
}
def combineFilters(left: Seq[Expression], right: Seq[Expression]): Seq[Expression] = {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index 1e7de78d94..b53bbb45ba 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -32,12 +32,14 @@ import org.apache.spark.sql.execution.FileSourceScanExecShim
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SparkVersionUtil
import org.apache.spark.util.collection.BitSet
import org.apache.commons.lang3.StringUtils
case class FileSourceScanExecTransformer(
@transient override val relation: HadoopFsRelation,
+ @transient stream: Option[SparkDataStream],
override val output: Seq[Attribute],
override val requiredSchema: StructType,
override val partitionFilters: Seq[Expression],
@@ -49,6 +51,7 @@ case class FileSourceScanExecTransformer(
override val pushDownFilters: Option[Seq[Expression]] = None)
extends FileSourceScanExecTransformerBase(
relation,
+ stream,
output,
requiredSchema,
partitionFilters,
@@ -61,6 +64,9 @@ case class FileSourceScanExecTransformer(
override def doCanonicalize(): FileSourceScanExecTransformer = {
FileSourceScanExecTransformer(
relation,
+      // remove stream on canonicalization; this is needed for reused shuffle to be effective in
+      // self-join
+      None,
output.map(QueryPlan.normalizeExpressions(_, output)),
requiredSchema,
QueryPlan.normalizePredicates(
@@ -81,6 +87,7 @@ case class FileSourceScanExecTransformer(
abstract class FileSourceScanExecTransformerBase(
@transient override val relation: HadoopFsRelation,
+ @transient stream: Option[SparkDataStream],
override val output: Seq[Attribute],
requiredSchema: StructType,
partitionFilters: Seq[Expression],
@@ -112,18 +119,22 @@ abstract class FileSourceScanExecTransformerBase(
override def getMetadataColumns(): Seq[AttributeReference] = metadataColumns
override def getPartitions: Seq[Partition] = {
- BackendsApiManager.getTransformerApiInstance
- .genPartitionSeq(
- relation,
- requiredSchema,
- getPartitionArray,
- output,
- bucketedScan,
- optionalBucketSet,
- optionalNumCoalescedBuckets,
- disableBucketedScan,
- filterExprs()
- )
+ if (SparkVersionUtil.gteSpark40) {
+ getPartitionsSeq()
+ } else {
+ BackendsApiManager.getTransformerApiInstance
+ .genPartitionSeq(
+ relation,
+ requiredSchema,
+ getPartitionArray,
+ output,
+ bucketedScan,
+ optionalBucketSet,
+ optionalNumCoalescedBuckets,
+ disableBucketedScan,
+ filterExprs()
+ )
+ }
}
override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
@@ -205,12 +216,9 @@ abstract class FileSourceScanExecTransformerBase(
s" $nativeFiltersString")
}
- // Required for Spark 4.0 to implement a trait method.
// The "override" keyword is omitted to maintain compatibility with earlier
Spark versions.
def getStream: Option[SparkDataStream] = {
- throw new UnsupportedOperationException(
- "not supported on streaming"
- )
+ stream
}
}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
index 2a8cc91382..711f8c6908 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
@@ -27,6 +27,7 @@ object ScanTransformerFactory {
scanExec: FileSourceScanExec): FileSourceScanExecTransformerBase = {
FileSourceScanExecTransformer(
scanExec.relation,
+ SparkShimLoader.getSparkShims.getFileSourceScanStream(scanExec),
scanExec.output,
scanExec.requiredSchema,
scanExec.partitionFilters,
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
index 2ee1573ea0..5de35daaed 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.extension
+import org.apache.gluten.sql.shims.SparkShimLoader
+
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
@@ -26,6 +28,7 @@ case class CustomerColumnarPreRules(session: SparkSession) extends Rule[SparkPla
case fileSourceScan: FileSourceScanExec =>
val transformer = new TestFileSourceScanExecTransformer(
fileSourceScan.relation,
+ SparkShimLoader.getSparkShims.getFileSourceScanStream(fileSourceScan),
fileSourceScan.output,
fileSourceScan.requiredSchema,
fileSourceScan.partitionFilters,
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 2857a4a680..1127e26a42 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -24,6 +24,7 @@ import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet
@@ -31,6 +32,7 @@ import org.apache.spark.util.collection.BitSet
/** Test for customer column rules */
case class TestFileSourceScanExecTransformer(
@transient override val relation: HadoopFsRelation,
+ @transient stream: Option[SparkDataStream],
override val output: Seq[Attribute],
override val requiredSchema: StructType,
override val partitionFilters: Seq[Expression],
@@ -42,6 +44,7 @@ case class TestFileSourceScanExecTransformer(
override val pushDownFilters: Option[Seq[Expression]] = None)
extends FileSourceScanExecTransformerBase(
relation,
+ stream,
output,
requiredSchema,
partitionFilters,
@@ -71,6 +74,7 @@ case class TestFileSourceScanExecTransformer(
override def doCanonicalize(): TestFileSourceScanExecTransformer = {
TestFileSourceScanExecTransformer(
relation,
+ None,
output.map(QueryPlan.normalizeExpressions(_, output)),
requiredSchema,
QueryPlan.normalizePredicates(
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
index 2ee1573ea0..5de35daaed 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.extension
+import org.apache.gluten.sql.shims.SparkShimLoader
+
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
@@ -26,6 +28,7 @@ case class CustomerColumnarPreRules(session: SparkSession) extends Rule[SparkPla
case fileSourceScan: FileSourceScanExec =>
val transformer = new TestFileSourceScanExecTransformer(
fileSourceScan.relation,
+ SparkShimLoader.getSparkShims.getFileSourceScanStream(fileSourceScan),
fileSourceScan.output,
fileSourceScan.requiredSchema,
fileSourceScan.partitionFilters,
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 6aced7cfa6..c2a775bd7f 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -23,6 +24,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet
@@ -30,6 +31,7 @@ import org.apache.spark.util.collection.BitSet
/** Test for customer column rules */
case class TestFileSourceScanExecTransformer(
@transient override val relation: HadoopFsRelation,
+ @transient stream: Option[SparkDataStream],
override val output: Seq[Attribute],
override val requiredSchema: StructType,
override val partitionFilters: Seq[Expression],
@@ -41,6 +43,7 @@ case class TestFileSourceScanExecTransformer(
override val pushDownFilters: Option[Seq[Expression]] = None)
extends FileSourceScanExecTransformerBase(
relation,
+ stream,
output,
requiredSchema,
partitionFilters,
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
index 2ee1573ea0..5de35daaed 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.extension
+import org.apache.gluten.sql.shims.SparkShimLoader
+
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
@@ -26,6 +28,7 @@ case class CustomerColumnarPreRules(session: SparkSession) extends Rule[SparkPla
case fileSourceScan: FileSourceScanExec =>
val transformer = new TestFileSourceScanExecTransformer(
fileSourceScan.relation,
+ SparkShimLoader.getSparkShims.getFileSourceScanStream(fileSourceScan),
fileSourceScan.output,
fileSourceScan.requiredSchema,
fileSourceScan.partitionFilters,
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 6aced7cfa6..c2a775bd7f 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -23,6 +24,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet
@@ -30,6 +31,7 @@ import org.apache.spark.util.collection.BitSet
/** Test for customer column rules */
case class TestFileSourceScanExecTransformer(
@transient override val relation: HadoopFsRelation,
+ @transient stream: Option[SparkDataStream],
override val output: Seq[Attribute],
override val requiredSchema: StructType,
override val partitionFilters: Seq[Expression],
@@ -41,6 +43,7 @@ case class TestFileSourceScanExecTransformer(
override val pushDownFilters: Option[Seq[Expression]] = None)
extends FileSourceScanExecTransformerBase(
relation,
+ stream,
output,
requiredSchema,
partitionFilters,
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
index 2ee1573ea0..5de35daaed 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.extension
+import org.apache.gluten.sql.shims.SparkShimLoader
+
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
@@ -26,6 +28,7 @@ case class CustomerColumnarPreRules(session: SparkSession) extends Rule[SparkPla
case fileSourceScan: FileSourceScanExec =>
val transformer = new TestFileSourceScanExecTransformer(
fileSourceScan.relation,
+ SparkShimLoader.getSparkShims.getFileSourceScanStream(fileSourceScan),
fileSourceScan.output,
fileSourceScan.requiredSchema,
fileSourceScan.partitionFilters,
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 6aced7cfa6..c2a775bd7f 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -23,6 +24,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet
@@ -30,6 +31,7 @@ import org.apache.spark.util.collection.BitSet
/** Test for customer column rules */
case class TestFileSourceScanExecTransformer(
@transient override val relation: HadoopFsRelation,
+ @transient stream: Option[SparkDataStream],
override val output: Seq[Attribute],
override val requiredSchema: StructType,
override val partitionFilters: Seq[Expression],
@@ -41,6 +43,7 @@ case class TestFileSourceScanExecTransformer(
override val pushDownFilters: Option[Seq[Expression]] = None)
extends FileSourceScanExecTransformerBase(
relation,
+ stream,
output,
requiredSchema,
partitionFilters,
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 5fa43d50f7..19bc85dbcf 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -853,8 +853,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenDeprecatedAPISuite]
enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOff]
enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOn]
- // TODO: fix in Spark-4.0
- .exclude("join key with multiple references on the filtering plan")
enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOnDisableScan]
enableSuite[GlutenDynamicPartitionPruningV1SuiteAEOffDisableScan]
enableSuite[GlutenDynamicPartitionPruningV2SuiteAEOff]
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
index df42c79099..dc96c09bc2 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExc
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
-import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
abstract class GlutenDynamicPartitionPruningSuiteBase
@@ -48,8 +47,7 @@ abstract class GlutenDynamicPartitionPruningSuiteBase
override def testNameBlackList: Seq[String] = Seq(
// overwritten with different plan
"SPARK-38674: Remove useless deduplicate in SubqueryBroadcastExec",
- "Make sure dynamic pruning works on uncorrelated queries",
- "Subquery reuse across the whole plan"
+ "Make sure dynamic pruning works on uncorrelated queries"
)
// === Following cases override super class's cases ===
@@ -79,8 +77,7 @@ abstract class GlutenDynamicPartitionPruningSuiteBase
}
}
- // TODO: fix in Spark-4.0
- ignoreGluten("no partition pruning when the build side is a stream") {
+ testGluten("no partition pruning when the build side is a stream") {
withTable("fact") {
val input = MemoryStream[Int]
val stream = input.toDF.select($"value".as("one"), ($"value" * 3).as("code"))
@@ -339,8 +336,7 @@ abstract class GlutenDynamicPartitionPruningV1Suite extends GlutenDynamicPartiti
import testImplicits._
/** Check the static scan metrics with and without DPP */
- // TODO: fix in Spark-4.0
- ignoreGluten("static scan metrics", DisableAdaptiveExecution("DPP in AQE
must reuse broadcast")) {
+ testGluten("static scan metrics", DisableAdaptiveExecution("DPP in AQE must
reuse broadcast")) {
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
@@ -461,8 +457,7 @@ class GlutenDynamicPartitionPruningV1SuiteAEOff
import testImplicits._
- // TODO: fix in Spark-4.0
- ignoreGluten(
+ testGluten(
"override static scan metrics",
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
withSQLConf(
@@ -577,64 +572,6 @@ class GlutenDynamicPartitionPruningV1SuiteAEOff
}
}
}
-
- // TODO: fix in Spark-4.0
- ignoreGluten(
- "Subquery reuse across the whole plan",
- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
- withSQLConf(
- SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
- SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false"
- ) {
- withTable("df1", "df2") {
- spark
- .range(100)
- .select(col("id"), col("id").as("k"))
- .write
- .partitionBy("k")
- .format(tableFormat)
- .mode("overwrite")
- .saveAsTable("df1")
-
- spark
- .range(10)
- .select(col("id"), col("id").as("k"))
- .write
- .partitionBy("k")
- .format(tableFormat)
- .mode("overwrite")
- .saveAsTable("df2")
-
- val df = sql("""
- |SELECT df1.id, df2.k
- |FROM df1 JOIN df2 ON df1.k = df2.k
- |WHERE df2.id < (SELECT max(id) FROM df2 WHERE id <= 2)
- |""".stripMargin)
-
- checkPartitionPruningPredicate(df, true, false)
-
- checkAnswer(df, Row(0, 0) :: Row(1, 1) :: Nil)
-
- val plan = df.queryExecution.executedPlan
-
-    val subqueryIds = plan.collectWithSubqueries { case s: SubqueryExec => s.id }
- val reusedSubqueryIds = plan.collectWithSubqueries {
- case rs: ReusedSubqueryExec => rs.child.id
- }
-
- // By default Gluten pushes more filters than vanilla Spark.
- //
-      // See also org.apache.gluten.execution.FilterHandler#applyFilterPushdownToScan
- // See also DynamicPartitionPruningSuite.scala:1362
-      assert(subqueryIds.size == 3, "Whole plan subquery reusing not working correctly")
-      assert(reusedSubqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
- assert(
- reusedSubqueryIds.forall(subqueryIds.contains(_)),
- "ReusedSubqueryExec should reuse an existing subquery")
- }
- }
- }
}
class GlutenDynamicPartitionPruningV1SuiteAEOn
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
index 2ee1573ea0..5de35daaed 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.extension
+import org.apache.gluten.sql.shims.SparkShimLoader
+
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
@@ -26,6 +28,7 @@ case class CustomerColumnarPreRules(session: SparkSession) extends Rule[SparkPla
case fileSourceScan: FileSourceScanExec =>
val transformer = new TestFileSourceScanExecTransformer(
fileSourceScan.relation,
+ SparkShimLoader.getSparkShims.getFileSourceScanStream(fileSourceScan),
fileSourceScan.output,
fileSourceScan.requiredSchema,
fileSourceScan.partitionFilters,
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 644e3c5839..18c5709bf5 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -23,6 +24,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet
@@ -30,6 +31,7 @@ import org.apache.spark.util.collection.BitSet
/** Test for customer column rules */
case class TestFileSourceScanExecTransformer(
@transient override val relation: HadoopFsRelation,
+ @transient val stream: Option[SparkDataStream],
override val output: Seq[Attribute],
override val requiredSchema: StructType,
override val partitionFilters: Seq[Expression],
@@ -41,6 +43,7 @@ case class TestFileSourceScanExecTransformer(
override val pushDownFilters: Option[Seq[Expression]] = None)
extends FileSourceScanExecTransformerBase(
relation,
+ stream,
output,
requiredSchema,
partitionFilters,
diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 85916b316c..dcfc6b2104 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
@@ -341,4 +342,8 @@ trait SparkShims {
s"output path: $descriptionPath",
t)
}
+
+  def getFileSourceScanStream(scan: FileSourceScanExec): Option[SparkDataStream] = {
+ None
+ }
}
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index a15fc73430..9e61b290f2 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.metrics.GlutenTimeMetric
+import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, Expression, PlanExpression, Predicate}
@@ -154,6 +155,10 @@ abstract class FileSourceScanExecShim(
def getPartitionArray: Array[PartitionDirectory] = {
dynamicallySelectedPartitions
}
+
+ def getPartitionsSeq(): Seq[Partition] = {
+ Seq()
+ }
}
abstract class ArrowFileSourceScanLikeShim(original: FileSourceScanExec)
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 3dbd29ef1c..240b377d45 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.metrics.GlutenTimeMetric
+import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, Expression, FileSourceMetadataAttribute, PlanExpression, Predicate}
@@ -163,6 +164,10 @@ abstract class FileSourceScanExecShim(
def getPartitionArray: Array[PartitionDirectory] = {
dynamicallySelectedPartitions
}
+
+ def getPartitionsSeq(): Seq[Partition] = {
+ Seq()
+ }
}
abstract class ArrowFileSourceScanLikeShim(original: FileSourceScanExec)
diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 01445c170c..2ab777c5e2 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.metrics.GlutenTimeMetric
import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate}
@@ -123,6 +124,10 @@ abstract class FileSourceScanExecShim(
def getPartitionArray: Array[PartitionDirectory] = {
dynamicallySelectedPartitions
}
+
+ def getPartitionsSeq(): Seq[Partition] = {
+ Seq()
+ }
}
abstract class ArrowFileSourceScanLikeShim(original: FileSourceScanExec)
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 75049793d8..eb91b00f22 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.metrics.GlutenTimeMetric
+import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate}
@@ -120,6 +121,10 @@ abstract class FileSourceScanExecShim(
def getPartitionArray: Array[PartitionDirectory] = {
dynamicallySelectedPartitions
}
+
+ def getPartitionsSeq(): Seq[Partition] = {
+ Seq()
+ }
}
abstract class ArrowFileSourceScanLikeShim(original: FileSourceScanExec)
diff --git a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index 247394ba91..66fa54bb93 100644
--- a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++ b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan}
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetFilters}
@@ -768,4 +769,8 @@ class Spark40Shims extends SparkShims {
descriptionPath: String): Unit = {
throw t
}
+
+  override def getFileSourceScanStream(scan: FileSourceScanExec): Option[SparkDataStream] = {
+ scan.stream
+ }
}
diff --git a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 73cc9ba2ce..2dd8ff3867 100644
--- a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -19,16 +19,24 @@ package org.apache.spark.sql.execution
import org.apache.gluten.metrics.GlutenTimeMetric
import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.spark.Partition
+import org.apache.spark.internal.LogKeys.{COUNT, MAX_SPLIT_BYTES, OPEN_COST_IN_BYTES}
+import org.apache.spark.internal.MDC
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate}
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.collection.BitSet
+import org.apache.hadoop.fs.Path
+
abstract class FileSourceScanExecShim(
@transient relation: HadoopFsRelation,
output: Seq[Attribute],
@@ -129,6 +137,126 @@ abstract class FileSourceScanExecShim(
relation.location.listFiles(staticPartitionFilters, staticDataFilters)
partitionDirectories.toArray
}
+
+ /**
+   * Create the file partitions for a bucketed read. The non-bucketed variant of this function is
+   * [[createReadPartitions]].
+ *
+   * The algorithm is pretty simple: each RDD partition being returned should include all the files
+ * with the same bucket id from all the given Hive partitions.
+ *
+ * @param bucketSpec
+ * the bucketing spec.
+ * @param selectedPartitions
+   *   Hive-style partitions that are part of the read.
+ */
+ private def createBucketedReadPartition(
+ bucketSpec: BucketSpec,
+ selectedPartitions: ScanFileListing): Seq[FilePartition] = {
+ logInfo(log"Planning with ${MDC(COUNT, bucketSpec.numBuckets)} buckets")
+ val partitionArray = selectedPartitions.toPartitionArray
+ val filesGroupedToBuckets = partitionArray.groupBy {
+ f =>
+ BucketingUtils
+ .getBucketId(f.toPath.getName)
+          .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.urlEncodedPath))
+ }
+
+ val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
+ val bucketSet = optionalBucketSet.get
+ filesGroupedToBuckets.filter(f => bucketSet.get(f._1))
+ } else {
+ filesGroupedToBuckets
+ }
+
+ val filePartitions = optionalNumCoalescedBuckets
+ .map {
+ numCoalescedBuckets =>
+ logInfo(log"Coalescing to ${MDC(COUNT, numCoalescedBuckets)}
buckets")
+          val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
+ Seq.tabulate(numCoalescedBuckets) {
+ bucketId =>
+ val partitionedFiles = coalescedBuckets
+ .get(bucketId)
+ .map {
+ _.values.flatten.toArray
+ }
+ .getOrElse(Array.empty)
+ FilePartition(bucketId, partitionedFiles)
+ }
+ }
+ .getOrElse {
+ Seq.tabulate(bucketSpec.numBuckets) {
+ bucketId =>
+            FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
+ }
+ }
+ filePartitions
+ }
+
+ /**
+   * Create the file partitions for a non-bucketed read. The bucketed variant of this function is
+   * [[createBucketedReadPartition]].
+ *
+ * @param selectedPartitions
+   *   Hive-style partitions that are part of the read.
+ */
+  private def createReadPartitions(selectedPartitions: ScanFileListing): Seq[FilePartition] = {
+    val openCostInBytes = relation.sparkSession.sessionState.conf.filesOpenCostInBytes
+ val maxSplitBytes =
+ FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
+ logInfo(log"Planning scan with bin packing, max size:
${MDC(MAX_SPLIT_BYTES, maxSplitBytes)} " +
+ log"bytes, open cost is considered as scanning ${MDC(OPEN_COST_IN_BYTES,
openCostInBytes)} " +
+ log"bytes.")
+
+ // Filter files with bucket pruning if possible
+    val bucketingEnabled = relation.sparkSession.sessionState.conf.bucketingEnabled
+ val shouldProcess: Path => Boolean = optionalBucketSet match {
+ case Some(bucketSet) if bucketingEnabled =>
+ // Do not prune the file if bucket file name is invalid
+        filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
+ case _ =>
+ _ => true
+ }
+
+ val splitFiles = selectedPartitions.filePartitionIterator
+ .flatMap {
+ partition =>
+          val ListingPartition(partitionVals, _, fileStatusIterator) = partition
+ fileStatusIterator.flatMap {
+ file =>
+              // getPath() is very expensive so we only want to call it once in this block:
+ val filePath = file.getPath
+ if (shouldProcess(filePath)) {
+ val isSplitable =
+                  relation.fileFormat.isSplitable(relation.sparkSession, relation.options, filePath)
+ PartitionedFileUtil.splitFiles(
+ file = file,
+ filePath = filePath,
+ isSplitable = isSplitable,
+ maxSplitBytes = maxSplitBytes,
+ partitionValues = partitionVals
+ )
+ } else {
+ Seq.empty
+ }
+ }
+ }
+ .toArray
+ .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+
+ val partitions = FilePartition
+      .getFilePartitions(relation.sparkSession, splitFiles.toImmutableArraySeq, maxSplitBytes)
+ partitions
+ }
+
+ def getPartitionsSeq(): Seq[Partition] = {
+ if (bucketedScan) {
+      createBucketedReadPartition(relation.bucketSpec.get, dynamicallySelectedPartitions)
+ } else {
+ createReadPartitions(dynamicallySelectedPartitions)
+ }
+ }
}
abstract class ArrowFileSourceScanLikeShim(original: FileSourceScanExec)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]