This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7951720063 [VL] Refactor WholeStageTransformer to remove some duplicate code (#9388)
7951720063 is described below
commit 7951720063eb794464f83d0d19899b9befeda69d
Author: wypb <[email protected]>
AuthorDate: Thu Apr 24 21:50:32 2025 +0800
[VL] Refactor WholeStageTransformer to remove some duplicate code (#9388)
---
.../gluten/execution/WholeStageTransformer.scala | 171 +++++++--------------
1 file changed, 57 insertions(+), 114 deletions(-)
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 745ad5966c..bdf425f78d 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -372,14 +372,27 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
wsCtx: WholeStageTransformContext,
inputRDDs: ColumnarInputRDDsWrapper,
pipelineTime: SQLMetric): RDD[ColumnarBatch] = {
+ val isKeyGroupPartition = leafTransformers.exists {
+ // TODO: May can apply to BatchScanExecTransformer without key group partitioning
+ case b: BatchScanExecTransformerBase if b.keyGroupedPartitioning.isDefined => true
+ case _ => false
+ }
/**
* If containing leaf exec transformer this "whole stage" generates a RDD which itself takes
* care of [[LeafTransformSupport]] there won't be any other RDD for leaf operator. As a result,
* genFirstStageIterator rather than genFinalStageIterator will be invoked
*/
- val allInputPartitions = leafTransformers.map(_.getPartitions)
- val allSplitInfos = getSplitInfosFromPartitions(leafTransformers)
+ val allInputPartitions = leafTransformers.map(
+ leafTransformer => {
+ if (isKeyGroupPartition) {
+ leafTransformer.asInstanceOf[BatchScanExecTransformerBase].getPartitionsWithIndex
+ } else {
+ Seq(leafTransformer.getPartitions)
+ }
+ })
+
+ val allSplitInfos = getSplitInfosFromPartitions(isKeyGroupPartition, leafTransformers)
if (GlutenConfig.get.enableHdfsViewfs) {
val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty
@@ -416,90 +429,60 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
)
)
- SoftAffinity.updateFilePartitionLocations(Seq(allInputPartitions), rdd.id)
+ SoftAffinity.updateFilePartitionLocations(allInputPartitions, rdd.id)
rdd
}
- private def getSplitInfosFromPartitionSeqs(
- leafTransformers: Seq[BatchScanExecTransformerBase]): Seq[Seq[SplitInfo]] = {
- // If these are two batch scan transformer with keyGroupPartitioning,
- // they have same partitionValues,
- // but some partitions maybe empty for those partition values that are not present,
- // otherwise, exchange will be inserted. We should combine the two leaf
- // transformers' partitions with same index, and set them together in
- // the substraitContext. We use transpose to do that, You can refer to
- // the diagram below.
- // leaf1 Seq(p11) Seq(p12, p13) Seq(p14) ... Seq(p1n)
- // leaf2 Seq(p21) Seq(p22) Seq() ... Seq(p2n)
- // transpose =>
- // leaf1 | leaf2
- // Seq(p11) | Seq(p21) => substraitContext.setSplitInfo([Seq(p11), Seq(p21)])
- // Seq(p12, p13) | Seq(p22) => substraitContext.setSplitInfo([Seq(p12, p13), Seq(p22)])
- // Seq(p14) | Seq() ...
- // ...
- // Seq(p1n) | Seq(p2n) => substraitContext.setSplitInfo([Seq(p1n), Seq(p2n)])
-
- val allSplitInfos = leafTransformers.map(_.getSplitInfosWithIndex)
+ private def getSplitInfosFromPartitions(
+ isKeyGroupPartition: Boolean,
+ leafTransformers: Seq[LeafTransformSupport]): Seq[Seq[SplitInfo]] = {
+ val allSplitInfos = if (isKeyGroupPartition) {
+ // If these are two batch scan transformer with keyGroupPartitioning,
+ // they have same partitionValues,
+ // but some partitions maybe empty for those partition values that are not present,
+ // otherwise, exchange will be inserted. We should combine the two leaf
+ // transformers' partitions with same index, and set them together in
+ // the substraitContext. We use transpose to do that, You can refer to
+ // the diagram below.
+ // leaf1 Seq(p11) Seq(p12, p13) Seq(p14) ... Seq(p1n)
+ // leaf2 Seq(p21) Seq(p22) Seq() ... Seq(p2n)
+ // transpose =>
+ // leaf1 | leaf2
+ // Seq(p11) | Seq(p21) => substraitContext.setSplitInfo([Seq(p11), Seq(p21)])
+ // Seq(p12, p13) | Seq(p22) => substraitContext.setSplitInfo([Seq(p12, p13), Seq(p22)])
+ // Seq(p14) | Seq() ...
+ // ...
+ // Seq(p1n) | Seq(p2n) => substraitContext.setSplitInfo([Seq(p1n), Seq(p2n)])
+ leafTransformers.map(_.asInstanceOf[BatchScanExecTransformerBase].getSplitInfosWithIndex)
+ } else {
+ // If these are two leaf transformers, they must have same partitions,
+ // otherwise, exchange will be inserted. We should combine the two leaf
+ // transformers' partitions with same index, and set them together in
+ // the substraitContext. We use transpose to do that, You can refer to
+ // the diagram below.
+ // leaf1 p11 p12 p13 p14 ... p1n
+ // leaf2 p21 p22 p23 p24 ... p2n
+ // transpose =>
+ // leaf1 | leaf2
+ // p11 | p21 => substraitContext.setSplitInfo([p11, p21])
+ // p12 | p22 => substraitContext.setSplitInfo([p12, p22])
+ // p13 | p23 ...
+ // p14 | p24
+ // ...
+ // p1n | p2n => substraitContext.setSplitInfo([p1n, p2n])
+ leafTransformers.map(_.getSplitInfos)
+ }
+
val partitionLength = allSplitInfos.head.size
if (allSplitInfos.exists(_.size != partitionLength)) {
throw new GlutenException(
"The partition length of all the leaf transformer are not the same.")
}
- if (GlutenConfig.get.enableHdfsViewfs) {
- val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty
- allSplitInfos.foreach {
- case splitInfo: LocalFilesNode =>
- val newPaths = ViewFileSystemUtils.convertViewfsToHdfs(
- splitInfo.getPaths.asScala.toSeq,
- viewfsToHdfsCache,
- sparkContext.hadoopConfiguration)
- splitInfo.setPaths(newPaths.asJava)
- }
- }
allSplitInfos.transpose
}
- private def generateWholeStageDatasourceRDD(
- leafTransformers: Seq[BatchScanExecTransformerBase],
- wsCtx: WholeStageTransformContext,
- inputRDDs: ColumnarInputRDDsWrapper,
- pipelineTime: SQLMetric): RDD[ColumnarBatch] = {
-
- /**
- * If containing leaf exec transformer this "whole stage" generates a RDD which itself takes
- * care of [[LeafTransformSupport]] there won't be any other RDD for leaf operator. As a result,
- * genFirstStageIterator rather than genFinalStageIterator will be invoked
- */
- val allInputPartitions = leafTransformers.map(_.getPartitionsWithIndex)
- val allSplitInfos = getSplitInfosFromPartitionSeqs(leafTransformers)
-
- val inputPartitions =
- BackendsApiManager.getIteratorApiInstance.genPartitions(
- wsCtx,
- allSplitInfos,
- leafTransformers)
-
- val rdd = new GlutenWholeStageColumnarRDD(
- sparkContext,
- inputPartitions,
- inputRDDs,
- pipelineTime,
- leafInputMetricsUpdater(),
- BackendsApiManager.getMetricsApiInstance.metricsUpdatingFunction(
- child,
- wsCtx.substraitContext.registeredRelMap,
- wsCtx.substraitContext.registeredJoinParams,
- wsCtx.substraitContext.registeredAggregationParams
- )
- )
-
- SoftAffinity.updateFilePartitionLocations(allInputPartitions, rdd.id)
-
- rdd
- }
-
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
assert(child.isInstanceOf[TransformSupport])
val pipelineTime: SQLMetric = longMetric("pipelineTime")
@@ -515,20 +498,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
val leafTransformers = findAllLeafTransformers()
if (leafTransformers.nonEmpty) {
- val isKeyGroupPartition: Boolean = leafTransformers.exists {
- // TODO: May can apply to BatchScanExecTransformer without key group partitioning
- case b: BatchScanExecTransformerBase if b.keyGroupedPartitioning.isDefined => true
- case _ => false
- }
- if (!isKeyGroupPartition) {
- generateWholeStageRDD(leafTransformers, wsCtx, inputRDDs, pipelineTime)
- } else {
- generateWholeStageDatasourceRDD(
- leafTransformers.map(_.asInstanceOf[BatchScanExecTransformerBase]),
- wsCtx,
- inputRDDs,
- pipelineTime)
- }
+ generateWholeStageRDD(leafTransformers, wsCtx, inputRDDs, pipelineTime)
} else {
/**
@@ -586,33 +556,6 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
override protected def withNewChildInternal(newChild: SparkPlan): WholeStageTransformer =
copy(child = newChild, materializeInput = materializeInput)(transformStageId)
-
- private def getSplitInfosFromPartitions(
- leafTransformers: Seq[LeafTransformSupport]): Seq[Seq[SplitInfo]] = {
- // If these are two leaf transformers, they must have same partitions,
- // otherwise, exchange will be inserted. We should combine the two leaf
- // transformers' partitions with same index, and set them together in
- // the substraitContext. We use transpose to do that, You can refer to
- // the diagram below.
- // leaf1 p11 p12 p13 p14 ... p1n
- // leaf2 p21 p22 p23 p24 ... p2n
- // transpose =>
- // leaf1 | leaf2
- // p11 | p21 => substraitContext.setSplitInfo([p11, p21])
- // p12 | p22 => substraitContext.setSplitInfo([p12, p22])
- // p13 | p23 ...
- // p14 | p24
- // ...
- // p1n | p2n => substraitContext.setSplitInfo([p1n, p2n])
- val allSplitInfos = leafTransformers.map(_.getSplitInfos)
- val partitionLength = allSplitInfos.head.size
- if (allSplitInfos.exists(_.size != partitionLength)) {
- throw new GlutenException(
- "The partition length of all the leaf transformer are not the same.")
- }
-
- allSplitInfos.transpose
- }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]