This is an automated email from the ASF dual-hosted git repository.
zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new edaf88a75 [GLUTEN-6995][Core] Limit soft affinity duplicate reading detection max cache items (#7003)
edaf88a75 is described below
commit edaf88a75ad88a0638d0cf5cf623e936ad8c46c4
Author: Zhen Li <[email protected]>
AuthorDate: Wed Aug 28 11:10:51 2024 +0800
[GLUTEN-6995][Core] Limit soft affinity duplicate reading detection max cache items (#7003)

* [Core] Limit soft affinity duplicate reading detection max cache items
* disable duplicate_reading by default
---
.../org/apache/gluten/affinity/CHUTAffinity.scala | 4 +-
.../gluten/softaffinity/SoftAffinityManager.scala | 46 +++++++++++++++-------
.../SoftAffinityWithRDDInfoSuite.scala | 40 ++++++++++++++++++-
.../scala/org/apache/gluten/GlutenConfig.scala | 8 ++--
4 files changed, 76 insertions(+), 22 deletions(-)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/affinity/CHUTAffinity.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/affinity/CHUTAffinity.scala
index c7d77e550..d8bd31d6f 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/affinity/CHUTAffinity.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/affinity/CHUTAffinity.scala
@@ -35,6 +35,6 @@ object CHUTSoftAffinityManager extends AffinityManager {
   override lazy val detectDuplicateReading = true
 
-  override lazy val maxDuplicateReadingRecords =
-    GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE
+  override lazy val duplicateReadingMaxCacheItems =
+    GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE
 }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
index 278e1b550..dd82807e3 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.execution.datasources.FilePartition
 
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 
-import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.ReentrantReadWriteLock
@@ -56,18 +55,34 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
   lazy val detectDuplicateReading = true
-  lazy val maxDuplicateReadingRecords =
-    GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE
+  lazy val duplicateReadingMaxCacheItems =
+    GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE
 
   // rdd id -> partition id, file path, start, length
-  val rddPartitionInfoMap = new ConcurrentHashMap[Int, Array[(Int, String, Long, Long)]]()
+  val rddPartitionInfoMap: LoadingCache[Integer, Array[(Int, String, Long, Long)]] =
+    CacheBuilder
+      .newBuilder()
+      .maximumSize(duplicateReadingMaxCacheItems)
+      .build(new CacheLoader[Integer, Array[(Int, String, Long, Long)]] {
+        override def load(id: Integer): Array[(Int, String, Long, Long)] = {
+          Array.empty[(Int, String, Long, Long)]
+        }
+      })
 
   // stage id -> execution id + rdd ids: job start / execution end
-  val stageInfoMap = new ConcurrentHashMap[Int, Array[Int]]()
+  val stageInfoMap: LoadingCache[Integer, Array[Int]] =
+    CacheBuilder
+      .newBuilder()
+      .maximumSize(duplicateReadingMaxCacheItems)
+      .build(new CacheLoader[Integer, Array[Int]] {
+        override def load(id: Integer): Array[Int] = {
+          Array.empty[Int]
+        }
+      })
 
   // final result: partition composed key("path1_start_length,path2_start_length") --> array_host
   val duplicateReadingInfos: LoadingCache[String, Array[(String, String)]] =
     CacheBuilder
       .newBuilder()
-      .maximumSize(maxDuplicateReadingRecords)
+      .maximumSize(duplicateReadingMaxCacheItems)
       .build(new CacheLoader[String, Array[(String, String)]] {
         override def load(name: String): Array[(String, String)] = {
           Array.empty[(String, String)]
@@ -162,11 +177,11 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
     event.reason match {
       case org.apache.spark.Success =>
         val stageId = event.stageId
-        val rddInfo = stageInfoMap.get(stageId)
+        val rddInfo = stageInfoMap.getIfPresent(stageId)
         if (rddInfo != null) {
           rddInfo.foreach {
             rddId =>
-              val partitions = rddPartitionInfoMap.get(rddId)
+              val partitions = rddPartitionInfoMap.getIfPresent(rddId)
               if (partitions != null) {
                 val key = partitions
                   .filter(p => p._1 == SparkShimLoader.getSparkShims.getPartitionId(event.taskInfo))
@@ -195,11 +210,11 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
   }
 
   def clearPartitionMap(rddIds: Seq[Int]): Unit = {
-    rddIds.foreach(id => rddPartitionInfoMap.remove(id))
+    rddIds.foreach(id => rddPartitionInfoMap.invalidate(id))
   }
 
   def clearStageMap(id: Int): Unit = {
-    stageInfoMap.remove(id)
+    stageInfoMap.invalidate(id)
   }
 
   def checkTargetHosts(hosts: Array[String]): Boolean = {
@@ -274,8 +289,9 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
         val paths =
           f.files.map(file => (f.index, file.filePath.toString, file.start, file.length)).toArray
         val key = rddId
-        val values = if (rddPartitionInfoMap.containsKey(key)) {
-          rddPartitionInfoMap.get(key) ++ paths
+        var values = rddPartitionInfoMap.getIfPresent(key)
+        values = if (values != null) {
+          values ++ paths
         } else {
           paths
         }
@@ -300,8 +316,8 @@ object SoftAffinityManager extends AffinityManager {
   ) &&
     SparkShimLoader.getSparkShims.supportDuplicateReadingTracking
 
-  override lazy val maxDuplicateReadingRecords = SparkEnv.get.conf.getInt(
-    GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS,
-    GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE
+  override lazy val duplicateReadingMaxCacheItems = SparkEnv.get.conf.getInt(
+    GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS,
+    GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE
   )
 }
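
Note: unlike ConcurrentHashMap.get, calling get on a Guava LoadingCache runs the loader on a miss and caches the loaded value, so plain lookups would insert empty arrays that count against maximumSize. That is why the patch pairs getIfPresent (pure lookup, null on a miss) with invalidate (explicit removal). A minimal standalone sketch of that behavior; the object name, key, and size below are illustrative and not part of the patch:

    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

    object GetIfPresentSketch {
      def main(args: Array[String]): Unit = {
        // Bounded cache with the same shape as stageInfoMap above.
        val cache: LoadingCache[Integer, Array[Int]] = CacheBuilder
          .newBuilder()
          .maximumSize(10000L) // cap on middle-state entries
          .build(new CacheLoader[Integer, Array[Int]] {
            override def load(id: Integer): Array[Int] = Array.empty[Int]
          })

        // getIfPresent is a pure lookup: null on a miss, nothing cached.
        assert(cache.getIfPresent(42) == null)
        assert(cache.size() == 0)

        // get would run the loader and cache Array.empty under key 42,
        // which is the insertion the patch avoids on its lookup paths.
        assert(cache.get(42).isEmpty)
        assert(cache.size() == 1)

        // invalidate replaces ConcurrentHashMap.remove for explicit cleanup.
        cache.invalidate(42)
      }
    }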
diff --git a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
index b22eb5080..2328900da 100644
--- a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
+++ b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.softaffinity
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.execution.GlutenPartition
-import org.apache.gluten.softaffinity.SoftAffinityManager
+import org.apache.gluten.softaffinity.{AffinityManager, SoftAffinityManager}
 import org.apache.gluten.softaffinity.scheduler.SoftAffinityListener
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.plan.PlanBuilder
@@ -33,6 +33,16 @@ import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.storage.{RDDInfo, StorageLevel}
 
+object FakeSoftAffinityManager extends AffinityManager {
+  override lazy val usingSoftAffinity: Boolean = true
+
+  override lazy val minOnTargetHosts: Int = 1
+
+  override lazy val detectDuplicateReading = true
+
+  override lazy val duplicateReadingMaxCacheItems = 1
+}
+
 class SoftAffinityWithRDDInfoSuite extends QueryTest with SharedSparkSession with PredicateHelper {
 
   override protected def sparkConf: SparkConf = super.sparkConf
@@ -110,4 +120,32 @@ class SoftAffinityWithRDDInfoSuite extends QueryTest with SharedSparkSession wit
       assert(SoftAffinityManager.askExecutors(filePartition).isEmpty)
     }
   }
+
+  test("Duplicate reading detection limits middle states count") {
+    // This test simulates the case where the listener bus is stuck. We need to make sure the
+    // middle-state count does not exceed the configured threshold.
+    if (SparkShimLoader.getSparkShims.supportDuplicateReadingTracking) {
+      val files = Seq(
+        SparkShimLoader.getSparkShims.generatePartitionedFile(
+          InternalRow.empty,
+          "fakePath0",
+          0,
+          100,
+          Array("host-3")),
+        SparkShimLoader.getSparkShims.generatePartitionedFile(
+          InternalRow.empty,
+          "fakePath0",
+          100,
+          200,
+          Array("host-3"))
+      ).toArray
+      val filePartition = FilePartition(-1, files)
+      FakeSoftAffinityManager.updatePartitionMap(filePartition, 1)
+      assert(FakeSoftAffinityManager.rddPartitionInfoMap.size == 1)
+      val filePartition1 = FilePartition(-1, files)
+      FakeSoftAffinityManager.updatePartitionMap(filePartition1, 2)
+      assert(FakeSoftAffinityManager.rddPartitionInfoMap.size == 1)
+      assert(FakeSoftAffinityManager.stageInfoMap.size <= 1)
+    }
+  }
 }
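
For intuition on what the new test asserts: with maximumSize(1), Guava evicts an older entry as each new rdd id is written, so even if the listener bus never fires cleanup events, at most one middle-state entry survives per map. A quick sketch of that property; standalone code under the same Guava assumption, not taken from the patch:

    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

    object BoundedCacheSketch {
      def main(args: Array[String]): Unit = {
        // Same value shape as rddPartitionInfoMap, capped at a single entry.
        val tiny: LoadingCache[Integer, Array[(Int, String, Long, Long)]] = CacheBuilder
          .newBuilder()
          .maximumSize(1L)
          .build(new CacheLoader[Integer, Array[(Int, String, Long, Long)]] {
            override def load(id: Integer): Array[(Int, String, Long, Long)] =
              Array.empty[(Int, String, Long, Long)]
          })

        // Simulate many rdd ids accumulating while no cleanup event arrives.
        (1 to 1000).foreach(id => tiny.put(id, Array((id, s"fakePath$id", 0L, 100L))))
        assert(tiny.size() <= 1) // size-based eviction keeps the middle state bounded
      }
    }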
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 5c032d4b0..27ce1ec36 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -557,11 +557,11 @@ object GlutenConfig {
   // Enable Soft Affinity duplicate reading detection, default value is true
   val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED =
     "spark.gluten.soft-affinity.duplicateReadingDetect.enabled"
-  val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED_DEFAULT_VALUE = true
+  val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED_DEFAULT_VALUE = false
 
   // Soft Affinity duplicate reading detection max cache items, default value is 10000
-  val GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS =
-    "spark.gluten.soft-affinity.maxDuplicateReading.records"
-  val GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE = 10000
+  val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS =
+    "spark.gluten.soft-affinity.duplicateReading.maxCacheItems"
+  val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE = 10000
 
   // Pass through to native conf
   val GLUTEN_SAVE_DIR = "spark.gluten.saveDir"
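
Since duplicate reading detection is now disabled by default, jobs that relied on it must opt back in. A minimal configuration sketch using the renamed keys from this commit; the object name is hypothetical and the values shown are examples, not recommendations:

    import org.apache.spark.SparkConf

    object SoftAffinityConfSketch {
      val conf: SparkConf = new SparkConf()
        // Re-enable duplicate reading detection; the default is now false.
        .set("spark.gluten.soft-affinity.duplicateReadingDetect.enabled", "true")
        // Bound the detection caches; 10000 matches the shipped default.
        .set("spark.gluten.soft-affinity.duplicateReading.maxCacheItems", "10000")
    }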