This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 2daa733602 [CORE] Make LeafTransformSupport's getPartitions return
Seq[Partition] (#10838)
2daa733602 is described below
commit 2daa733602733e4c38ecac2b40e46b10f80d1db4
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Oct 16 17:13:08 2025 +0800
[CORE] Make LeafTransformSupport's getPartitions return Seq[Partition]
(#10838)
---
.../sql/delta/catalog/ClickHouseTableV2.scala | 5 +-
.../sql/delta/catalog/ClickHouseTableV2.scala | 5 +-
.../sql/delta/catalog/ClickHouseTableV2.scala | 4 +-
.../execution/iceberg/ClickHouseIcebergSuite.scala | 4 +-
.../backendsapi/clickhouse/CHIteratorApi.scala | 9 +-
.../backendsapi/clickhouse/CHTransformerApi.scala | 13 ++-
.../gluten/execution/CHRangeExecTransformer.scala | 4 +-
.../execution/GlutenMergeTreePartition.scala | 4 +-
.../execution/NativeFileScanColumnarRDD.scala | 116 ---------------------
...PartitionsUtil.scala => CHPartitionsUtil.scala} | 14 +--
.../utils/MergeTreePartsPartitionsUtil.scala | 22 ++--
...lutenClickhouseMergetreeSoftAffinitySuite.scala | 5 +-
.../backendsapi/velox/VeloxIteratorApi.scala | 67 ++----------
.../backendsapi/velox/VeloxTransformerApi.scala | 13 ++-
.../gluten/execution/IcebergScanTransformer.scala | 33 ++----
.../spark/source/GlutenIcebergSourceUtil.scala | 64 ++----------
.../org/apache/gluten/execution/IcebergSuite.scala | 4 +-
.../execution/MicroBatchScanExecTransformer.scala | 8 +-
.../gluten/execution/PaimonScanTransformer.scala | 51 +++++----
.../org/apache/gluten/execution/PaimonSuite.scala | 17 +++
.../apache/gluten/backendsapi/IteratorApi.scala | 13 +--
.../apache/gluten/backendsapi/TransformerApi.scala | 8 +-
.../execution/BasicScanExecTransformer.scala | 28 +++--
.../execution/BatchScanExecTransformer.scala | 53 ++++------
.../execution/FileSourceScanExecTransformer.scala | 27 ++---
.../execution/GlutenWholeStageColumnarRDD.scala | 4 +-
.../execution/SparkDataSourceRDDPartition.scala | 34 ++++++
.../gluten/execution/WholeStageTransformer.scala | 102 ++++++------------
...utPartitionsUtil.scala => PartitionsUtil.scala} | 14 +--
.../apache/spark/softaffinity/SoftAffinity.scala | 10 +-
.../sql/hive/HiveTableScanExecTransformer.scala | 6 +-
.../TestFileSourceScanExecTransformer.scala | 23 ++--
.../TestFileSourceScanExecTransformer.scala | 23 ++--
.../TestFileSourceScanExecTransformer.scala | 23 ++--
.../TestFileSourceScanExecTransformer.scala | 23 ++--
.../org/apache/gluten/sql/shims/SparkShims.scala | 4 +
36 files changed, 319 insertions(+), 538 deletions(-)
diff --git
a/backends-clickhouse/src-delta20/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
b/backends-clickhouse/src-delta20/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index d2966eafee..29caf77b63 100644
---
a/backends-clickhouse/src-delta20/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++
b/backends-clickhouse/src-delta20/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -15,11 +15,12 @@
* limitations under the License.
*/
package org.apache.spark.sql.delta.catalog
+
+import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaLog,
DeltaTimeTravelSpec, Snapshot}
import org.apache.spark.sql.delta.actions.Metadata
@@ -153,7 +154,7 @@ object ClickHouseTableV2 extends Logging {
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
disableBucketedScan: Boolean,
- filterExprs: Seq[Expression]): Seq[InputPartition] = {
+ filterExprs: Seq[Expression]): Seq[Partition] = {
val tableV2 = ClickHouseTableV2.getTable(deltaLog)
MergeTreePartsPartitionsUtil.getMergeTreePartsPartitions(
diff --git
a/backends-clickhouse/src-delta23/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
b/backends-clickhouse/src-delta23/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index d2966eafee..29caf77b63 100644
---
a/backends-clickhouse/src-delta23/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++
b/backends-clickhouse/src-delta23/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -15,11 +15,12 @@
* limitations under the License.
*/
package org.apache.spark.sql.delta.catalog
+
+import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaLog,
DeltaTimeTravelSpec, Snapshot}
import org.apache.spark.sql.delta.actions.Metadata
@@ -153,7 +154,7 @@ object ClickHouseTableV2 extends Logging {
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
disableBucketedScan: Boolean,
- filterExprs: Seq[Expression]): Seq[InputPartition] = {
+ filterExprs: Seq[Expression]): Seq[Partition] = {
val tableV2 = ClickHouseTableV2.getTable(deltaLog)
MergeTreePartsPartitionsUtil.getMergeTreePartsPartitions(
diff --git
a/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
b/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index 2c7a57625b..f16134471c 100644
---
a/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++
b/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.delta.catalog
+import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -24,7 +25,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.V1Table
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog,
DeltaTableUtils, DeltaTimeTravelSpec, Snapshot, UnresolvedPathBasedDeltaTable}
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.sources.DeltaDataSource
@@ -147,7 +147,7 @@ object ClickHouseTableV2 extends Logging {
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
disableBucketedScan: Boolean,
- filterExprs: Seq[Expression]): Seq[InputPartition] = {
+ filterExprs: Seq[Expression]): Seq[Partition] = {
val tableV2 = ClickHouseTableV2.getTable(deltaLog)
MergeTreePartsPartitionsUtil.getMergeTreePartsPartitions(
diff --git
a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
index e4aff03290..95de2e0414 100644
---
a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
+++
b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
@@ -292,7 +292,7 @@ class ClickHouseIcebergSuite extends
GlutenClickHouseWholeStageTransformerSuite
getExecutedPlan(df).map {
case plan: IcebergScanTransformer =>
assert(plan.getKeyGroupPartitioning.isDefined)
- assert(plan.getSplitInfosWithIndex.length == 3)
+ assert(plan.getSplitInfos.length == 3)
case _ => // do nothing
}
}
@@ -372,7 +372,7 @@ class ClickHouseIcebergSuite extends
GlutenClickHouseWholeStageTransformerSuite
getExecutedPlan(df).map {
case plan: IcebergScanTransformer =>
assert(plan.getKeyGroupPartitioning.isDefined)
- assert(plan.getSplitInfosWithIndex.length == 3)
+ assert(plan.getSplitInfos.length == 3)
case _ => // do nothing
}
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 07c3d14409..5fff386d72 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -27,14 +27,13 @@ import org.apache.gluten.substrait.rel._
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.vectorized.{BatchIterator,
CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator}
-import org.apache.spark.{InterruptibleIterator, SparkConf, TaskContext}
+import org.apache.spark.{InterruptibleIterator, Partition, SparkConf,
TaskContext}
import org.apache.spark.affinity.CHAffinity
import org.apache.spark.executor.InputMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.CHColumnarShuffleWriter
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.FilePartition
import
org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder
import org.apache.spark.sql.execution.datasources.mergetree.PartSerializer
@@ -125,12 +124,16 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
}
override def genSplitInfo(
- partition: InputPartition,
+ partitionIndex: Int,
+ partitions: Seq[Partition],
partitionSchema: StructType,
dataSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo = {
+ // todo: support multi partitions
+ assert(partitions.size == 1)
+ val partition = partitions.head
partition match {
case p: GlutenMergeTreePartition =>
ExtensionTableBuilder
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
index 9da8eed202..ddc857182f 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
@@ -22,12 +22,12 @@ import
org.apache.gluten.execution.{CHHashAggregateExecTransformer, WriteFilesEx
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.expression.{BooleanLiteralNode,
ExpressionBuilder, ExpressionNode}
-import org.apache.gluten.utils.{CHInputPartitionsUtil, ExpressionDocUtil}
+import org.apache.gluten.utils.{CHPartitionsUtil, ExpressionDocUtil}
+import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.GlutenDriverEndpoint
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.delta.MergeTreeFileFormat
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
@@ -51,8 +51,7 @@ import java.util
class CHTransformerApi extends TransformerApi with Logging {
- /** Generate Seq[InputPartition] for FileSourceScanExecTransformer. */
- def genInputPartitionSeq(
+ def genPartitionSeq(
relation: HadoopFsRelation,
requiredSchema: StructType,
selectedPartitions: Array[PartitionDirectory],
@@ -61,7 +60,7 @@ class CHTransformerApi extends TransformerApi with Logging {
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
disableBucketedScan: Boolean,
- filterExprs: Seq[Expression]): Seq[InputPartition] = {
+ filterExprs: Seq[Expression]): Seq[Partition] = {
relation.location match {
case index: TahoeFileIndex
if relation.fileFormat
@@ -81,7 +80,7 @@ class CHTransformerApi extends TransformerApi with Logging {
)
case _ =>
// Generate FilePartition for Parquet
- CHInputPartitionsUtil(
+ CHPartitionsUtil(
relation,
requiredSchema,
selectedPartitions,
@@ -89,7 +88,7 @@ class CHTransformerApi extends TransformerApi with Logging {
bucketedScan,
optionalBucketSet,
optionalNumCoalescedBuckets,
- disableBucketedScan).genInputPartitionSeq()
+ disableBucketedScan).genPartitionSeq()
}
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
index bdb716c676..fb9fc4b4d8 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
@@ -24,8 +24,8 @@ import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.extensions.ExtensionBuilder
import org.apache.gluten.substrait.rel.{RelBuilder, SplitInfo}
+import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.SparkPlan
import
org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -53,7 +53,7 @@ case class CHRangeExecTransformer(
}
}
- override def getPartitions: Seq[InputPartition] = {
+ override def getPartitions: Seq[Partition] = {
(0 until numSlices).map {
sliceIndex => GlutenRangeExecPartition(start, end, step, numSlices,
sliceIndex)
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala
index 46f40fc25b..3cbce8423a 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution
+import org.apache.spark.Partition
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.types.StructType
@@ -95,7 +96,8 @@ case class GlutenMergeTreePartition(
partList: Array[MergeTreePartSplit],
tableSchema: StructType,
clickhouseTableConfigs: Map[String, String])
- extends InputPartition {
+ extends Partition
+ with InputPartition {
override def preferredLocations(): Array[String] = {
Array.empty[String]
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
deleted file mode 100644
index cf0508d6cb..0000000000
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.execution
-
-import org.apache.gluten.metrics.GlutenTimeMetric
-import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator,
CloseableCHColumnBatchIterator}
-
-import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.connector.read.InputPartition
-import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import java.util
-import java.util.concurrent.TimeUnit.NANOSECONDS
-
-class NativeFileScanColumnarRDD(
- @transient sc: SparkContext,
- @transient private val inputPartitions: Seq[InputPartition],
- numOutputRows: SQLMetric,
- numOutputBatches: SQLMetric,
- scanTime: SQLMetric)
- extends RDD[ColumnarBatch](sc, Nil) {
-
- override def compute(split: Partition, context: TaskContext):
Iterator[ColumnarBatch] = {
- val inputPartition = castNativePartition(split)
-
- assert(
- inputPartition.isInstanceOf[GlutenPartition],
- "NativeFileScanColumnarRDD only accepts GlutenPartition.")
-
- val splitInfoByteArray = inputPartition
- .asInstanceOf[GlutenPartition]
- .splitInfos
- .map(splitInfo => splitInfo.toProtobuf.toByteArray)
- .toArray
-
- val resIter = GlutenTimeMetric.millis(scanTime) {
- _ =>
- val inBatchIters = new util.ArrayList[ColumnarNativeIterator]()
- CHNativeExpressionEvaluator.createKernelWithBatchIterator(
- inputPartition.plan,
- splitInfoByteArray,
- inBatchIters,
- false,
- split.index
- )
- }
- TaskContext
- .get()
- .addTaskFailureListener(
- (ctx, _) => {
- if (ctx.isInterrupted()) {
- resIter.cancel()
- }
- })
- TaskContext.get().addTaskCompletionListener[Unit](_ => resIter.close())
- val iter: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
- var scanTotalTime = 0L
- var scanTimeAdded = false
-
- override def hasNext: Boolean = {
- val res = GlutenTimeMetric.withNanoTime(resIter.hasNext)(t =>
scanTotalTime += t)
- if (!res && !scanTimeAdded) {
- scanTime += NANOSECONDS.toMillis(scanTotalTime)
- scanTimeAdded = true
- }
- res
- }
-
- override def next(): ColumnarBatch = {
- GlutenTimeMetric.withNanoTime {
- val cb = resIter.next()
- numOutputRows += cb.numRows()
- numOutputBatches += 1
- cb
- }(t => scanTotalTime += t)
- }
- }
- new CloseableCHColumnBatchIterator(iter)
- }
-
- private def castNativePartition(split: Partition): BaseGlutenPartition =
split match {
- case FirstZippedPartitionsPartition(_, p: BaseGlutenPartition, _) => p
- case _ => throw new SparkException(s"[BUG] Not a NativeSubstraitPartition:
$split")
- }
-
- override def getPreferredLocations(split: Partition): Seq[String] = {
- castPartition(split).inputPartition.preferredLocations()
- }
-
- private def castPartition(split: Partition): FirstZippedPartitionsPartition
= split match {
- case p: FirstZippedPartitionsPartition => p
- case _ => throw new SparkException(s"[BUG] Not a NativeSubstraitPartition:
$split")
- }
-
- override protected def getPartitions: Array[Partition] = {
- inputPartitions.zipWithIndex.map {
- case (inputPartition, index) => FirstZippedPartitionsPartition(index,
inputPartition)
- }.toArray
- }
-}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHPartitionsUtil.scala
similarity index 94%
rename from
backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala
rename to
backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHPartitionsUtil.scala
index 39f86429e2..6eaab68158 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHPartitionsUtil.scala
@@ -19,9 +19,9 @@ package org.apache.gluten.utils
import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SparkResourceUtil
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.Path
import scala.collection.mutable.ArrayBuffer
-case class CHInputPartitionsUtil(
+case class CHPartitionsUtil(
relation: HadoopFsRelation,
requiredSchema: StructType,
selectedPartitions: Array[PartitionDirectory],
@@ -42,15 +42,15 @@ case class CHInputPartitionsUtil(
disableBucketedScan: Boolean)
extends Logging {
- def genInputPartitionSeq(): Seq[InputPartition] = {
+ def genPartitionSeq(): Seq[Partition] = {
if (bucketedScan) {
- genBucketedInputPartitionSeq()
+ genBucketedPartitionSeq()
} else {
- genNonBuckedInputPartitionSeq()
+ genNonBuckedPartitionSeq()
}
}
- private def genNonBuckedInputPartitionSeq(): Seq[InputPartition] = {
+ private def genNonBuckedPartitionSeq(): Seq[Partition] = {
val maxSplitBytes =
FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
@@ -111,7 +111,7 @@ case class CHInputPartitionsUtil(
}
}
- private def genBucketedInputPartitionSeq(): Seq[InputPartition] = {
+ private def genBucketedPartitionSeq(): Seq[Partition] = {
val bucketSpec = relation.bucketSpec.get
logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
val filesGroupedToBuckets =
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala
index 749de3f9d4..96caf5b790 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala
@@ -26,12 +26,12 @@ import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.extensions.ExtensionBuilder
import org.apache.gluten.substrait.rel.RelBuilder
+import org.apache.spark.Partition
import org.apache.spark.affinity.CHAffinity
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.delta.ClickhouseSnapshot
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
@@ -69,7 +69,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
disableBucketedScan: Boolean,
- filterExprs: Seq[Expression]): Seq[InputPartition] = {
+ filterExprs: Seq[Expression]): Seq[Partition] = {
if (
!relation.location.isInstanceOf[TahoeFileIndex] || !relation.fileFormat
.isInstanceOf[DeltaMergeTreeFileFormat]
@@ -82,7 +82,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
val snapshotId =
ClickhouseSnapshot.genSnapshotId(table.deltaLog.update(stalenessAcceptable =
true))
- val partitions = new ArrayBuffer[InputPartition]
+ val partitions = new ArrayBuffer[Partition]
val (database, tableName) = if (table.catalogTable.isDefined) {
(table.catalogTable.get.identifier.database.get,
table.catalogTable.get.identifier.table)
} else {
@@ -95,7 +95,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
// bucket table
if (table.bucketOption.isDefined && bucketedScan) {
- genBucketedInputPartitionSeq(
+ genBucketedPartitionSeq(
engine,
database,
tableName,
@@ -115,7 +115,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
sparkSession
)
} else {
- genInputPartitionSeq(
+ genPartitionSeq(
relation,
engine,
database,
@@ -137,7 +137,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
partitions.toSeq
}
- def genInputPartitionSeq(
+ def genPartitionSeq(
relation: HadoopFsRelation,
engine: String,
database: String,
@@ -148,7 +148,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
optionalBucketSet: Option[BitSet],
selectedPartitions: Array[PartitionDirectory],
tableSchema: StructType,
- partitions: ArrayBuffer[InputPartition],
+ partitions: ArrayBuffer[Partition],
table: ClickHouseTableV2,
clickhouseTableConfigs: Map[String, String],
output: Seq[Attribute],
@@ -316,7 +316,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
relativeTablePath: String,
absoluteTablePath: String,
tableSchema: StructType,
- partitions: ArrayBuffer[InputPartition],
+ partitions: ArrayBuffer[Partition],
table: ClickHouseTableV2,
clickhouseTableConfigs: Map[String, String],
splitFiles: Seq[MergeTreePartSplit],
@@ -372,7 +372,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
relativeTablePath: String,
absoluteTablePath: String,
tableSchema: StructType,
- partitions: ArrayBuffer[InputPartition],
+ partitions: ArrayBuffer[Partition],
table: ClickHouseTableV2,
clickhouseTableConfigs: Map[String, String],
splitFiles: Seq[MergeTreePartSplit],
@@ -441,7 +441,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
}
/** Generate bucket partition */
- def genBucketedInputPartitionSeq(
+ def genBucketedPartitionSeq(
engine: String,
database: String,
tableName: String,
@@ -453,7 +453,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
optionalNumCoalescedBuckets: Option[Int],
selectedPartitions: Array[PartitionDirectory],
tableSchema: StructType,
- partitions: ArrayBuffer[InputPartition],
+ partitions: ArrayBuffer[Partition],
table: ClickHouseTableV2,
clickhouseTableConfigs: Map[String, String],
output: Seq[Attribute],
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala
index 8a8c2e321c..fe1ca1e6f6 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala
@@ -19,8 +19,7 @@ package org.apache.gluten.execution.mergetree
import org.apache.gluten.affinity.{CHUTAffinity, CHUTSoftAffinityManager}
import
org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite,
GlutenMergeTreePartition, MergeTreePartSplit}
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.{Partition, SparkConf}
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import
org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil
import org.apache.spark.sql.types.StructType
@@ -61,7 +60,7 @@ class GlutenClickhouseMergetreeSoftAffinitySuite
test("Soft Affinity Scheduler with duplicate reading detection") {
- val partitions: ArrayBuffer[InputPartition] = new
ArrayBuffer[InputPartition]()
+ val partitions: ArrayBuffer[Partition] = new ArrayBuffer[Partition]()
var splitFiles: Seq[MergeTreePartSplit] = Seq()
val relativeTablePath = "tmp/"
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 4afad58c9a..9bb670f10e 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -28,12 +28,11 @@ import org.apache.gluten.substrait.rel.{LocalFilesBuilder,
LocalFilesNode, Split
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.vectorized._
-import org.apache.spark.{SparkConf, TaskContext}
+import org.apache.spark.{Partition, SparkConf, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.softaffinity.SoftAffinity
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.{FilePartition,
PartitionedFile}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
@@ -66,67 +65,21 @@ class VeloxIteratorApi extends IteratorApi with Logging {
}
override def genSplitInfo(
- partition: InputPartition,
- partitionSchema: StructType,
- dataSchema: StructType,
- fileFormat: ReadFileFormat,
- metadataColumnNames: Seq[String],
- properties: Map[String, String]): SplitInfo = {
- partition match {
- case f: FilePartition =>
- val (
- paths,
- starts,
- lengths,
- fileSizes,
- modificationTimes,
- partitionColumns,
- metadataColumns,
- otherMetadataColumns) =
- constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
- val preferredLocations =
- SoftAffinity.getFilePartitionLocations(f)
- setFileSchemaForLocalFiles(
- LocalFilesBuilder.makeLocalFiles(
- f.index,
- paths,
- starts,
- lengths,
- fileSizes,
- modificationTimes,
- partitionColumns,
- metadataColumns,
- fileFormat,
- preferredLocations.toList.asJava,
- mapAsJavaMap(properties),
- otherMetadataColumns
- ),
- dataSchema,
- fileFormat
- )
- case _ =>
- throw new UnsupportedOperationException(s"Unsupported input
partition.")
- }
- }
-
- override def genSplitInfoForPartitions(
partitionIndex: Int,
- partitions: Seq[InputPartition],
+ partitions: Seq[Partition],
partitionSchema: StructType,
dataSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo = {
- val partitionFiles = partitions.flatMap {
- p =>
- if (!p.isInstanceOf[FilePartition]) {
- throw new UnsupportedOperationException(
- s"Unsupported input partition ${p.getClass.getName}.")
- }
- p.asInstanceOf[FilePartition].files
- }.toArray
- val locations =
- partitions.flatMap(p =>
SoftAffinity.getFilePartitionLocations(p.asInstanceOf[FilePartition]))
+ val filePartitions: Seq[FilePartition] = partitions.map {
+ case p: FilePartition => p
+ case o =>
+ throw new UnsupportedOperationException(
+ s"Unsupported input partition: ${o.getClass.getName}")
+ }
+ val partitionFiles = filePartitions.flatMap(_.files).toArray
+ val locations = filePartitions.flatMap(p =>
SoftAffinity.getFilePartitionLocations(p))
val (
paths,
starts,
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
index 76a1a0e3a7..5222265bea 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
@@ -25,12 +25,12 @@ import org.apache.gluten.proto.ConfigMap
import org.apache.gluten.runtime.Runtimes
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.expression.{ExpressionBuilder,
ExpressionNode}
-import org.apache.gluten.utils.InputPartitionsUtil
+import org.apache.gluten.utils.PartitionsUtil
import org.apache.gluten.vectorized.PlanEvaluatorJniWrapper
+import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
PartitionDirectory}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.execution.HiveFileFormat
@@ -44,8 +44,7 @@ import java.util.{Map => JMap}
class VeloxTransformerApi extends TransformerApi with Logging {
- /** Generate Seq[InputPartition] for FileSourceScanExecTransformer. */
- def genInputPartitionSeq(
+ def genPartitionSeq(
relation: HadoopFsRelation,
requiredSchema: StructType,
selectedPartitions: Array[PartitionDirectory],
@@ -54,8 +53,8 @@ class VeloxTransformerApi extends TransformerApi with Logging
{
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
disableBucketedScan: Boolean,
- filterExprs: Seq[Expression] = Seq.empty): Seq[InputPartition] = {
- InputPartitionsUtil(
+ filterExprs: Seq[Expression] = Seq.empty): Seq[Partition] = {
+ PartitionsUtil(
relation,
requiredSchema,
selectedPartitions,
@@ -64,7 +63,7 @@ class VeloxTransformerApi extends TransformerApi with Logging
{
optionalBucketSet,
optionalNumCoalescedBuckets,
disableBucketedScan)
- .genInputPartitionSeq()
+ .genPartitionSeq()
}
override def postProcessNativeConfig(
diff --git
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 4cdc51fe2a..7bfa522a48 100644
---
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -17,16 +17,18 @@
package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.exception.GlutenNotSupportException
import
org.apache.gluten.execution.IcebergScanTransformer.{containsMetadataColumn,
containsUuidOrFixedType}
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.{LocalFilesNode, SplitInfo}
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
DynamicPruningExpression, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}
@@ -137,30 +139,11 @@ case class IcebergScanTransformer(
override lazy val fileFormat: ReadFileFormat =
GlutenIcebergSourceUtil.getFileFormat(scan)
- override def getSplitInfosWithIndex: Seq[SplitInfo] = {
- val splitInfos = getPartitionsWithIndex.zipWithIndex.map {
- case (partitions, index) =>
- GlutenIcebergSourceUtil.genSplitInfo(partitions, index,
getPartitionSchema)
- }
- numSplits.add(splitInfos.map(s =>
s.asInstanceOf[LocalFilesNode].getPaths.size()).sum)
- splitInfos
- }
-
- override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]):
Seq[SplitInfo] = {
- val groupedPartitions = SparkShimLoader.getSparkShims
- .orderPartitions(
- this,
- scan,
- keyGroupedPartitioning,
- filteredPartitions,
- outputPartitioning,
- commonPartitionValues,
- applyPartialClustering,
- replicatePartitions)
- .flatten
- val splitInfos = groupedPartitions.zipWithIndex.map {
- case (p, index) =>
- GlutenIcebergSourceUtil.genSplitInfoForPartition(p, index,
getPartitionSchema)
+ override def getSplitInfosFromPartitions(partitions: Seq[Partition]):
Seq[SplitInfo] = {
+ val splitInfos = partitions.map {
+ case p: SparkDataSourceRDDPartition =>
+ GlutenIcebergSourceUtil.genSplitInfo(p, getPartitionSchema)
+ case _ => throw new GlutenNotSupportException()
}
numSplits.add(splitInfos.map(s =>
s.asInstanceOf[LocalFilesNode].getPaths.size()).sum)
splitInfos
diff --git
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
index 3816499a97..2436166e92 100644
---
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
+++
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
@@ -18,12 +18,13 @@ package org.apache.iceberg.spark.source
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenNotSupportException
+import org.apache.gluten.execution.SparkDataSourceRDDPartition
import org.apache.gluten.substrait.rel.{IcebergLocalFilesBuilder, SplitInfo}
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.softaffinity.SoftAffinity
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
-import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.types.StructType
import org.apache.iceberg._
@@ -36,67 +37,17 @@ import scala.collection.JavaConverters._
object GlutenIcebergSourceUtil {
- def genSplitInfoForPartition(
- inputPartition: InputPartition,
- index: Int,
- readPartitionSchema: StructType): SplitInfo = inputPartition match {
- case partition: SparkInputPartition =>
- val paths = new JArrayList[String]()
- val starts = new JArrayList[JLong]()
- val lengths = new JArrayList[JLong]()
- val partitionColumns = new JArrayList[JMap[String, String]]()
- val deleteFilesList = new JArrayList[JList[DeleteFile]]()
- var fileFormat = ReadFileFormat.UnknownFormat
-
- val tasks = partition.taskGroup[ScanTask]().tasks().asScala
- asFileScanTask(tasks.toList).foreach {
- task =>
- paths.add(
- BackendsApiManager.getTransformerApiInstance
- .encodeFilePathIfNeed(task.file().path().toString))
- starts.add(task.start())
- lengths.add(task.length())
- partitionColumns.add(getPartitionColumns(task, readPartitionSchema))
- deleteFilesList.add(task.deletes())
- val currentFileFormat = convertFileFormat(task.file().format())
- if (fileFormat == ReadFileFormat.UnknownFormat) {
- fileFormat = currentFileFormat
- } else if (fileFormat != currentFileFormat) {
- throw new UnsupportedOperationException(
- s"Only one file format is supported, " +
- s"find different file format $fileFormat and
$currentFileFormat")
- }
- }
- val preferredLoc = SoftAffinity.getFilePartitionLocations(
- paths.asScala.toArray,
- inputPartition.preferredLocations())
- IcebergLocalFilesBuilder.makeIcebergLocalFiles(
- index,
- paths,
- starts,
- lengths,
- partitionColumns,
- fileFormat,
- preferredLoc.toList.asJava,
- deleteFilesList
- )
- case _ =>
- throw new UnsupportedOperationException("Only support iceberg
SparkInputPartition.")
- }
-
def genSplitInfo(
- inputPartitions: Seq[InputPartition],
- index: Int,
+ partition: SparkDataSourceRDDPartition,
readPartitionSchema: StructType): SplitInfo = {
val paths = new JArrayList[String]()
val starts = new JArrayList[JLong]()
val lengths = new JArrayList[JLong]()
val partitionColumns = new JArrayList[JMap[String, String]]()
val deleteFilesList = new JArrayList[JList[DeleteFile]]()
- val preferredLocs = new JArrayList[String]()
var fileFormat = ReadFileFormat.UnknownFormat
- inputPartitions.foreach {
+ partition.inputPartitions.foreach {
case partition: SparkInputPartition =>
val tasks = partition.taskGroup[ScanTask]().tasks().asScala
asFileScanTask(tasks.toList).foreach {
@@ -117,17 +68,18 @@ object GlutenIcebergSourceUtil {
s"find different file format $fileFormat and
$currentFileFormat")
}
}
- preferredLocs.addAll(partition.preferredLocations().toList.asJava)
+ case o =>
+ throw new GlutenNotSupportException(s"Unsupported input partition
type: $o")
}
IcebergLocalFilesBuilder.makeIcebergLocalFiles(
- index,
+ partition.index,
paths,
starts,
lengths,
partitionColumns,
fileFormat,
SoftAffinity
- .getFilePartitionLocations(paths.asScala.toArray,
preferredLocs.asScala.toArray)
+ .getFilePartitionLocations(paths.asScala.toArray,
partition.preferredLocations())
.toList
.asJava,
deleteFilesList
diff --git
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala
index 296db20829..890c408658 100644
---
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala
+++
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala
@@ -271,7 +271,7 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
getExecutedPlan(df).map {
case plan: IcebergScanTransformer =>
assert(plan.getKeyGroupPartitioning.isDefined)
- assert(plan.getSplitInfosWithIndex.length == 3)
+ assert(plan.getSplitInfos.length == 3)
case _ => // do nothing
}
}
@@ -348,7 +348,7 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
getExecutedPlan(df).map {
case plan: IcebergScanTransformer =>
assert(plan.getKeyGroupPartitioning.isDefined)
- assert(plan.getSplitInfosWithIndex.length == 3)
+ assert(plan.getSplitInfos.length == 3)
case _ => // do nothing
}
}
diff --git
a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
index f644322764..2ab458f6f7 100644
---
a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
+++
b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
@@ -20,6 +20,7 @@ import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.rel.{ReadRelNode, SplitInfo}
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression}
import org.apache.spark.sql.connector.catalog.Table
@@ -67,7 +68,10 @@ case class MicroBatchScanExecTransformer(
override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
- override def getPartitions: Seq[InputPartition] = inputPartitionsShim
+ // todo: consider grouped partitions
+ override def getPartitions: Seq[Partition] =
inputPartitionsShim.zipWithIndex.map {
+ case (inputPartition, index) => new SparkDataSourceRDDPartition(index,
Seq(inputPartition))
+ }
/** Returns the actual schema of this data source scan. */
override def getDataSchema: StructType = scan.readSchema()
@@ -80,7 +84,7 @@ case class MicroBatchScanExecTransformer(
MicroBatchScanExecTransformer.supportsBatchScan(scan)
}
- override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]):
Seq[SplitInfo] = {
+ override def getSplitInfosFromPartitions(partitions: Seq[Partition]):
Seq[SplitInfo] = {
val groupedPartitions = filteredPartitions.flatten
groupedPartitions.zipWithIndex.map {
case (p, _) => GlutenStreamKafkaSourceUtil.genSplitInfo(p)
diff --git
a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
index 31eb197091..a4911c8263 100644
---
a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
+++
b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
@@ -21,6 +21,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.{PaimonLocalFilesBuilder, SplitInfo}
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
import org.apache.spark.softaffinity.SoftAffinity
import org.apache.spark.sql.catalyst.InternalRow
@@ -28,7 +29,7 @@ import
org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
DynamicPruningExpression, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -119,16 +120,17 @@ case class PaimonScanTransformer(
override def doExecuteColumnar(): RDD[ColumnarBatch] = throw new
UnsupportedOperationException()
- override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]):
Seq[SplitInfo] = {
+ override def getSplitInfosFromPartitions(partitions: Seq[Partition]):
Seq[SplitInfo] = {
val partitionComputer =
PaimonScanTransformer.getRowDataPartitionComputer(scan)
- getPartitions.zipWithIndex.map {
- case (p, index) =>
- p match {
+ partitions.map {
+ case p: SparkDataSourceRDDPartition =>
+ val paths = mutable.ListBuffer.empty[String]
+ val starts = mutable.ListBuffer.empty[JLong]
+ val lengths = mutable.ListBuffer.empty[JLong]
+ val partitionColumns = mutable.ListBuffer.empty[JMap[String, String]]
+
+ p.inputPartitions.foreach {
case partition: PaimonInputPartition =>
- val paths = mutable.ListBuffer.empty[String]
- val starts = mutable.ListBuffer.empty[JLong]
- val lengths = mutable.ListBuffer.empty[JLong]
- val partitionColumns = mutable.ListBuffer.empty[JMap[String,
String]]
partition.splits.foreach {
split =>
val rawFilesOpt = split.convertToRawFiles()
@@ -145,21 +147,24 @@ case class PaimonScanTransformer(
"Cannot get raw files from paimon SparkInputPartition.")
}
}
- val preferredLoc =
- SoftAffinity.getFilePartitionLocations(paths.toArray,
partition.preferredLocations())
- PaimonLocalFilesBuilder.makePaimonLocalFiles(
- index,
- paths.asJava,
- starts.asJava,
- lengths.asJava,
- partitionColumns.asJava,
- fileFormat,
- preferredLoc.toList.asJava,
- new JHashMap[String, String]()
- )
- case _ =>
- throw new GlutenNotSupportException("Only support paimon
SparkInputPartition.")
+ case o =>
+ throw new GlutenNotSupportException(s"Unsupported input partition
type: $o")
}
+
+ PaimonLocalFilesBuilder.makePaimonLocalFiles(
+ p.index,
+ paths.asJava,
+ starts.asJava,
+ lengths.asJava,
+ partitionColumns.asJava,
+ fileFormat,
+ SoftAffinity
+ .getFilePartitionLocations(paths.toArray, p.preferredLocations())
+ .toList
+ .asJava,
+ new JHashMap[String, String]()
+ )
+ case _ => throw new GlutenNotSupportException()
}
}
diff --git
a/gluten-paimon/src-paimon/test/scala/org/apache/gluten/execution/PaimonSuite.scala
b/gluten-paimon/src-paimon/test/scala/org/apache/gluten/execution/PaimonSuite.scala
index 91493283a0..fddac2123d 100644
---
a/gluten-paimon/src-paimon/test/scala/org/apache/gluten/execution/PaimonSuite.scala
+++
b/gluten-paimon/src-paimon/test/scala/org/apache/gluten/execution/PaimonSuite.scala
@@ -94,4 +94,21 @@ abstract class PaimonSuite extends
WholeStageTransformerSuite {
}
}
}
+
+ test("paimon transformer exists with bucket table") {
+ withTable(s"paimon_tbl") {
+ sql(s"""
+ |CREATE TABLE paimon_tbl (id INT, name STRING)
+ |USING paimon
+ |TBLPROPERTIES (
+ | 'bucket' = '1',
+ | 'bucket-key' = 'id'
+ |)
+ |""".stripMargin)
+ sql(s"INSERT INTO paimon_tbl VALUES (1, 'Bob'), (2, 'Blue'), (3,
'Mike')")
+ runQueryAndCompare("SELECT * FROM paimon_tbl") {
+ checkGlutenOperatorMatch[PaimonScanTransformer]
+ }
+ }
+ }
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index 34f7f41b78..52be6c4954 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -23,7 +23,6 @@ import
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.SplitInfo
import org.apache.spark._
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
@@ -32,21 +31,13 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
trait IteratorApi {
def genSplitInfo(
- partition: InputPartition,
- partitionSchema: StructType,
- dataSchema: StructType,
- fileFormat: ReadFileFormat,
- metadataColumnNames: Seq[String],
- properties: Map[String, String]): SplitInfo
-
- def genSplitInfoForPartitions(
partitionIndex: Int,
- partition: Seq[InputPartition],
+ partition: Seq[Partition],
partitionSchema: StructType,
dataSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
- properties: Map[String, String]): SplitInfo = throw new
UnsupportedOperationException()
+ properties: Map[String, String]): SplitInfo
/** Generate native row partition. */
def genPartitions(
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
index 92d6ebd325..a855537664 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
@@ -20,8 +20,8 @@ import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.expression.ExpressionNode
+import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
PartitionDirectory}
import org.apache.spark.sql.types.{DataType, DecimalType, StructType}
@@ -33,8 +33,8 @@ import java.util
trait TransformerApi {
- /** Generate Seq[InputPartition] for FileSourceScanExecTransformer. */
- def genInputPartitionSeq(
+ /** Generate Seq[Partition] for FileSourceScanExecTransformer. */
+ def genPartitionSeq(
relation: HadoopFsRelation,
requiredSchema: StructType,
selectedPartitions: Array[PartitionDirectory],
@@ -43,7 +43,7 @@ trait TransformerApi {
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
disableBucketedScan: Boolean,
- filterExprs: Seq[Expression] = Seq.empty): Seq[InputPartition]
+ filterExprs: Seq[Expression] = Seq.empty): Seq[Partition]
/**
* Post-process native config, For example, for ClickHouse backend, sync
'spark.executor.cores' to
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index a8524d53f2..86b76e7a25 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -25,8 +25,8 @@ import org.apache.gluten.substrait.extensions.ExtensionBuilder
import org.apache.gluten.substrait.rel.{RelBuilder, SplitInfo}
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import com.google.protobuf.StringValue
@@ -56,21 +56,27 @@ trait BasicScanExecTransformer extends LeafTransformSupport
with BaseDataSource
/** Returns the file format properties. */
def getProperties: Map[String, String] = Map.empty
- /** Returns the split infos that will be processed by the underlying native
engine. */
override def getSplitInfos: Seq[SplitInfo] = {
getSplitInfosFromPartitions(getPartitions)
}
- def getSplitInfosFromPartitions(partitions: Seq[InputPartition]):
Seq[SplitInfo] = {
+ def getSplitInfosFromPartitions(partitions: Seq[Partition]): Seq[SplitInfo]
= {
partitions.map(
- BackendsApiManager.getIteratorApiInstance
- .genSplitInfo(
- _,
- getPartitionSchema,
- getDataSchema,
- fileFormat,
- getMetadataColumns().map(_.name),
- getProperties))
+ p => {
+ val ps = p match {
+ case sp: SparkDataSourceRDDPartition =>
sp.inputPartitions.map(_.asInstanceOf[Partition])
+ case o => Seq(o)
+ }
+ BackendsApiManager.getIteratorApiInstance
+ .genSplitInfo(
+ p.index,
+ ps,
+ getPartitionSchema,
+ getDataSchema,
+ fileFormat,
+ getMetadataColumns().map(_.name),
+ getProperties)
+ })
}
override protected def doValidateInternal(): ValidationResult = {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index 03a840cccd..0b0dc7c52f 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -21,15 +21,15 @@ import org.apache.gluten.expression.ExpressionConverter
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
-import org.apache.gluten.substrait.rel.SplitInfo
import org.apache.gluten.utils.FileIndexUtil
+import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExecShim,
FileScan}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
@@ -128,26 +128,7 @@ abstract class BatchScanExecTransformerBase(
override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
- // With storage partition join, the return partition type is changed, so as
SplitInfo
- def getPartitionsWithIndex: Seq[Seq[InputPartition]] = finalPartitions
-
- def getSplitInfosWithIndex: Seq[SplitInfo] = {
- getPartitionsWithIndex.zipWithIndex.map {
- case (partitions, index) =>
- BackendsApiManager.getIteratorApiInstance
- .genSplitInfoForPartitions(
- index,
- partitions,
- getPartitionSchema,
- getDataSchema,
- fileFormat,
- getMetadataColumns().map(_.name),
- getProperties)
- }
- }
-
- // May cannot call for bucket scan
- override def getPartitions: Seq[InputPartition] = filteredFlattenPartitions
+ def getPartitions: Seq[Partition] = finalPartitions
override def getPartitionSchema: StructType = scan match {
case fileScan: FileScan => fileScan.readPartitionSchema
@@ -195,19 +176,21 @@ abstract class BatchScanExecTransformerBase(
override def metricsUpdater(): MetricsUpdater =
BackendsApiManager.getMetricsApiInstance.genBatchScanTransformerMetricsUpdater(metrics)
- @transient protected lazy val filteredFlattenPartitions: Seq[InputPartition]
=
- filteredPartitions.flatten
-
- @transient protected lazy val finalPartitions: Seq[Seq[InputPartition]] =
- SparkShimLoader.getSparkShims.orderPartitions(
- this,
- scan,
- keyGroupedPartitioning,
- filteredPartitions,
- outputPartitioning,
- commonPartitionValues,
- applyPartialClustering,
- replicatePartitions)
+ @transient protected lazy val finalPartitions: Seq[Partition] =
+ SparkShimLoader.getSparkShims
+ .orderPartitions(
+ this,
+ scan,
+ keyGroupedPartitioning,
+ filteredPartitions,
+ outputPartitioning,
+ commonPartitionValues,
+ applyPartialClustering,
+ replicatePartitions)
+ .zipWithIndex
+ .map {
+ case (inputPartitions, index) => new
SparkDataSourceRDDPartition(index, inputPartitions)
+ }
@transient override lazy val fileFormat: ReadFileFormat =
BackendsApiManager.getSettings.getSubstraitReadFileFormatV2(scan)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index 61c94747bc..0584922bca 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -23,11 +23,11 @@ import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.utils.FileIndexUtil
+import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression, PlanExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution.FileSourceScanExecShim
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
@@ -123,18 +123,19 @@ abstract class FileSourceScanExecTransformerBase(
override def getMetadataColumns(): Seq[AttributeReference] = metadataColumns
- override def getPartitions: Seq[InputPartition] = {
- BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq(
- relation,
- requiredSchema,
- getPartitionArray(),
- output,
- bucketedScan,
- optionalBucketSet,
- optionalNumCoalescedBuckets,
- disableBucketedScan,
- filterExprs()
- )
+ override def getPartitions: Seq[Partition] = {
+ BackendsApiManager.getTransformerApiInstance
+ .genPartitionSeq(
+ relation,
+ requiredSchema,
+ getPartitionArray(),
+ output,
+ bucketedScan,
+ optionalBucketSet,
+ optionalNumCoalescedBuckets,
+ disableBucketedScan,
+ filterExprs()
+ )
}
override def getPartitionSchema: StructType = relation.partitionSchema
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
index 263d56720c..4a00dbb587 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
@@ -47,13 +47,13 @@ case class GlutenPartition(
case class FirstZippedPartitionsPartition(
index: Int,
- inputPartition: InputPartition,
+ inputPartition: Partition,
inputColumnarRDDPartitions: Seq[Partition] = Seq.empty)
extends Partition
class GlutenWholeStageColumnarRDD(
@transient sc: SparkContext,
- @transient private val inputPartitions: Seq[InputPartition],
+ @transient private val inputPartitions: Seq[Partition],
var rdds: ColumnarInputRDDsWrapper,
pipelineTime: SQLMetric,
updateInputMetrics: InputMetricsWrapper => Unit,
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/SparkDataSourceRDDPartition.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/SparkDataSourceRDDPartition.scala
new file mode 100644
index 0000000000..e3313c9cd2
--- /dev/null
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/SparkDataSourceRDDPartition.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.Partition
+import org.apache.spark.sql.connector.read.InputPartition
+
+/**
+ * Copy from spark's
[[org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition]] to
+ * make compatible with spark3.3 and before.
+ */
+class SparkDataSourceRDDPartition(val index: Int, val inputPartitions:
Seq[InputPartition])
+ extends Partition
+ with Serializable
+ with InputPartition {
+
+ override def preferredLocations(): Array[String] = {
+ inputPartitions.flatMap(_.preferredLocations()).toArray
+ }
+}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index f87b9673cc..264195f93f 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.exception.GlutenException
import org.apache.gluten.expression._
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater}
@@ -35,7 +34,6 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
@@ -136,10 +134,12 @@ trait TransformSupport extends ValidatablePlan {
trait LeafTransformSupport extends TransformSupport with LeafExecNode {
final override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = Seq.empty
+
+ /** Returns the split infos that will be processed by the underlying native
engine. */
def getSplitInfos: Seq[SplitInfo]
/** Returns the partitions generated by this data source scan. */
- def getPartitions: Seq[InputPartition]
+ def getPartitions: Seq[Partition]
}
trait UnaryTransformSupport extends TransformSupport with UnaryExecNode {
@@ -293,32 +293,38 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
allLeafTransformers.toSeq
}
+ /**
+ * If this "whole stage" contains a leaf exec transformer, it generates an RDD
+ which itself takes care
+ * of the [[LeafTransformSupport]]; there won't be any other RDD for the leaf
+ operator. As a result,
+ * genFirstStageIterator rather than genFinalStageIterator will be invoked.
+ */
private def generateWholeStageRDD(
leafTransformers: Seq[LeafTransformSupport],
wsCtx: WholeStageTransformContext,
inputRDDs: ColumnarInputRDDsWrapper,
pipelineTime: SQLMetric): RDD[ColumnarBatch] = {
- val isKeyGroupPartition = leafTransformers.exists {
- // TODO: May can apply to BatchScanExecTransformer without key group
partitioning
- case b: BatchScanExecTransformerBase if
b.keyGroupedPartitioning.isDefined => true
- case _ => false
- }
- /**
- * If containing leaf exec transformer this "whole stage" generates a RDD
which itself takes
- * care of [[LeafTransformSupport]] there won't be any other RDD for leaf
operator. As a result,
- * genFirstStageIterator rather than genFinalStageIterator will be invoked
- */
- val allInputPartitions = leafTransformers.map(
- leafTransformer => {
- if (isKeyGroupPartition) {
-
leafTransformer.asInstanceOf[BatchScanExecTransformerBase].getPartitionsWithIndex
- } else {
- Seq(leafTransformer.getPartitions)
- }
- })
-
- val allSplitInfos = getSplitInfosFromPartitions(isKeyGroupPartition,
leafTransformers)
+ // If there are two leaf transformers, they must have the same partitions;
+ // otherwise, an exchange will be inserted. We should combine the two leaf
+ // transformers' partitions with same index, and set them together in
+ // the substraitContext.
+ // We use transpose to do that; you can refer to
+ // the diagram below.
+ // leaf1 p11 p12 p13 p14 ... p1n
+ // leaf2 p21 p22 p23 p24 ... p2n
+ // transpose =>
+ // leaf1 | leaf2
+ // p11 | p21 => substraitContext.setSplitInfo([p11, p21])
+ // p12 | p22 => substraitContext.setSplitInfo([p12, p22])
+ // p13 | p23 ...
+ // p14 | p24
+ // ...
+ // p1n | p2n => substraitContext.setSplitInfo([p1n, p2n])
+ // The data in partition may be empty, for example,
+ // if there are two batch scan transformers with keyGroupPartitioning,
+ // they have the same partitionValues,
+ // but some partitions may be empty for those partition values that are not
present.
+ val allSplitInfos = leafTransformers.map(_.getSplitInfos).transpose
if (GlutenConfig.get.enableHdfsViewfs) {
val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty
@@ -356,6 +362,7 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
wsCtx.enableCudf
)
+ val allInputPartitions = leafTransformers.map(_.getPartitions)
SoftAffinity.updateFilePartitionLocations(allInputPartitions, rdd.id)
leafTransformers.foreach {
@@ -367,55 +374,6 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
rdd
}
- private def getSplitInfosFromPartitions(
- isKeyGroupPartition: Boolean,
- leafTransformers: Seq[LeafTransformSupport]): Seq[Seq[SplitInfo]] = {
- val allSplitInfos = if (isKeyGroupPartition) {
- // If these are two batch scan transformer with keyGroupPartitioning,
- // they have same partitionValues,
- // but some partitions maybe empty for those partition values that are
not present,
- // otherwise, exchange will be inserted. We should combine the two leaf
- // transformers' partitions with same index, and set them together in
- // the substraitContext. We use transpose to do that, You can refer to
- // the diagram below.
- // leaf1 Seq(p11) Seq(p12, p13) Seq(p14) ... Seq(p1n)
- // leaf2 Seq(p21) Seq(p22) Seq() ... Seq(p2n)
- // transpose =>
- // leaf1 | leaf2
- // Seq(p11) | Seq(p21) =>
substraitContext.setSplitInfo([Seq(p11), Seq(p21)])
- // Seq(p12, p13) | Seq(p22) =>
substraitContext.setSplitInfo([Seq(p12, p13), Seq(p22)])
- // Seq(p14) | Seq() ...
- // ...
- // Seq(p1n) | Seq(p2n) =>
substraitContext.setSplitInfo([Seq(p1n), Seq(p2n)])
-
leafTransformers.map(_.asInstanceOf[BatchScanExecTransformerBase].getSplitInfosWithIndex)
- } else {
- // If these are two leaf transformers, they must have same partitions,
- // otherwise, exchange will be inserted. We should combine the two leaf
- // transformers' partitions with same index, and set them together in
- // the substraitContext. We use transpose to do that, You can refer to
- // the diagram below.
- // leaf1 p11 p12 p13 p14 ... p1n
- // leaf2 p21 p22 p23 p24 ... p2n
- // transpose =>
- // leaf1 | leaf2
- // p11 | p21 => substraitContext.setSplitInfo([p11, p21])
- // p12 | p22 => substraitContext.setSplitInfo([p12, p22])
- // p13 | p23 ...
- // p14 | p24
- // ...
- // p1n | p2n => substraitContext.setSplitInfo([p1n, p2n])
- leafTransformers.map(_.getSplitInfos)
- }
-
- val partitionLength = allSplitInfos.head.size
- if (allSplitInfos.exists(_.size != partitionLength)) {
- throw new GlutenException(
- "The partition length of all the leaf transformer are not the same.")
- }
-
- allSplitInfos.transpose
- }
-
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
assert(child.isInstanceOf[TransformSupport])
val pipelineTime: SQLMetric = longMetric("pipelineTime")
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/InputPartitionsUtil.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala
similarity index 93%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/utils/InputPartitionsUtil.scala
rename to
gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala
index b05992fa30..953f95bf23 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/InputPartitionsUtil.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala
@@ -18,16 +18,16 @@ package org.apache.gluten.utils
import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.{BucketingUtils,
FilePartition, HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet
import org.apache.hadoop.fs.Path
-case class InputPartitionsUtil(
+case class PartitionsUtil(
relation: HadoopFsRelation,
requiredSchema: StructType,
selectedPartitions: Array[PartitionDirectory],
@@ -38,15 +38,15 @@ case class InputPartitionsUtil(
disableBucketedScan: Boolean)
extends Logging {
- def genInputPartitionSeq(): Seq[InputPartition] = {
+ def genPartitionSeq(): Seq[Partition] = {
if (bucketedScan) {
- genBucketedInputPartitionSeq()
+ genBucketedPartitionSeq()
} else {
- genNonBuckedInputPartitionSeq()
+ genNonBuckedPartitionSeq()
}
}
- private def genNonBuckedInputPartitionSeq(): Seq[InputPartition] = {
+ private def genNonBuckedPartitionSeq(): Seq[Partition] = {
val openCostInBytes =
relation.sparkSession.sessionState.conf.filesOpenCostInBytes
val maxSplitBytes =
FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
@@ -99,7 +99,7 @@ case class InputPartitionsUtil(
FilePartition.getFilePartitions(relation.sparkSession, splitFiles,
maxSplitBytes)
}
- private def genBucketedInputPartitionSeq(): Seq[InputPartition] = {
+ private def genBucketedPartitionSeq(): Seq[Partition] = {
val bucketSpec = relation.bucketSpec.get
logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
val filesGroupedToBuckets =
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
b/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
index 7ae47e422b..714e632d08 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
@@ -20,9 +20,9 @@ import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.logging.LogLevelUtil
import org.apache.gluten.softaffinity.{AffinityManager, SoftAffinityManager}
+import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.FilePartition
abstract class Affinity(val manager: AffinityManager) extends LogLevelUtil
with Logging {
@@ -80,15 +80,13 @@ abstract class Affinity(val manager: AffinityManager)
extends LogLevelUtil with
}
/** Update the RDD id to SoftAffinityManager */
- def updateFilePartitionLocations(
- inputPartitions: Seq[Seq[Seq[InputPartition]]],
- rddId: Int): Unit = {
+ def updateFilePartitionLocations(inputPartitions: Seq[Seq[Partition]],
rddId: Int): Unit = {
if (SoftAffinityManager.usingSoftAffinity &&
SoftAffinityManager.detectDuplicateReading) {
- inputPartitions.foreach(_.foreach(_.foreach {
+ inputPartitions.foreach(_.foreach {
case f: FilePartition =>
SoftAffinityManager.updatePartitionMap(f, rddId)
case _ =>
- }))
+ })
}
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index d83edfb22d..39500a50e7 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -21,11 +21,11 @@ import org.apache.gluten.execution.BasicScanExecTransformer
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+import org.apache.spark.Partition
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, AttributeSeq, Expression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.hive.HiveTableScanExecTransformer._
@@ -67,7 +67,7 @@ case class HiveTableScanExecTransformer(
override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
- override def getPartitions: Seq[InputPartition] = partitions
+ override def getPartitions: Seq[Partition] = partitions
override def getPartitionSchema: StructType =
relation.tableMeta.partitionSchema
@@ -82,7 +82,7 @@ case class HiveTableScanExecTransformer(
@transient private lazy val hivePartitionConverter =
new HivePartitionConverter(session.sessionState.newHadoopConf(), session)
- @transient private lazy val partitions: Seq[InputPartition] =
+ @transient private lazy val partitions: Seq[Partition] =
if (!relation.isPartitioned) {
val tableLocation: URI =
relation.tableMeta.storage.locationUri.getOrElse {
throw new UnsupportedOperationException("Table path not set.")
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 8b5efa9167..339bda2a03 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -19,10 +19,10 @@ package org.apache.spark.sql.extension
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet
@@ -49,16 +49,17 @@ case class TestFileSourceScanExecTransformer(
tableIdentifier,
disableBucketedScan) {
- override def getPartitions: Seq[InputPartition] =
- BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq(
- relation,
- requiredSchema,
- selectedPartitions,
- output,
- bucketedScan,
- optionalBucketSet,
- optionalNumCoalescedBuckets,
- disableBucketedScan)
+ override def getPartitions: Seq[Partition] =
+ BackendsApiManager.getTransformerApiInstance
+ .genPartitionSeq(
+ relation,
+ requiredSchema,
+ selectedPartitions,
+ output,
+ bucketedScan,
+ optionalBucketSet,
+ optionalNumCoalescedBuckets,
+ disableBucketedScan)
override val nodeNamePrefix: String = "TestFile"
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 98ef4eff78..6846f140ed 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql.extension
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet
@@ -47,16 +47,17 @@ case class TestFileSourceScanExecTransformer(
dataFilters,
tableIdentifier,
disableBucketedScan) {
- override def getPartitions: Seq[InputPartition] =
- BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq(
- relation,
- requiredSchema,
- selectedPartitions,
- output,
- bucketedScan,
- optionalBucketSet,
- optionalNumCoalescedBuckets,
- disableBucketedScan)
+ override def getPartitions: Seq[Partition] =
+ BackendsApiManager.getTransformerApiInstance
+ .genPartitionSeq(
+ relation,
+ requiredSchema,
+ selectedPartitions,
+ output,
+ bucketedScan,
+ optionalBucketSet,
+ optionalNumCoalescedBuckets,
+ disableBucketedScan)
override val nodeNamePrefix: String = "TestFile"
}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 98ef4eff78..6846f140ed 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql.extension
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet
@@ -47,16 +47,17 @@ case class TestFileSourceScanExecTransformer(
dataFilters,
tableIdentifier,
disableBucketedScan) {
- override def getPartitions: Seq[InputPartition] =
- BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq(
- relation,
- requiredSchema,
- selectedPartitions,
- output,
- bucketedScan,
- optionalBucketSet,
- optionalNumCoalescedBuckets,
- disableBucketedScan)
+ override def getPartitions: Seq[Partition] =
+ BackendsApiManager.getTransformerApiInstance
+ .genPartitionSeq(
+ relation,
+ requiredSchema,
+ selectedPartitions,
+ output,
+ bucketedScan,
+ optionalBucketSet,
+ optionalNumCoalescedBuckets,
+ disableBucketedScan)
override val nodeNamePrefix: String = "TestFile"
}
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 98ef4eff78..6846f140ed 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql.extension
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet
@@ -47,16 +47,17 @@ case class TestFileSourceScanExecTransformer(
dataFilters,
tableIdentifier,
disableBucketedScan) {
- override def getPartitions: Seq[InputPartition] =
- BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq(
- relation,
- requiredSchema,
- selectedPartitions,
- output,
- bucketedScan,
- optionalBucketSet,
- optionalNumCoalescedBuckets,
- disableBucketedScan)
+ override def getPartitions: Seq[Partition] =
+ BackendsApiManager.getTransformerApiInstance
+ .genPartitionSeq(
+ relation,
+ requiredSchema,
+ selectedPartitions,
+ output,
+ bucketedScan,
+ optionalBucketSet,
+ optionalNumCoalescedBuckets,
+ disableBucketedScan)
override val nodeNamePrefix: String = "TestFile"
}
diff --git
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 5f62c272f7..bbbd665cd8 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -243,6 +243,10 @@ trait SparkShims {
def getCommonPartitionValues(batchScan: BatchScanExec):
Option[Seq[(InternalRow, Int)]] =
Option(Seq())
+ /**
+ * Most of the code in this method is copied from
+ * [[org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD]].
+ */
def orderPartitions(
batchScan: DataSourceV2ScanExecBase,
scan: Scan,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]