This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 20f16b74a0 [GLUTEN-7775][CORE] Make sure the softaffinity hash
executor list is in order (#7776)
20f16b74a0 is described below
commit 20f16b74a007f3317f1af5270c68b719b9268565
Author: Zhichao Zhang <[email protected]>
AuthorDate: Sat Nov 2 11:51:33 2024 +0800
[GLUTEN-7775][CORE] Make sure the softaffinity hash executor list is in
order (#7776)
Before this PR, the order of the soft-affinity hash executor list was
random, which led to cache misses after restarting Spark applications.
This PR keeps the soft-affinity hash executor list sorted, so that after
a Spark application restarts, the computed preferred file locations are
the same as they were before the restart.
Close #7775.
---
.../gluten/softaffinity/SoftAffinityManager.scala | 47 ++++-----
.../strategy/SoftAffinityAllocationTrait.scala | 4 +-
.../strategy/SoftAffinityStrategy.scala | 11 +--
.../spark/softaffinity/SoftAffinitySuite.scala | 108 +++++++++++----------
4 files changed, 82 insertions(+), 88 deletions(-)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
index 44ff5bb9bc..c72a2680b5 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
@@ -50,7 +50,8 @@ abstract class AffinityManager extends LogLevelUtil with
Logging {
GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE
// (execId, host) list
- val fixedIdForExecutors = new mutable.ListBuffer[Option[(String, String)]]()
+ private val idForExecutors = new mutable.ListBuffer[(String, String)]()
+ var sortedIdForExecutors = new mutable.ListBuffer[(String, String)]()
// host list
val nodesExecutorsMap = new mutable.HashMap[String,
mutable.HashSet[String]]()
@@ -96,27 +97,23 @@ abstract class AffinityManager extends LogLevelUtil with
Logging {
try {
// first, check whether the execId exists
if (
- !fixedIdForExecutors.exists(
+ !idForExecutors.exists(
exec => {
- exec.isDefined && exec.get._1.equals(execHostId._1)
+ exec._1.equals(execHostId._1)
})
) {
val executorsSet =
nodesExecutorsMap.getOrElseUpdate(execHostId._2, new
mutable.HashSet[String]())
executorsSet.add(execHostId._1)
- if (fixedIdForExecutors.exists(_.isEmpty)) {
- // replace the executor which was removed
- val replaceIdx = fixedIdForExecutors.indexWhere(_.isEmpty)
- fixedIdForExecutors(replaceIdx) = Option(execHostId)
- } else {
- fixedIdForExecutors += Option(execHostId)
- }
+ idForExecutors += execHostId
+ sortedIdForExecutors = idForExecutors.sortBy(_._2)
totalRegisteredExecutors.addAndGet(1)
}
logOnLevel(
GlutenConfig.getConf.softAffinityLogLevel,
s"After adding executor ${execHostId._1} on host ${execHostId._2}, " +
- s"fixedIdForExecutors is ${fixedIdForExecutors.mkString(",")}, " +
+ s"idForExecutors is ${idForExecutors.mkString(",")}, " +
+ s"sortedIdForExecutors is ${sortedIdForExecutors.mkString(",")}, " +
s"nodesExecutorsMap is ${nodesExecutorsMap.keySet.mkString(",")}, " +
s"actual executors count is ${totalRegisteredExecutors.intValue()}."
)
@@ -128,29 +125,27 @@ abstract class AffinityManager extends LogLevelUtil with
Logging {
def handleExecutorRemoved(execId: String): Unit = {
resourceRWLock.writeLock().lock()
try {
- val execIdx = fixedIdForExecutors.indexWhere(
+ val execIdx = idForExecutors.indexWhere(
execHost => {
- if (execHost.isDefined) {
- execHost.get._1.equals(execId)
- } else {
- false
- }
+ execHost._1.equals(execId)
})
if (execIdx != -1) {
- val findedExecId = fixedIdForExecutors(execIdx)
- fixedIdForExecutors(execIdx) = None
- val nodeExecs = nodesExecutorsMap(findedExecId.get._2)
- nodeExecs -= findedExecId.get._1
+ val findedExecId = idForExecutors(execIdx)
+ idForExecutors.remove(execIdx)
+ val nodeExecs = nodesExecutorsMap(findedExecId._2)
+ nodeExecs -= findedExecId._1
if (nodeExecs.isEmpty) {
// there is no executor on this host, remove
- nodesExecutorsMap.remove(findedExecId.get._2)
+ nodesExecutorsMap.remove(findedExecId._2)
}
+ sortedIdForExecutors = idForExecutors.sortBy(_._2)
totalRegisteredExecutors.addAndGet(-1)
}
logOnLevel(
GlutenConfig.getConf.softAffinityLogLevel,
s"After removing executor $execId, " +
- s"fixedIdForExecutors is ${fixedIdForExecutors.mkString(",")}, " +
+ s"idForExecutors is ${idForExecutors.mkString(",")}, " +
+ s"sortedIdForExecutors is ${sortedIdForExecutors.mkString(",")}, " +
s"nodesExecutorsMap is ${nodesExecutorsMap.keySet.mkString(",")}, " +
s"actual executors count is ${totalRegisteredExecutors.intValue()}."
)
@@ -242,7 +237,7 @@ abstract class AffinityManager extends LogLevelUtil with
Logging {
if (nodesExecutorsMap.size < 1) {
Array.empty
} else {
- softAffinityAllocation.allocateExecs(file, fixedIdForExecutors)
+ softAffinityAllocation.allocateExecs(file, sortedIdForExecutors)
}
} finally {
resourceRWLock.readLock().unlock()
@@ -252,11 +247,11 @@ abstract class AffinityManager extends LogLevelUtil with
Logging {
def askExecutors(f: FilePartition): Array[(String, String)] = {
resourceRWLock.readLock().lock()
try {
- if (fixedIdForExecutors.size < 1) {
+ if (sortedIdForExecutors.size < 1) {
Array.empty
} else {
val result = getDuplicateReadingLocation(f)
- result.filter(r => fixedIdForExecutors.exists(s => s.isDefined &&
s.get._1 == r._1)).toArray
+ result.filter(r => sortedIdForExecutors.exists(s => s._1 ==
r._1)).toArray
}
} finally {
resourceRWLock.readLock().unlock()
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
index 8191e37842..639efd22e9 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
@@ -30,7 +30,5 @@ trait SoftAffinityAllocationTrait {
)
/** allocate target executors for file */
- def allocateExecs(
- file: String,
- candidates: ListBuffer[Option[(String, String)]]): Array[(String,
String)]
+ def allocateExecs(file: String, candidates: ListBuffer[(String, String)]):
Array[(String, String)]
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
index bc36c3b1ea..7af5f212c1 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
@@ -26,7 +26,7 @@ class SoftAffinityStrategy extends
SoftAffinityAllocationTrait with Logging {
/** allocate target executors for file */
override def allocateExecs(
file: String,
- candidates: ListBuffer[Option[(String, String)]]): Array[(String,
String)] = {
+ candidates: ListBuffer[(String, String)]): Array[(String, String)] = {
if (candidates.size < 1) {
Array.empty
} else {
@@ -37,15 +37,10 @@ class SoftAffinityStrategy extends
SoftAffinityAllocationTrait with Logging {
// TODO: try to use ConsistentHash
val mod = file.hashCode % candidatesSize
val c1 = if (mod < 0) (mod + candidatesSize) else mod
- // check whether the executor with index c1 is down
- if (candidates(c1).isDefined) {
- resultSet.add(candidates(c1).get)
- }
+ resultSet.add(candidates(c1))
for (i <- 1 until softAffinityReplicationNum) {
val c2 = (c1 + halfCandidatesSize + i) % candidatesSize
- if (candidates(c2).isDefined) {
- resultSet.add(candidates(c2).get)
- }
+ resultSet.add(candidates(c2))
}
resultSet.toArray
}
diff --git
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
index 9337317d96..4145fd5bc6 100644
---
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
+++
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
@@ -31,6 +31,8 @@ import
org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.test.SharedSparkSession
+import scala.collection.mutable.ListBuffer
+
class SoftAffinitySuite extends QueryTest with SharedSparkSession with
PredicateHelper {
override protected def sparkConf: SparkConf = super.sparkConf
@@ -80,14 +82,14 @@ class SoftAffinitySuite extends QueryTest with
SharedSparkSession with Predicate
"fakePath0",
0,
100,
- Array("host-1", "host-2")
+ Array("192.168.22.1", "host-2")
),
SparkShimLoader.getSparkShims.generatePartitionedFile(
InternalRow.empty,
"fakePath1",
0,
200,
- Array("host-4", "host-5")
+ Array("192.168.22.1", "host-5")
)
).toArray
)
@@ -98,13 +100,7 @@ class SoftAffinitySuite extends QueryTest with
SharedSparkSession with Predicate
val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations
= locations)
- val affinityResultSet = if (scalaVersion.startsWith("2.12")) {
- Set("host-1", "host-4", "host-5")
- } else if (scalaVersion.startsWith("2.13")) {
- Set("host-5", "host-4", "host-2")
- }
-
- assertResult(affinityResultSet) {
+ assertResult(Set("192.168.22.1", "host-5", "host-2")) {
nativePartition.preferredLocations().toSet
}
}
@@ -136,30 +132,7 @@ class SoftAffinitySuite extends QueryTest with
SharedSparkSession with Predicate
val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations
= locations)
- assertResult(Set("executor_host-2_2", "executor_host-1_0")) {
- nativePartition.preferredLocations().toSet
- }
- }
-
- def generateNativePartition4(): Unit = {
- val partition = FilePartition(
- 0,
- Seq(
- SparkShimLoader.getSparkShims.generatePartitionedFile(
- InternalRow.empty,
- "fakePath_0",
- 0,
- 100)
- ).toArray
- )
-
- val locations = SoftAffinity.getFilePartitionLocations(
- partition.files.map(_.filePath.toString),
- partition.preferredLocations())
-
- val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations
= locations)
-
- assertResult(Set("executor_host-1_1")) {
+ assertResult(Set("executor_192.168.22.1_1", "executor_10.1.1.33_6")) {
nativePartition.preferredLocations().toSet
}
}
@@ -206,11 +179,11 @@ class SoftAffinitySuite extends QueryTest with
SharedSparkSession with Predicate
val addEvent0 = SparkListenerExecutorAdded(
System.currentTimeMillis(),
"0",
- new ExecutorInfo("host-1", 3, null))
+ new ExecutorInfo("192.168.22.1", 3, null))
val addEvent1 = SparkListenerExecutorAdded(
System.currentTimeMillis(),
"1",
- new ExecutorInfo("host-1", 3, null))
+ new ExecutorInfo("192.168.22.1", 3, null))
val addEvent2 = SparkListenerExecutorAdded(
System.currentTimeMillis(),
"2",
@@ -234,7 +207,7 @@ class SoftAffinitySuite extends QueryTest with
SharedSparkSession with Predicate
val addEvent6 = SparkListenerExecutorAdded(
System.currentTimeMillis(),
"6",
- new ExecutorInfo("host-4", 3, null))
+ new ExecutorInfo("10.1.1.33", 3, null))
val removedEvent0 =
SparkListenerExecutorRemoved(System.currentTimeMillis(), "0", "")
val removedEvent1 =
SparkListenerExecutorRemoved(System.currentTimeMillis(), "1", "")
@@ -256,23 +229,35 @@ class SoftAffinitySuite extends QueryTest with
SharedSparkSession with Predicate
executorsListListener.onExecutorAdded(addEvent3_1)
assert(SoftAffinityManager.nodesExecutorsMap.size == 3)
- assert(SoftAffinityManager.fixedIdForExecutors.size == 4)
+ assert(SoftAffinityManager.sortedIdForExecutors.size == 4)
executorsListListener.onExecutorRemoved(removedEvent3)
// test removing executor repeatedly
executorsListListener.onExecutorRemoved(removedEvent3_1)
assert(SoftAffinityManager.nodesExecutorsMap.size == 2)
- assert(SoftAffinityManager.fixedIdForExecutors.size == 4)
- assert(SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty))
+ assert(SoftAffinityManager.sortedIdForExecutors.size == 3)
+ assert(
+ SoftAffinityManager.sortedIdForExecutors.equals(
+ ListBuffer[(String, String)](("0", "192.168.22.1"), ("1",
"192.168.22.1"), ("2", "host-2"))
+ ))
executorsListListener.onExecutorAdded(addEvent4)
executorsListListener.onExecutorAdded(addEvent5)
executorsListListener.onExecutorAdded(addEvent6)
assert(SoftAffinityManager.nodesExecutorsMap.size == 4)
- assert(SoftAffinityManager.fixedIdForExecutors.size == 6)
- assert(!SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty))
+ assert(SoftAffinityManager.sortedIdForExecutors.size == 6)
+ assert(
+ SoftAffinityManager.sortedIdForExecutors.equals(
+ ListBuffer[(String, String)](
+ ("6", "10.1.1.33"),
+ ("0", "192.168.22.1"),
+ ("1", "192.168.22.1"),
+ ("2", "host-2"),
+ ("5", "host-2"),
+ ("4", "host-3"))
+ ))
// all target hosts exist in computing hosts list, return the original
hosts list
generateNativePartition1()
@@ -286,19 +271,21 @@ class SoftAffinitySuite extends QueryTest with
SharedSparkSession with Predicate
executorsListListener.onExecutorRemoved(removedEvent4)
assert(SoftAffinityManager.nodesExecutorsMap.size == 3)
- assert(SoftAffinityManager.fixedIdForExecutors.size == 6)
- assert(SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty))
+ assert(SoftAffinityManager.sortedIdForExecutors.size == 4)
+ assert(
+ SoftAffinityManager.sortedIdForExecutors.equals(
+ ListBuffer[(String, String)](
+ ("6", "10.1.1.33"),
+ ("0", "192.168.22.1"),
+ ("1", "192.168.22.1"),
+ ("5", "host-2"))
+ ))
executorsListListener.onExecutorRemoved(removedEvent2)
executorsListListener.onExecutorRemoved(removedEvent4)
assert(SoftAffinityManager.nodesExecutorsMap.size == 3)
- assert(SoftAffinityManager.fixedIdForExecutors.size == 6)
- assert(SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty))
-
- // there are only one target host existing in computing hosts list,
- // but the hash executors were removed, so return the original hosts list
- generateNativePartition4()
+ assert(SoftAffinityManager.sortedIdForExecutors.size == 4)
executorsListListener.onExecutorRemoved(removedEvent0)
executorsListListener.onExecutorRemoved(removedEvent1)
@@ -307,10 +294,29 @@ class SoftAffinitySuite extends QueryTest with
SharedSparkSession with Predicate
executorsListListener.onExecutorRemoved(removedEvent7)
assert(SoftAffinityManager.nodesExecutorsMap.isEmpty)
- assert(SoftAffinityManager.fixedIdForExecutors.size == 6)
- assert(SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty))
+ assert(SoftAffinityManager.sortedIdForExecutors.isEmpty)
// all executors were removed, return the original hosts list
generateNativePartition5()
+
+ executorsListListener.onExecutorAdded(addEvent0)
+ executorsListListener.onExecutorAdded(addEvent1)
+ executorsListListener.onExecutorAdded(addEvent2)
+ executorsListListener.onExecutorAdded(addEvent3)
+ executorsListListener.onExecutorAdded(addEvent4)
+ executorsListListener.onExecutorAdded(addEvent5)
+ executorsListListener.onExecutorAdded(addEvent6)
+ assert(SoftAffinityManager.sortedIdForExecutors.size == 7)
+ assert(
+ SoftAffinityManager.sortedIdForExecutors.equals(
+ ListBuffer[(String, String)](
+ ("6", "10.1.1.33"),
+ ("0", "192.168.22.1"),
+ ("1", "192.168.22.1"),
+ ("2", "host-2"),
+ ("5", "host-2"),
+ ("3", "host-3"),
+ ("4", "host-3"))
+ ))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]