This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new dfcfb4935 [CELEBORN-669] Avoid commit files on excluded worker list
dfcfb4935 is described below

commit dfcfb49352c7ca200bbde4d14fe61383dc51ec30
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Tue Jun 13 22:31:02 2023 +0800

    [CELEBORN-669] Avoid commit files on excluded worker list
    
    ### What changes were proposed in this pull request?
    CommitHandler will check whether the target worker is in 
WorkerStatusTracker's excluded list. If so, skip calling commit files on it.
    
    ### Why are the changes needed?
    Avoid sending unnecessary commit-files requests to workers in the excluded list.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Manual test.
    
    Closes #1581 from waitinfuture/669.
    
    Lead-authored-by: zky.zhoukeyong <[email protected]>
    Co-authored-by: Angerszhuuuu <[email protected]>
    Co-authored-by: Keyong Zhou <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit 47cded835f68254ea95fa9482b27f302b9ddf4af)
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/client/CommitManager.scala |  6 +++--
 .../celeborn/client/commit/CommitHandler.scala     | 29 ++++++++++++++++------
 .../client/commit/MapPartitionCommitHandler.scala  |  7 +++---
 .../commit/ReducePartitionCommitHandler.scala      |  7 +++---
 .../org/apache/celeborn/common/CelebornConf.scala  | 14 +++++++++--
 docs/configuration/client.md                       |  3 ++-
 6 files changed, 48 insertions(+), 18 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
index 0deda78c7..6925c11bb 100644
--- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
@@ -276,12 +276,14 @@ class CommitManager(appId: String, val conf: 
CelebornConf, lifecycleManager: Lif
                 appId,
                 conf,
                 lifecycleManager.shuffleAllocatedWorkers,
-                committedPartitionInfo)
+                committedPartitionInfo,
+                lifecycleManager.workerStatusTracker)
             case PartitionType.MAP => new MapPartitionCommitHandler(
                 appId,
                 conf,
                 lifecycleManager.shuffleAllocatedWorkers,
-                committedPartitionInfo)
+                committedPartitionInfo,
+                lifecycleManager.workerStatusTracker)
             case _ => throw new UnsupportedOperationException(
                 s"Unexpected ShufflePartitionType for CommitManager: 
$partitionType")
           }
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index 20c29eb32..a42c8b2af 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -24,9 +24,9 @@ import java.util.concurrent.atomic.{AtomicLong, LongAdder}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import org.apache.celeborn.client.{ShuffleCommittedInfo, WorkerStatusTracker}
 import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
 import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, 
