This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 7951720063 [VL] Refactor WholeStageTransformer to remove some 
duplicate code (#9388)
7951720063 is described below

commit 7951720063eb794464f83d0d19899b9befeda69d
Author: wypb <[email protected]>
AuthorDate: Thu Apr 24 21:50:32 2025 +0800

    [VL] Refactor WholeStageTransformer to remove some duplicate code (#9388)
---
 .../gluten/execution/WholeStageTransformer.scala   | 171 +++++++--------------
 1 file changed, 57 insertions(+), 114 deletions(-)

diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 745ad5966c..bdf425f78d 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -372,14 +372,27 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
       wsCtx: WholeStageTransformContext,
       inputRDDs: ColumnarInputRDDsWrapper,
       pipelineTime: SQLMetric): RDD[ColumnarBatch] = {
+    val isKeyGroupPartition = leafTransformers.exists {
+      // TODO: This may also be applicable to BatchScanExecTransformer without key-grouped 
partitioning
+      case b: BatchScanExecTransformerBase if 
b.keyGroupedPartitioning.isDefined => true
+      case _ => false
+    }
 
     /**
      * If containing leaf exec transformer this "whole stage" generates a RDD 
which itself takes
      * care of [[LeafTransformSupport]] there won't be any other RDD for leaf 
operator. As a result,
      * genFirstStageIterator rather than genFinalStageIterator will be invoked
      */
-    val allInputPartitions = leafTransformers.map(_.getPartitions)
-    val allSplitInfos = getSplitInfosFromPartitions(leafTransformers)
+    val allInputPartitions = leafTransformers.map(
+      leafTransformer => {
+        if (isKeyGroupPartition) {
+          
leafTransformer.asInstanceOf[BatchScanExecTransformerBase].getPartitionsWithIndex
+        } else {
+          Seq(leafTransformer.getPartitions)
+        }
+      })
+
+    val allSplitInfos = getSplitInfosFromPartitions(isKeyGroupPartition, 
leafTransformers)
 
     if (GlutenConfig.get.enableHdfsViewfs) {
       val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty
@@ -416,90 +429,60 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
       )
     )
 
-    SoftAffinity.updateFilePartitionLocations(Seq(allInputPartitions), rdd.id)
+    SoftAffinity.updateFilePartitionLocations(allInputPartitions, rdd.id)
 
     rdd
   }
 
