This is an automated email from the ASF dual-hosted git repository.
jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new adeff656b0 [GLUTEN-9078][CORE] Simplify code of SoftAffinity (#9079)
adeff656b0 is described below
commit adeff656b0c826b697add4335ed7c79ee0b23b62
Author: WangGuangxin <[email protected]>
AuthorDate: Sun Mar 23 02:18:13 2025 -0700
[GLUTEN-9078][CORE] Simplify code of SoftAffinity (#9079)
* Simplify code of SoftAffinity
* remove toIndexedSeq
---
.../apache/spark/softaffinity/SoftAffinity.scala | 11 +++++++++--
.../gluten/execution/WholeStageTransformer.scala | 21 +++------------------
2 files changed, 12 insertions(+), 20 deletions(-)
diff --git a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala b/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
index f0f16bae76..7ae47e422b 100644
--- a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
@@ -22,6 +22,7 @@ import org.apache.gluten.softaffinity.{AffinityManager, SoftAffinityManager}
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.FilePartition
abstract class Affinity(val manager: AffinityManager) extends LogLevelUtil with Logging {
@@ -79,9 +80,15 @@ abstract class Affinity(val manager: AffinityManager) extends LogLevelUtil with
}
/** Update the RDD id to SoftAffinityManager */
- def updateFilePartitionLocations(filePartition: FilePartition, rddId: Int): Unit = {
+ def updateFilePartitionLocations(
+ inputPartitions: Seq[Seq[Seq[InputPartition]]],
+ rddId: Int): Unit = {
if (SoftAffinityManager.usingSoftAffinity && SoftAffinityManager.detectDuplicateReading) {
- SoftAffinityManager.updatePartitionMap(filePartition, rddId)
+ inputPartitions.foreach(_.foreach(_.foreach {
+ case f: FilePartition =>
+ SoftAffinityManager.updatePartitionMap(f, rddId)
+ case _ =>
+ }))
}
}
}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 54f4cebf6a..b1b0f3ddfc 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -389,7 +388,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
* care of [[LeafTransformSupport]] there won't be any other RDD for leaf operator. As a result,
* genFirstStageIterator rather than genFinalStageIterator will be invoked
*/
- val allInputPartitions = leafTransformers.map(_.getPartitions.toIndexedSeq)
+ val allInputPartitions = leafTransformers.map(_.getPartitions)
val allSplitInfos = getSplitInfosFromPartitions(leafTransformers)
if (GlutenConfig.get.enableHdfsViewfs) {
@@ -427,17 +426,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
)
)
- allInputPartitions.head.indices.foreach(
- i => {
- val currentPartitions = allInputPartitions.map(_(i))
- currentPartitions.indices.foreach(
- i =>
- currentPartitions(i) match {
- case f: FilePartition =>
- SoftAffinity.updateFilePartitionLocations(f, rdd.id)
- case _ =>
- })
- })
+ SoftAffinity.updateFilePartitionLocations(Seq(allInputPartitions), rdd.id)
rdd
}
@@ -516,11 +505,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
)
)
- allInputPartitions.foreach(_.foreach(_.foreach {
- case f: FilePartition =>
- SoftAffinity.updateFilePartitionLocations(f, rdd.id)
- case _ =>
- }))
+ SoftAffinity.updateFilePartitionLocations(allInputPartitions, rdd.id)
rdd
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]