ShuffleFailedWorkers, ShuffleFileGroups}
-import org.apache.celeborn.client.ShuffleCommittedInfo
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo, 
WorkerInfo}
@@ -46,7 +46,8 @@ case class CommitResult(
 abstract class CommitHandler(
     appId: String,
     conf: CelebornConf,
-    committedPartitionInfo: CommittedPartitionInfo) extends Logging {
+    committedPartitionInfo: CommittedPartitionInfo,
+    workerStatusTracker: WorkerStatusTracker) extends Logging {
 
   private val pushReplicateEnabled = conf.clientPushReplicateEnabled
   private val testRetryCommitFiles = conf.testRetryCommitFiles
@@ -265,15 +266,29 @@ abstract class CommitHandler(
           slaveIds,
           getMapperAttempts(shuffleId),
           commitEpoch.incrementAndGet())
-        val res = requestCommitFilesWithRetry(worker.endpoint, commitFiles)
+        val res =
+          if (conf.clientCommitFilesIgnoreExcludedWorkers &&
+            workerStatusTracker.blacklist.containsKey(worker)) {
+            CommitFilesResponse(
+              StatusCode.WORKER_IN_BLACKLIST,
+              List.empty.asJava,
+              List.empty.asJava,
+              masterIds,
+              slaveIds)
+          } else {
+            requestCommitFilesWithRetry(worker.endpoint, commitFiles)
+          }
 
         res.status match {
           case StatusCode.SUCCESS => // do nothing
-          case StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED 
| StatusCode.REQUEST_FAILED =>
-            logDebug(s"Request $commitFiles return ${res.status} for " +
+          case StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED 
| StatusCode.REQUEST_FAILED | StatusCode.WORKER_IN_BLACKLIST =>
+            logInfo(s"Request $commitFiles return ${res.status} for " +
               s"${Utils.makeShuffleKey(applicationId, shuffleId)}")
-            commitFilesFailedWorkers.put(worker, (res.status, 
System.currentTimeMillis()))
-          case _ => // won't happen
+            if (res.status != StatusCode.WORKER_IN_BLACKLIST) {
+              commitFilesFailedWorkers.put(worker, (res.status, 
System.currentTimeMillis()))
+            }
+          case _ =>
+            logError(s"Should never reach here! commit files response status 
${res.status}")
         }
         res
       } else {
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
 
b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
index 4fb7ef2a7..cabbf008d 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
@@ -25,9 +25,9 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import org.apache.celeborn.client.{ShuffleCommittedInfo, WorkerStatusTracker}
 import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
 import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, 
ShuffleFailedWorkers}
-import org.apache.celeborn.client.ShuffleCommittedInfo
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo, 
WorkerInfo}
@@ -51,8 +51,9 @@ class MapPartitionCommitHandler(
     appId: String,
     conf: CelebornConf,
     shuffleAllocatedWorkers: ShuffleAllocatedWorkers,
-    committedPartitionInfo: CommittedPartitionInfo)
-  extends CommitHandler(appId, conf, committedPartitionInfo)
+    committedPartitionInfo: CommittedPartitionInfo,
+    workerStatusTracker: WorkerStatusTracker)
+  extends CommitHandler(appId, conf, committedPartitionInfo, 
workerStatusTracker)
   with Logging {
 
   private val shuffleSucceedPartitionIds = JavaUtils.newConcurrentHashMap[Int, 
util.Set[Integer]]()
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
 
b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
index 56a83ca54..18214b51c 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
@@ -26,9 +26,9 @@ import scala.collection.mutable
 
 import com.google.common.cache.{Cache, CacheBuilder}
 
+import org.apache.celeborn.client.{ShuffleCommittedInfo, WorkerStatusTracker}
 import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
 import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, 
ShuffleFailedWorkers}
-import org.apache.celeborn.client.ShuffleCommittedInfo
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo, 
WorkerInfo}
@@ -50,8 +50,9 @@ class ReducePartitionCommitHandler(
     appId: String,
     conf: CelebornConf,
     shuffleAllocatedWorkers: ShuffleAllocatedWorkers,
-    committedPartitionInfo: CommittedPartitionInfo)
-  extends CommitHandler(appId, conf, committedPartitionInfo)
+    committedPartitionInfo: CommittedPartitionInfo,
+    workerStatusTracker: WorkerStatusTracker)
+  extends CommitHandler(appId, conf, committedPartitionInfo, 
workerStatusTracker)
   with Logging {
 
   private val getReducerFileGroupRequest =
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 747f9dd38..da334e859 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -659,6 +659,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def clientReserveSlotsMaxRetries: Int = get(CLIENT_RESERVE_SLOTS_MAX_RETRIES)
   def clientReserveSlotsRetryWait: Long = get(CLIENT_RESERVE_SLOTS_RETRY_WAIT)
   def clientRequestCommitFilesMaxRetries: Int = 
get(CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY)
+  def clientCommitFilesIgnoreExcludedWorkers: Boolean = 
get(CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS)
   def clientRpcMaxParallelism: Int = get(CLIENT_RPC_MAX_PARALLELISM)
   def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
   def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
@@ -2480,9 +2481,10 @@ object CelebornConf extends Logging {
       .withAlternative("celeborn.worker.excluded.expireTimeout")
       .categories("client")
       .version("0.3.0")
-      .doc("Timeout time for LifecycleManager to clear reserved excluded 
worker.")
+      .doc("Timeout time for LifecycleManager to clear reserved excluded 
worker. Default to be 1.5 * `celeborn.master.heartbeat.worker.timeout` " +
+        "to cover worker heartbeat timeout check period")
       .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("600s")
+      .createWithDefaultString("180s")
 
   val CLIENT_CHECKED_USE_ALLOCATED_WORKERS: ConfigEntry[Boolean] =
     buildConf("celeborn.client.checked.useAllocatedWorkers")
@@ -2986,6 +2988,14 @@ object CelebornConf extends Logging {
       .checkValue(v => v > 0, "value must be positive")
       .createWithDefault(2)
 
+  val CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS: ConfigEntry[Boolean] =
+    buildConf("celeborn.client.commitFiles.ignoreExcludedWorker")
+      .categories("client")
+      .version("0.3.0")
+      .doc("When true, LifecycleManager will skip workers which are in the 
excluded list.")
+      .booleanConf
+      .createWithDefault(false)
+
   val CLIENT_PUSH_STAGE_END_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.client.push.stageEnd.timeout")
       .withAlternative("celeborn.push.stageEnd.timeout")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 641585a54..6ddae63ff 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -22,7 +22,8 @@ license: |
 | celeborn.client.application.heartbeatInterval | 10s | Interval for client to 
send heartbeat message to master. | 0.3.0 | 
 | celeborn.client.blacklistSlave.enabled | true | When true, Celeborn will add 
partition's peer worker into blacklist when push data to slave failed. | 0.3.0 
| 
 | celeborn.client.closeIdleConnections | true | Whether client will close idle 
connections. | 0.3.0 | 
-| celeborn.client.excludedWorker.expireTimeout | 600s | Timeout time for 
LifecycleManager to clear reserved excluded worker. | 0.3.0 | 
+| celeborn.client.commitFiles.ignoreExcludedWorker | false | When true, 
LifecycleManager will skip workers which are in the excluded list. | 0.3.0 | 
+| celeborn.client.excludedWorker.expireTimeout | 180s | Timeout time for 
LifecycleManager to clear reserved excluded worker. Default to be 1.5 * 
`celeborn.master.heartbeat.worker.timeout` to cover worker heartbeat timeout 
check period | 0.3.0 | 
 | celeborn.client.fetch.excludeWorkerOnFailure.enabled | false | Whether to 
enable shuffle client-side fetch exclude workers on failure. | 0.3.0 | 
 | celeborn.client.fetch.excludedWorker.expireTimeout | &lt;value of 
celeborn.client.excludedWorker.expireTimeout&gt; | ShuffleClient is a static 
object, it will be used in the whole lifecycle of Executor,We give a expire 
time for blacklisted worker to avoid a transient worker issues. | 0.3.0 | 
 | celeborn.client.fetch.maxReqsInFlight | 3 | Amount of in-flight chunk fetch 
request. | 0.3.0 | 

Reply via email to