This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 20f16b74a0 [GLUTEN-7775][CORE] Make sure the softaffinity hash 
executor list is in order (#7776)
20f16b74a0 is described below

commit 20f16b74a007f3317f1af5270c68b719b9268565
Author: Zhichao Zhang <[email protected]>
AuthorDate: Sat Nov 2 11:51:33 2024 +0800

    [GLUTEN-7775][CORE] Make sure the softaffinity hash executor list is in 
order (#7776)
    
    Before this PR, the order of the soft affinity hash executor list was 
random, which could lead to cache data misses after restarting the Spark 
application. Now the soft affinity hash executor list is kept in order, so 
after restarting the Spark application the computed file preferred locations 
are the same as they were before the restart.
    
    Close #7775.
---
 .../gluten/softaffinity/SoftAffinityManager.scala  |  47 ++++-----
 .../strategy/SoftAffinityAllocationTrait.scala     |   4 +-
 .../strategy/SoftAffinityStrategy.scala            |  11 +--
 .../spark/softaffinity/SoftAffinitySuite.scala     | 108 +++++++++++----------
 4 files changed, 82 insertions(+), 88 deletions(-)

diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
index 44ff5bb9bc..c72a2680b5 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
@@ -50,7 +50,8 @@ abstract class AffinityManager extends LogLevelUtil with 
Logging {
     
GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE
 
   // (execId, host) list
-  val fixedIdForExecutors = new mutable.ListBuffer[Option[(String, String)]]()
+  private val idForExecutors = new mutable.ListBuffer[(String, String)]()
+  var sortedIdForExecutors = new mutable.ListBuffer[(String, String)]()
   // host list
   val nodesExecutorsMap = new mutable.HashMap[String, 
mutable.HashSet[String]]()
 
@@ -96,27 +97,23 @@ abstract class AffinityManager extends LogLevelUtil with 
Logging {
     try {
       // first, check whether the execId exists
       if (
-        !fixedIdForExecutors.exists(
+        !idForExecutors.exists(
           exec => {
-            exec.isDefined && exec.get._1.equals(execHostId._1)
+            exec._1.equals(execHostId._1)
           })
       ) {
         val executorsSet =
           nodesExecutorsMap.getOrElseUpdate(execHostId._2, new 
mutable.HashSet[String]())
         executorsSet.add(execHostId._1)
-        if (fixedIdForExecutors.exists(_.isEmpty)) {
-          // replace the executor which was removed
-          val replaceIdx = fixedIdForExecutors.indexWhere(_.isEmpty)
-          fixedIdForExecutors(replaceIdx) = Option(execHostId)
-        } else {
-          fixedIdForExecutors += Option(execHostId)
-        }
+        idForExecutors += execHostId
+        sortedIdForExecutors = idForExecutors.sortBy(_._2)
         totalRegisteredExecutors.addAndGet(1)
       }
       logOnLevel(
         GlutenConfig.getConf.softAffinityLogLevel,
         s"After adding executor ${execHostId._1} on host ${execHostId._2}, " +
-          s"fixedIdForExecutors is ${fixedIdForExecutors.mkString(",")}, " +
+          s"idForExecutors is ${idForExecutors.mkString(",")}, " +
+          s"sortedIdForExecutors is ${sortedIdForExecutors.mkString(",")}, " +
           s"nodesExecutorsMap is ${nodesExecutorsMap.keySet.mkString(",")}, " +
           s"actual executors count is ${totalRegisteredExecutors.intValue()}."
       )
@@ -128,29 +125,27 @@ abstract class AffinityManager extends LogLevelUtil with 
Logging {
   def handleExecutorRemoved(execId: String): Unit = {
     resourceRWLock.writeLock().lock()
     try {
-      val execIdx = fixedIdForExecutors.indexWhere(
+      val execIdx = idForExecutors.indexWhere(
         execHost => {
-          if (execHost.isDefined) {
-            execHost.get._1.equals(execId)
-          } else {
-            false
-          }
+          execHost._1.equals(execId)
         })
       if (execIdx != -1) {
-        val findedExecId = fixedIdForExecutors(execIdx)
-        fixedIdForExecutors(execIdx) = None
-        val nodeExecs = nodesExecutorsMap(findedExecId.get._2)
-        nodeExecs -= findedExecId.get._1
+        val findedExecId = idForExecutors(execIdx)
+        idForExecutors.remove(execIdx)
+        val nodeExecs = nodesExecutorsMap(findedExecId._2)
+        nodeExecs -= findedExecId._1
         if (nodeExecs.isEmpty) {
           // there is no executor on this host, remove
-          nodesExecutorsMap.remove(findedExecId.get._2)
+          nodesExecutorsMap.remove(findedExecId._2)
         }
+        sortedIdForExecutors = idForExecutors.sortBy(_._2)
         totalRegisteredExecutors.addAndGet(-1)
       }
       logOnLevel(
         GlutenConfig.getConf.softAffinityLogLevel,
         s"After removing executor $execId, " +
-          s"fixedIdForExecutors is ${fixedIdForExecutors.mkString(",")}, " +
+          s"idForExecutors is ${idForExecutors.mkString(",")}, " +
+          s"sortedIdForExecutors is ${sortedIdForExecutors.mkString(",")}, " +
           s"nodesExecutorsMap is ${nodesExecutorsMap.keySet.mkString(",")}, " +
           s"actual executors count is ${totalRegisteredExecutors.intValue()}."
       )
@@ -242,7 +237,7 @@ abstract class AffinityManager extends LogLevelUtil with 
Logging {
       if (nodesExecutorsMap.size < 1) {
         Array.empty
       } else {
-        softAffinityAllocation.allocateExecs(file, fixedIdForExecutors)
+        softAffinityAllocation.allocateExecs(file, sortedIdForExecutors)
       }
     } finally {
       resourceRWLock.readLock().unlock()
@@ -252,11 +247,11 @@ abstract class AffinityManager extends LogLevelUtil with 
Logging {
   def askExecutors(f: FilePartition): Array[(String, String)] = {
     resourceRWLock.readLock().lock()
     try {
-      if (fixedIdForExecutors.size < 1) {
+      if (sortedIdForExecutors.size < 1) {
         Array.empty
       } else {
         val result = getDuplicateReadingLocation(f)
-        result.filter(r => fixedIdForExecutors.exists(s => s.isDefined && 
s.get._1 == r._1)).toArray
+        result.filter(r => sortedIdForExecutors.exists(s => s._1 == 
r._1)).toArray
       }
     } finally {
       resourceRWLock.readLock().unlock()
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
index 8191e37842..639efd22e9 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
@@ -30,7 +30,5 @@ trait SoftAffinityAllocationTrait {
   )
 
   /** allocate target executors for file */
-  def allocateExecs(
-      file: String,
-      candidates: ListBuffer[Option[(String, String)]]): Array[(String, 
String)]
+  def allocateExecs(file: String, candidates: ListBuffer[(String, String)]): 
Array[(String, String)]
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
index bc36c3b1ea..7af5f212c1 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
@@ -26,7 +26,7 @@ class SoftAffinityStrategy extends 
SoftAffinityAllocationTrait with Logging {
   /** allocate target executors for file */
   override def allocateExecs(
       file: String,
-      candidates: ListBuffer[Option[(String, String)]]): Array[(String, 
String)] = {
+      candidates: ListBuffer[(String, String)]): Array[(String, String)] = {
     if (candidates.size < 1) {
       Array.empty
     } else {
@@ -37,15 +37,10 @@ class SoftAffinityStrategy extends 
SoftAffinityAllocationTrait with Logging {
       // TODO: try to use ConsistentHash
       val mod = file.hashCode % candidatesSize
       val c1 = if (mod < 0) (mod + candidatesSize) else mod
-      // check whether the executor with index c1 is down
-      if (candidates(c1).isDefined) {
-        resultSet.add(candidates(c1).get)
-      }
+      resultSet.add(candidates(c1))
       for (i <- 1 until softAffinityReplicationNum) {
         val c2 = (c1 + halfCandidatesSize + i) % candidatesSize
-        if (candidates(c2).isDefined) {
-          resultSet.add(candidates(c2).get)
-        }
+        resultSet.add(candidates(c2))
       }
       resultSet.toArray
     }
diff --git 
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
 
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
index 9337317d96..4145fd5bc6 100644
--- 
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
+++ 
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
@@ -31,6 +31,8 @@ import 
org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.test.SharedSparkSession
 
+import scala.collection.mutable.ListBuffer
+
 class SoftAffinitySuite extends QueryTest with SharedSparkSession with 
PredicateHelper {
 
   override protected def sparkConf: SparkConf = super.sparkConf
@@ -80,14 +82,14 @@ class SoftAffinitySuite extends QueryTest with 
SharedSparkSession with Predicate
           "fakePath0",
           0,
           100,
-          Array("host-1", "host-2")
+          Array("192.168.22.1", "host-2")
         ),
         SparkShimLoader.getSparkShims.generatePartitionedFile(
           InternalRow.empty,
           "fakePath1",
           0,
           200,
-          Array("host-4", "host-5")
+          Array("192.168.22.1", "host-5")
         )
       ).toArray
     )
@@ -98,13 +100,7 @@ class SoftAffinitySuite extends QueryTest with 
SharedSparkSession with Predicate
 
     val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations 
= locations)
 
-    val affinityResultSet = if (scalaVersion.startsWith("2.12")) {
-      Set("host-1", "host-4", "host-5")
-    } else if (scalaVersion.startsWith("2.13")) {
-      Set("host-5", "host-4", "host-2")
-    }
-
-    assertResult(affinityResultSet) {
+    assertResult(Set("192.168.22.1", "host-5", "host-2")) {
       nativePartition.preferredLocations().toSet
     }
   }
@@ -136,30 +132,7 @@ class SoftAffinitySuite extends QueryTest with 
SharedSparkSession with Predicate
 
     val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations 
= locations)
 
-    assertResult(Set("executor_host-2_2", "executor_host-1_0")) {
-      nativePartition.preferredLocations().toSet
-    }
-  }
-
-  def generateNativePartition4(): Unit = {
-    val partition = FilePartition(
-      0,
-      Seq(
-        SparkShimLoader.getSparkShims.generatePartitionedFile(
-          InternalRow.empty,
-          "fakePath_0",
-          0,
-          100)
-      ).toArray
-    )
-
-    val locations = SoftAffinity.getFilePartitionLocations(
-      partition.files.map(_.filePath.toString),
-      partition.preferredLocations())
-
-    val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations 
= locations)
-
-    assertResult(Set("executor_host-1_1")) {
+    assertResult(Set("executor_192.168.22.1_1", "executor_10.1.1.33_6")) {
       nativePartition.preferredLocations().toSet
     }
   }
@@ -206,11 +179,11 @@ class SoftAffinitySuite extends QueryTest with 
SharedSparkSession with Predicate
     val addEvent0 = SparkListenerExecutorAdded(
       System.currentTimeMillis(),
       "0",
-      new ExecutorInfo("host-1", 3, null))
+      new ExecutorInfo("192.168.22.1", 3, null))
     val addEvent1 = SparkListenerExecutorAdded(
       System.currentTimeMillis(),
       "1",
-      new ExecutorInfo("host-1", 3, null))
+      new ExecutorInfo("192.168.22.1", 3, null))
     val addEvent2 = SparkListenerExecutorAdded(
       System.currentTimeMillis(),
       "2",
@@ -234,7 +207,7 @@ class SoftAffinitySuite extends QueryTest with 
SharedSparkSession with Predicate
     val addEvent6 = SparkListenerExecutorAdded(
       System.currentTimeMillis(),
       "6",
-      new ExecutorInfo("host-4", 3, null))
+      new ExecutorInfo("10.1.1.33", 3, null))
 
     val removedEvent0 = 
SparkListenerExecutorRemoved(System.currentTimeMillis(), "0", "")
     val removedEvent1 = 
SparkListenerExecutorRemoved(System.currentTimeMillis(), "1", "")
@@ -256,23 +229,35 @@ class SoftAffinitySuite extends QueryTest with 
SharedSparkSession with Predicate
     executorsListListener.onExecutorAdded(addEvent3_1)
 
     assert(SoftAffinityManager.nodesExecutorsMap.size == 3)
-    assert(SoftAffinityManager.fixedIdForExecutors.size == 4)
+    assert(SoftAffinityManager.sortedIdForExecutors.size == 4)
 
     executorsListListener.onExecutorRemoved(removedEvent3)
     // test removing executor repeatedly
     executorsListListener.onExecutorRemoved(removedEvent3_1)
 
     assert(SoftAffinityManager.nodesExecutorsMap.size == 2)
-    assert(SoftAffinityManager.fixedIdForExecutors.size == 4)
-    assert(SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty))
+    assert(SoftAffinityManager.sortedIdForExecutors.size == 3)
+    assert(
+      SoftAffinityManager.sortedIdForExecutors.equals(
+        ListBuffer[(String, String)](("0", "192.168.22.1"), ("1", 
"192.168.22.1"), ("2", "host-2"))
+      ))
 
     executorsListListener.onExecutorAdded(addEvent4)
     executorsListListener.onExecutorAdded(addEvent5)
     executorsListListener.onExecutorAdded(addEvent6)
 
     assert(SoftAffinityManager.nodesExecutorsMap.size == 4)
-    assert(SoftAffinityManager.fixedIdForExecutors.size == 6)
-    assert(!SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty))
+    assert(SoftAffinityManager.sortedIdForExecutors.size == 6)
+    assert(
+      SoftAffinityManager.sortedIdForExecutors.equals(
+        ListBuffer[(String, String)](
+          ("6", "10.1.1.33"),
+          ("0", "192.168.22.1"),
+          ("1", "192.168.22.1"),
+          ("2", "host-2"),
+          ("5", "host-2"),
+          ("4", "host-3"))
+      ))
 
     // all target hosts exist in computing hosts list, return the original 
hosts list
     generateNativePartition1()
@@ -286,19 +271,21 @@ class SoftAffinitySuite extends QueryTest with 
SharedSparkSession with Predicate
     executorsListListener.onExecutorRemoved(removedEvent4)
 
     assert(SoftAffinityManager.nodesExecutorsMap.size == 3)
-    assert(SoftAffinityManager.fixedIdForExecutors.size == 6)
-    assert(SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty))
+    assert(SoftAffinityManager.sortedIdForExecutors.size == 4)
+    assert(
+      SoftAffinityManager.sortedIdForExecutors.equals(
+        ListBuffer[(String, String)](
+          ("6", "10.1.1.33"),
+          ("0", "192.168.22.1"),
+          ("1", "192.168.22.1"),
+          ("5", "host-2"))
+      ))
 
     executorsListListener.onExecutorRemoved(removedEvent2)
     executorsListListener.onExecutorRemoved(removedEvent4)
 
     assert(SoftAffinityManager.nodesExecutorsMap.size == 3)
-    assert(SoftAffinityManager.fixedIdForExecutors.size == 6)
-    assert(SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty))
-
-    // there are only one target host existing in computing hosts list,
-    // but the hash executors were removed, so return the original hosts list
-    generateNativePartition4()
+    assert(SoftAffinityManager.sortedIdForExecutors.size == 4)
 
     executorsListListener.onExecutorRemoved(removedEvent0)
     executorsListListener.onExecutorRemoved(removedEvent1)
@@ -307,10 +294,29 @@ class SoftAffinitySuite extends QueryTest with 
SharedSparkSession with Predicate
     executorsListListener.onExecutorRemoved(removedEvent7)
 
     assert(SoftAffinityManager.nodesExecutorsMap.isEmpty)
-    assert(SoftAffinityManager.fixedIdForExecutors.size == 6)
-    assert(SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty))
+    assert(SoftAffinityManager.sortedIdForExecutors.isEmpty)
 
     // all executors were removed, return the original hosts list
     generateNativePartition5()
+
+    executorsListListener.onExecutorAdded(addEvent0)
+    executorsListListener.onExecutorAdded(addEvent1)
+    executorsListListener.onExecutorAdded(addEvent2)
+    executorsListListener.onExecutorAdded(addEvent3)
+    executorsListListener.onExecutorAdded(addEvent4)
+    executorsListListener.onExecutorAdded(addEvent5)
+    executorsListListener.onExecutorAdded(addEvent6)
+    assert(SoftAffinityManager.sortedIdForExecutors.size == 7)
+    assert(
+      SoftAffinityManager.sortedIdForExecutors.equals(
+        ListBuffer[(String, String)](
+          ("6", "10.1.1.33"),
+          ("0", "192.168.22.1"),
+          ("1", "192.168.22.1"),
+          ("2", "host-2"),
+          ("5", "host-2"),
+          ("3", "host-3"),
+          ("4", "host-3"))
+      ))
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to