This is an automated email from the ASF dual-hosted git repository.

jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new adeff656b0 [GLUTEN-9078][CORE] Simplify code of SoftAffinity (#9079)
adeff656b0 is described below

commit adeff656b0c826b697add4335ed7c79ee0b23b62
Author: WangGuangxin <[email protected]>
AuthorDate: Sun Mar 23 02:18:13 2025 -0700

    [GLUTEN-9078][CORE] Simplify code of SoftAffinity (#9079)
    
    * Simplify code of SoftAffinity
    
    * remove toIndexedSeq
---
 .../apache/spark/softaffinity/SoftAffinity.scala    | 11 +++++++++--
 .../gluten/execution/WholeStageTransformer.scala    | 21 +++------------------
 2 files changed, 12 insertions(+), 20 deletions(-)

diff --git 
a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala 
b/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
index f0f16bae76..7ae47e422b 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
@@ -22,6 +22,7 @@ import org.apache.gluten.softaffinity.{AffinityManager, 
SoftAffinityManager}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.FilePartition
 
 abstract class Affinity(val manager: AffinityManager) extends LogLevelUtil 
with Logging {
@@ -79,9 +80,15 @@ abstract class Affinity(val manager: AffinityManager) 
extends LogLevelUtil with
   }
 
   /** Update the RDD id to SoftAffinityManager */
-  def updateFilePartitionLocations(filePartition: FilePartition, rddId: Int): 
Unit = {
+  def updateFilePartitionLocations(
+      inputPartitions: Seq[Seq[Seq[InputPartition]]],
+      rddId: Int): Unit = {
     if (SoftAffinityManager.usingSoftAffinity && 
SoftAffinityManager.detectDuplicateReading) {
-      SoftAffinityManager.updatePartitionMap(filePartition, rddId)
+      inputPartitions.foreach(_.foreach(_.foreach {
+        case f: FilePartition =>
+          SoftAffinityManager.updatePartitionMap(f, rddId)
+        case _ =>
+      }))
     }
   }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 54f4cebf6a..b1b0f3ddfc 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -389,7 +388,7 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
      * care of [[LeafTransformSupport]] there won't be any other RDD for leaf 
operator. As a result,
      * genFirstStageIterator rather than genFinalStageIterator will be invoked
      */
-    val allInputPartitions = leafTransformers.map(_.getPartitions.toIndexedSeq)
+    val allInputPartitions = leafTransformers.map(_.getPartitions)
     val allSplitInfos = getSplitInfosFromPartitions(leafTransformers)
 
     if (GlutenConfig.get.enableHdfsViewfs) {
@@ -427,17 +426,7 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
       )
     )
 
-    allInputPartitions.head.indices.foreach(
-      i => {
-        val currentPartitions = allInputPartitions.map(_(i))
-        currentPartitions.indices.foreach(
-          i =>
-            currentPartitions(i) match {
-              case f: FilePartition =>
-                SoftAffinity.updateFilePartitionLocations(f, rdd.id)
-              case _ =>
-            })
-      })
+    SoftAffinity.updateFilePartitionLocations(Seq(allInputPartitions), rdd.id)
 
     rdd
   }
@@ -516,11 +505,7 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
       )
     )
 
-    allInputPartitions.foreach(_.foreach(_.foreach {
-      case f: FilePartition =>
-        SoftAffinity.updateFilePartitionLocations(f, rdd.id)
-      case _ =>
-    }))
+    SoftAffinity.updateFilePartitionLocations(allInputPartitions, rdd.id)
 
     rdd
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to