-  private def getSplitInfosFromPartitionSeqs(
-      leafTransformers: Seq[BatchScanExecTransformerBase]): 
Seq[Seq[SplitInfo]] = {
-    // If these are two batch scan transformer with keyGroupPartitioning,
-    // they have same partitionValues,
-    // but some partitions maybe empty for those partition values that are not 
present,
-    // otherwise, exchange will be inserted. We should combine the two leaf
-    // transformers' partitions with same index, and set them together in
-    // the substraitContext. We use transpose to do that, You can refer to
-    // the diagram below.
-    // leaf1  Seq(p11) Seq(p12, p13) Seq(p14) ... Seq(p1n)
-    // leaf2  Seq(p21) Seq(p22)      Seq()    ... Seq(p2n)
-    // transpose =>
-    // leaf1 | leaf2
-    //  Seq(p11)       |  Seq(p21)    => 
substraitContext.setSplitInfo([Seq(p11), Seq(p21)])
-    //  Seq(p12, p13)  |  Seq(p22)    => 
substraitContext.setSplitInfo([Seq(p12, p13), Seq(p22)])
-    //  Seq(p14)       |  Seq()    ...
-    //                ...
-    //  Seq(p1n)       |  Seq(p2n)    => 
substraitContext.setSplitInfo([Seq(p1n), Seq(p2n)])
-
-    val allSplitInfos = leafTransformers.map(_.getSplitInfosWithIndex)
+  private def getSplitInfosFromPartitions(
+      isKeyGroupPartition: Boolean,
+      leafTransformers: Seq[LeafTransformSupport]): Seq[Seq[SplitInfo]] = {
+    val allSplitInfos = if (isKeyGroupPartition) {
+      // If there are two batch scan transformers with keyGroupPartitioning,
+      // they have the same partitionValues,
+      // but some partitions may be empty for those partition values that are 
not present;
+      // otherwise, exchange will be inserted. We should combine the two leaf
+      // transformers' partitions with same index, and set them together in
+      // the substraitContext. We use transpose to do that, You can refer to
+      // the diagram below.
+      // leaf1  Seq(p11) Seq(p12, p13) Seq(p14) ... Seq(p1n)
+      // leaf2  Seq(p21) Seq(p22)      Seq()    ... Seq(p2n)
+      // transpose =>
+      // leaf1 | leaf2
+      //  Seq(p11)       |  Seq(p21)    => 
substraitContext.setSplitInfo([Seq(p11), Seq(p21)])
+      //  Seq(p12, p13)  |  Seq(p22)    => 
substraitContext.setSplitInfo([Seq(p12, p13), Seq(p22)])
+      //  Seq(p14)       |  Seq()    ...
+      //                ...
+      //  Seq(p1n)       |  Seq(p2n)    => 
substraitContext.setSplitInfo([Seq(p1n), Seq(p2n)])
+      
leafTransformers.map(_.asInstanceOf[BatchScanExecTransformerBase].getSplitInfosWithIndex)
+    } else {
+      // If there are two leaf transformers, they must have the same partitions,
+      // otherwise, exchange will be inserted. We should combine the two leaf
+      // transformers' partitions with same index, and set them together in
+      // the substraitContext. We use transpose to do that, You can refer to
+      // the diagram below.
+      // leaf1  p11 p12 p13 p14 ... p1n
+      // leaf2  p21 p22 p23 p24 ... p2n
+      // transpose =>
+      // leaf1 | leaf2
+      //  p11  |  p21    => substraitContext.setSplitInfo([p11, p21])
+      //  p12  |  p22    => substraitContext.setSplitInfo([p12, p22])
+      //  p13  |  p23    ...
+      //  p14  |  p24
+      //      ...
+      //  p1n  |  p2n    => substraitContext.setSplitInfo([p1n, p2n])
+      leafTransformers.map(_.getSplitInfos)
+    }
+
     val partitionLength = allSplitInfos.head.size
     if (allSplitInfos.exists(_.size != partitionLength)) {
       throw new GlutenException(
         "The partition length of all the leaf transformer are not the same.")
     }
-    if (GlutenConfig.get.enableHdfsViewfs) {
-      val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty
-      allSplitInfos.foreach {
-        case splitInfo: LocalFilesNode =>
-          val newPaths = ViewFileSystemUtils.convertViewfsToHdfs(
-            splitInfo.getPaths.asScala.toSeq,
-            viewfsToHdfsCache,
-            sparkContext.hadoopConfiguration)
-          splitInfo.setPaths(newPaths.asJava)
-      }
-    }
 
     allSplitInfos.transpose
   }
 
-  private def generateWholeStageDatasourceRDD(
-      leafTransformers: Seq[BatchScanExecTransformerBase],
-      wsCtx: WholeStageTransformContext,
-      inputRDDs: ColumnarInputRDDsWrapper,
-      pipelineTime: SQLMetric): RDD[ColumnarBatch] = {
-
-    /**
-     * If containing leaf exec transformer this "whole stage" generates a RDD 
which itself takes
-     * care of [[LeafTransformSupport]] there won't be any other RDD for leaf 
operator. As a result,
-     * genFirstStageIterator rather than genFinalStageIterator will be invoked
-     */
-    val allInputPartitions = leafTransformers.map(_.getPartitionsWithIndex)
-    val allSplitInfos = getSplitInfosFromPartitionSeqs(leafTransformers)
-
-    val inputPartitions =
-      BackendsApiManager.getIteratorApiInstance.genPartitions(
-        wsCtx,
-        allSplitInfos,
-        leafTransformers)
-
-    val rdd = new GlutenWholeStageColumnarRDD(
-      sparkContext,
-      inputPartitions,
-      inputRDDs,
-      pipelineTime,
-      leafInputMetricsUpdater(),
-      BackendsApiManager.getMetricsApiInstance.metricsUpdatingFunction(
-        child,
-        wsCtx.substraitContext.registeredRelMap,
-        wsCtx.substraitContext.registeredJoinParams,
-        wsCtx.substraitContext.registeredAggregationParams
-      )
-    )
-
-    SoftAffinity.updateFilePartitionLocations(allInputPartitions, rdd.id)
-
-    rdd
-  }
-
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     assert(child.isInstanceOf[TransformSupport])
     val pipelineTime: SQLMetric = longMetric("pipelineTime")
@@ -515,20 +498,7 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
 
     val leafTransformers = findAllLeafTransformers()
     if (leafTransformers.nonEmpty) {
-      val isKeyGroupPartition: Boolean = leafTransformers.exists {
-        // TODO: May can apply to BatchScanExecTransformer without key group 
partitioning
-        case b: BatchScanExecTransformerBase if 
b.keyGroupedPartitioning.isDefined => true
-        case _ => false
-      }
-      if (!isKeyGroupPartition) {
-        generateWholeStageRDD(leafTransformers, wsCtx, inputRDDs, pipelineTime)
-      } else {
-        generateWholeStageDatasourceRDD(
-          leafTransformers.map(_.asInstanceOf[BatchScanExecTransformerBase]),
-          wsCtx,
-          inputRDDs,
-          pipelineTime)
-      }
+      generateWholeStageRDD(leafTransformers, wsCtx, inputRDDs, pipelineTime)
     } else {
 
       /**
@@ -586,33 +556,6 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
 
   override protected def withNewChildInternal(newChild: SparkPlan): 
WholeStageTransformer =
     copy(child = newChild, materializeInput = 
materializeInput)(transformStageId)
-
-  private def getSplitInfosFromPartitions(
-      leafTransformers: Seq[LeafTransformSupport]): Seq[Seq[SplitInfo]] = {
-    // If these are two leaf transformers, they must have same partitions,
-    // otherwise, exchange will be inserted. We should combine the two leaf
-    // transformers' partitions with same index, and set them together in
-    // the substraitContext. We use transpose to do that, You can refer to
-    // the diagram below.
-    // leaf1  p11 p12 p13 p14 ... p1n
-    // leaf2  p21 p22 p23 p24 ... p2n
-    // transpose =>
-    // leaf1 | leaf2
-    //  p11  |  p21    => substraitContext.setSplitInfo([p11, p21])
-    //  p12  |  p22    => substraitContext.setSplitInfo([p12, p22])
-    //  p13  |  p23    ...
-    //  p14  |  p24
-    //      ...
-    //  p1n  |  p2n    => substraitContext.setSplitInfo([p1n, p2n])
-    val allSplitInfos = leafTransformers.map(_.getSplitInfos)
-    val partitionLength = allSplitInfos.head.size
-    if (allSplitInfos.exists(_.size != partitionLength)) {
-      throw new GlutenException(
-        "The partition length of all the leaf transformer are not the same.")
-    }
-
-    allSplitInfos.transpose
-  }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to