This is an automated email from the ASF dual-hosted git repository.

zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new edaf88a75 [GLUTEN-6995][Core] Limit soft affinity duplicate reading 
detection max cache items (#7003)
edaf88a75 is described below

commit edaf88a75ad88a0638d0cf5cf623e936ad8c46c4
Author: Zhen Li <[email protected]>
AuthorDate: Wed Aug 28 11:10:51 2024 +0800

    [GLUTEN-6995][Core] Limit soft affinity duplicate reading detection max 
cache items (#7003)
    
    * [Core] Limit soft affinity duplicate reading detection max cache items
    
    * disable duplicate_reading by default
---
 .../org/apache/gluten/affinity/CHUTAffinity.scala  |  4 +-
 .../gluten/softaffinity/SoftAffinityManager.scala  | 46 +++++++++++++++-------
 .../SoftAffinityWithRDDInfoSuite.scala             | 40 ++++++++++++++++++-
 .../scala/org/apache/gluten/GlutenConfig.scala     |  8 ++--
 4 files changed, 76 insertions(+), 22 deletions(-)

diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/affinity/CHUTAffinity.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/affinity/CHUTAffinity.scala
index c7d77e550..d8bd31d6f 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/affinity/CHUTAffinity.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/affinity/CHUTAffinity.scala
@@ -35,6 +35,6 @@ object CHUTSoftAffinityManager extends AffinityManager {
 
   override lazy val detectDuplicateReading = true
 
-  override lazy val maxDuplicateReadingRecords =
-    
GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE
+  override lazy val duplicateReadingMaxCacheItems =
+    
GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
index 278e1b550..dd82807e3 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
@@ -28,7 +28,6 @@ import 
org.apache.spark.sql.execution.datasources.FilePartition
 
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 
-import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
@@ -56,18 +55,34 @@ abstract class AffinityManager extends LogLevelUtil with 
Logging {
 
   lazy val detectDuplicateReading = true
 
-  lazy val maxDuplicateReadingRecords =
-    
GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE
+  lazy val duplicateReadingMaxCacheItems =
+    
GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE
 
   // rdd id -> patition id, file path, start, length
-  val rddPartitionInfoMap = new ConcurrentHashMap[Int, Array[(Int, String, 
Long, Long)]]()
+  val rddPartitionInfoMap: LoadingCache[Integer, Array[(Int, String, Long, 
Long)]] =
+    CacheBuilder
+      .newBuilder()
+      .maximumSize(duplicateReadingMaxCacheItems)
+      .build(new CacheLoader[Integer, Array[(Int, String, Long, Long)]] {
+        override def load(id: Integer): Array[(Int, String, Long, Long)] = {
+          Array.empty[(Int, String, Long, Long)]
+        }
+      })
   // stage id -> execution id + rdd ids: job start / execution end
-  val stageInfoMap = new ConcurrentHashMap[Int, Array[Int]]()
+  val stageInfoMap: LoadingCache[Integer, Array[Int]] =
+    CacheBuilder
+      .newBuilder()
+      .maximumSize(duplicateReadingMaxCacheItems)
+      .build(new CacheLoader[Integer, Array[Int]] {
+        override def load(id: Integer): Array[Int] = {
+          Array.empty[Int]
+        }
+      })
   // final result: partition composed 
key("path1_start_length,path2_start_length") --> array_host
   val duplicateReadingInfos: LoadingCache[String, Array[(String, String)]] =
     CacheBuilder
       .newBuilder()
-      .maximumSize(maxDuplicateReadingRecords)
+      .maximumSize(duplicateReadingMaxCacheItems)
       .build(new CacheLoader[String, Array[(String, String)]] {
         override def load(name: String): Array[(String, String)] = {
           Array.empty[(String, String)]
@@ -162,11 +177,11 @@ abstract class AffinityManager extends LogLevelUtil with 
Logging {
     event.reason match {
       case org.apache.spark.Success =>
         val stageId = event.stageId
-        val rddInfo = stageInfoMap.get(stageId)
+        val rddInfo = stageInfoMap.getIfPresent(stageId)
         if (rddInfo != null) {
           rddInfo.foreach {
             rddId =>
-              val partitions = rddPartitionInfoMap.get(rddId)
+              val partitions = rddPartitionInfoMap.getIfPresent(rddId)
               if (partitions != null) {
                 val key = partitions
                   .filter(p => p._1 == 
SparkShimLoader.getSparkShims.getPartitionId(event.taskInfo))
@@ -195,11 +210,11 @@ abstract class AffinityManager extends LogLevelUtil with 
Logging {
   }
 
   def clearPartitionMap(rddIds: Seq[Int]): Unit = {
-    rddIds.foreach(id => rddPartitionInfoMap.remove(id))
+    rddIds.foreach(id => rddPartitionInfoMap.invalidate(id))
   }
 
   def clearStageMap(id: Int): Unit = {
-    stageInfoMap.remove(id)
+    stageInfoMap.invalidate(id)
   }
 
   def checkTargetHosts(hosts: Array[String]): Boolean = {
@@ -274,8 +289,9 @@ abstract class AffinityManager extends LogLevelUtil with 
Logging {
     val paths =
       f.files.map(file => (f.index, file.filePath.toString, file.start, 
file.length)).toArray
     val key = rddId
-    val values = if (rddPartitionInfoMap.containsKey(key)) {
-      rddPartitionInfoMap.get(key) ++ paths
+    var values = rddPartitionInfoMap.getIfPresent(key)
+    values = if (values != null) {
+      values ++ paths
     } else {
       paths
     }
@@ -300,8 +316,8 @@ object SoftAffinityManager extends AffinityManager {
   ) &&
     SparkShimLoader.getSparkShims.supportDuplicateReadingTracking
 
-  override lazy val maxDuplicateReadingRecords = SparkEnv.get.conf.getInt(
-    GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS,
-    
GlutenConfig.GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE
+  override lazy val duplicateReadingMaxCacheItems = SparkEnv.get.conf.getInt(
+    GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS,
+    
GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE
   )
 }
diff --git 
a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
 
b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
index b22eb5080..2328900da 100644
--- 
a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
+++ 
b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.softaffinity
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.execution.GlutenPartition
-import org.apache.gluten.softaffinity.SoftAffinityManager
+import org.apache.gluten.softaffinity.{AffinityManager, SoftAffinityManager}
 import org.apache.gluten.softaffinity.scheduler.SoftAffinityListener
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.plan.PlanBuilder
@@ -33,6 +33,16 @@ import 
org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.storage.{RDDInfo, StorageLevel}
 
+object FakeSoftAffinityManager extends AffinityManager {
+  override lazy val usingSoftAffinity: Boolean = true
+
+  override lazy val minOnTargetHosts: Int = 1
+
+  override lazy val detectDuplicateReading = true
+
+  override lazy val duplicateReadingMaxCacheItems = 1
+}
+
 class SoftAffinityWithRDDInfoSuite extends QueryTest with SharedSparkSession 
with PredicateHelper {
 
   override protected def sparkConf: SparkConf = super.sparkConf
@@ -110,4 +120,32 @@ class SoftAffinityWithRDDInfoSuite extends QueryTest with 
SharedSparkSession wit
       assert(SoftAffinityManager.askExecutors(filePartition).isEmpty)
     }
   }
+
+  test("Duplicate reading detection limits middle states count") {
+    // This test simulates the case where the listener bus is stuck. We need to make sure 
the middle states
+    // count does not exceed the configured threshold.
+    if (SparkShimLoader.getSparkShims.supportDuplicateReadingTracking) {
+      val files = Seq(
+        SparkShimLoader.getSparkShims.generatePartitionedFile(
+          InternalRow.empty,
+          "fakePath0",
+          0,
+          100,
+          Array("host-3")),
+        SparkShimLoader.getSparkShims.generatePartitionedFile(
+          InternalRow.empty,
+          "fakePath0",
+          100,
+          200,
+          Array("host-3"))
+      ).toArray
+      val filePartition = FilePartition(-1, files)
+      FakeSoftAffinityManager.updatePartitionMap(filePartition, 1)
+      assert(FakeSoftAffinityManager.rddPartitionInfoMap.size == 1)
+      val filePartition1 = FilePartition(-1, files)
+      FakeSoftAffinityManager.updatePartitionMap(filePartition1, 2)
+      assert(FakeSoftAffinityManager.rddPartitionInfoMap.size == 1)
+      assert(FakeSoftAffinityManager.stageInfoMap.size <= 1)
+    }
+  }
 }
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 5c032d4b0..27ce1ec36 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -557,11 +557,11 @@ object GlutenConfig {
   // Enable Soft Affinity duplicate reading detection, defalut value is true
   val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED =
     "spark.gluten.soft-affinity.duplicateReadingDetect.enabled"
-  val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED_DEFAULT_VALUE = 
true
+  val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_DETECT_ENABLED_DEFAULT_VALUE = 
false
   // Enable Soft Affinity duplicate reading detection, defalut value is 10000
-  val GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS =
-    "spark.gluten.soft-affinity.maxDuplicateReading.records"
-  val GLUTEN_SOFT_AFFINITY_MAX_DUPLICATE_READING_RECORDS_DEFAULT_VALUE = 10000
+  val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS =
+    "spark.gluten.soft-affinity.duplicateReading.maxCacheItems"
+  val GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE = 
10000
 
   // Pass through to native conf
   val GLUTEN_SAVE_DIR = "spark.gluten.saveDir"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to