This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new dfcfb4935 [CELEBORN-669] Avoid commit files on excluded worker list
dfcfb4935 is described below
commit dfcfb49352c7ca200bbde4d14fe61383dc51ec30
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Tue Jun 13 22:31:02 2023 +0800
[CELEBORN-669] Avoid commit files on excluded worker list
### What changes were proposed in this pull request?
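When `celeborn.client.commitFiles.ignoreExcludedWorker` is enabled (default: false),
CommitHandler checks whether the target worker is in WorkerStatusTracker's excluded
list and, if so, skips sending the CommitFiles request to that worker.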
### Why are the changes needed?
To avoid sending unnecessary CommitFiles requests to workers that are already excluded.
### Does this PR introduce _any_ user-facing change?
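Yes. This PR adds the client config `celeborn.client.commitFiles.ignoreExcludedWorker` (default `false`) and changes the default of `celeborn.client.excludedWorker.expireTimeout` from 600s to 180s.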
### How was this patch tested?
Manual test.
Closes #1581 from waitinfuture/669.
Lead-authored-by: zky.zhoukeyong <[email protected]>
Co-authored-by: Angerszhuuuu <[email protected]>
Co-authored-by: Keyong Zhou <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit 47cded835f68254ea95fa9482b27f302b9ddf4af)
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/client/CommitManager.scala | 6 +++--
.../celeborn/client/commit/CommitHandler.scala | 29 ++++++++++++++++------
.../client/commit/MapPartitionCommitHandler.scala | 7 +++---
.../commit/ReducePartitionCommitHandler.scala | 7 +++---
.../org/apache/celeborn/common/CelebornConf.scala | 14 +++++++++--
docs/configuration/client.md | 3 ++-
6 files changed, 48 insertions(+), 18 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
index 0deda78c7..6925c11bb 100644
--- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
@@ -276,12 +276,14 @@ class CommitManager(appId: String, val conf:
CelebornConf, lifecycleManager: Lif
appId,
conf,
lifecycleManager.shuffleAllocatedWorkers,
- committedPartitionInfo)
+ committedPartitionInfo,
+ lifecycleManager.workerStatusTracker)
case PartitionType.MAP => new MapPartitionCommitHandler(
appId,
conf,
lifecycleManager.shuffleAllocatedWorkers,
- committedPartitionInfo)
+ committedPartitionInfo,
+ lifecycleManager.workerStatusTracker)
case _ => throw new UnsupportedOperationException(
s"Unexpected ShufflePartitionType for CommitManager:
$partitionType")
}
diff --git
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index 20c29eb32..a42c8b2af 100644
---
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -24,9 +24,9 @@ import java.util.concurrent.atomic.{AtomicLong, LongAdder}
import scala.collection.JavaConverters._
import scala.collection.mutable
+import org.apache.celeborn.client.{ShuffleCommittedInfo, WorkerStatusTracker}
import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers,
ShuffleFailedWorkers, ShuffleFileGroups}
-import org.apache.celeborn.client.ShuffleCommittedInfo
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo,
WorkerInfo}
@@ -46,7 +46,8 @@ case class CommitResult(
abstract class CommitHandler(
appId: String,
conf: CelebornConf,
- committedPartitionInfo: CommittedPartitionInfo) extends Logging {
+ committedPartitionInfo: CommittedPartitionInfo,
+ workerStatusTracker: WorkerStatusTracker) extends Logging {
private val pushReplicateEnabled = conf.clientPushReplicateEnabled
private val testRetryCommitFiles = conf.testRetryCommitFiles
@@ -265,15 +266,29 @@ abstract class CommitHandler(
slaveIds,
getMapperAttempts(shuffleId),
commitEpoch.incrementAndGet())
- val res = requestCommitFilesWithRetry(worker.endpoint, commitFiles)
+ val res =
+ if (conf.clientCommitFilesIgnoreExcludedWorkers &&
+ workerStatusTracker.blacklist.containsKey(worker)) {
+ CommitFilesResponse(
+ StatusCode.WORKER_IN_BLACKLIST,
+ List.empty.asJava,
+ List.empty.asJava,
+ masterIds,
+ slaveIds)
+ } else {
+ requestCommitFilesWithRetry(worker.endpoint, commitFiles)
+ }
res.status match {
case StatusCode.SUCCESS => // do nothing
- case StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED
| StatusCode.REQUEST_FAILED =>
- logDebug(s"Request $commitFiles return ${res.status} for " +
+ case StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED
| StatusCode.REQUEST_FAILED | StatusCode.WORKER_IN_BLACKLIST =>
+ logInfo(s"Request $commitFiles return ${res.status} for " +
s"${Utils.makeShuffleKey(applicationId, shuffleId)}")
- commitFilesFailedWorkers.put(worker, (res.status,
System.currentTimeMillis()))
- case _ => // won't happen
+ if (res.status != StatusCode.WORKER_IN_BLACKLIST) {
+ commitFilesFailedWorkers.put(worker, (res.status,
System.currentTimeMillis()))
+ }
+ case _ =>
+ logError(s"Should never reach here! commit files response status
${res.status}")
}
res
} else {
diff --git
a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
index 4fb7ef2a7..cabbf008d 100644
---
a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
@@ -25,9 +25,9 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
import scala.collection.mutable
+import org.apache.celeborn.client.{ShuffleCommittedInfo, WorkerStatusTracker}
import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers,
ShuffleFailedWorkers}
-import org.apache.celeborn.client.ShuffleCommittedInfo
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo,
WorkerInfo}
@@ -51,8 +51,9 @@ class MapPartitionCommitHandler(
appId: String,
conf: CelebornConf,
shuffleAllocatedWorkers: ShuffleAllocatedWorkers,
- committedPartitionInfo: CommittedPartitionInfo)
- extends CommitHandler(appId, conf, committedPartitionInfo)
+ committedPartitionInfo: CommittedPartitionInfo,
+ workerStatusTracker: WorkerStatusTracker)
+ extends CommitHandler(appId, conf, committedPartitionInfo,
workerStatusTracker)
with Logging {
private val shuffleSucceedPartitionIds = JavaUtils.newConcurrentHashMap[Int,
util.Set[Integer]]()
diff --git
a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
index 56a83ca54..18214b51c 100644
---
a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
@@ -26,9 +26,9 @@ import scala.collection.mutable
import com.google.common.cache.{Cache, CacheBuilder}
+import org.apache.celeborn.client.{ShuffleCommittedInfo, WorkerStatusTracker}
import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers,
ShuffleFailedWorkers}
-import org.apache.celeborn.client.ShuffleCommittedInfo
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo,
WorkerInfo}
@@ -50,8 +50,9 @@ class ReducePartitionCommitHandler(
appId: String,
conf: CelebornConf,
shuffleAllocatedWorkers: ShuffleAllocatedWorkers,
- committedPartitionInfo: CommittedPartitionInfo)
- extends CommitHandler(appId, conf, committedPartitionInfo)
+ committedPartitionInfo: CommittedPartitionInfo,
+ workerStatusTracker: WorkerStatusTracker)
+ extends CommitHandler(appId, conf, committedPartitionInfo,
workerStatusTracker)
with Logging {
private val getReducerFileGroupRequest =
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 747f9dd38..da334e859 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -659,6 +659,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def clientReserveSlotsMaxRetries: Int = get(CLIENT_RESERVE_SLOTS_MAX_RETRIES)
def clientReserveSlotsRetryWait: Long = get(CLIENT_RESERVE_SLOTS_RETRY_WAIT)
def clientRequestCommitFilesMaxRetries: Int =
get(CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY)
+ def clientCommitFilesIgnoreExcludedWorkers: Boolean =
get(CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS)
def clientRpcMaxParallelism: Int = get(CLIENT_RPC_MAX_PARALLELISM)
def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
@@ -2480,9 +2481,10 @@ object CelebornConf extends Logging {
.withAlternative("celeborn.worker.excluded.expireTimeout")
.categories("client")
.version("0.3.0")
- .doc("Timeout time for LifecycleManager to clear reserved excluded
worker.")
+ .doc("Timeout time for LifecycleManager to clear reserved excluded
worker. Default to be 1.5 * `celeborn.master.heartbeat.worker.timeout`" +
+ "to cover worker heartbeat timeout check period")
.timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString("600s")
+ .createWithDefaultString("180s")
val CLIENT_CHECKED_USE_ALLOCATED_WORKERS: ConfigEntry[Boolean] =
buildConf("celeborn.client.checked.useAllocatedWorkers")
@@ -2986,6 +2988,14 @@ object CelebornConf extends Logging {
.checkValue(v => v > 0, "value must be positive")
.createWithDefault(2)
+ val CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.commitFiles.ignoreExcludedWorker")
+ .categories("client")
+ .version("0.3.0")
+ .doc("When true, LifecycleManager will skip workers which are in the
excluded list.")
+ .booleanConf
+ .createWithDefault(false)
+
val CLIENT_PUSH_STAGE_END_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.push.stageEnd.timeout")
.withAlternative("celeborn.push.stageEnd.timeout")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 641585a54..6ddae63ff 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -22,7 +22,8 @@ license: |
| celeborn.client.application.heartbeatInterval | 10s | Interval for client to
send heartbeat message to master. | 0.3.0 |
| celeborn.client.blacklistSlave.enabled | true | When true, Celeborn will add
partition's peer worker into blacklist when push data to slave failed. | 0.3.0
|
| celeborn.client.closeIdleConnections | true | Whether client will close idle
connections. | 0.3.0 |
-| celeborn.client.excludedWorker.expireTimeout | 600s | Timeout time for
LifecycleManager to clear reserved excluded worker. | 0.3.0 |
+| celeborn.client.commitFiles.ignoreExcludedWorker | false | When true,
LifecycleManager will skip workers which are in the excluded list. | 0.3.0 |
+| celeborn.client.excludedWorker.expireTimeout | 180s | Timeout time for
LifecycleManager to clear reserved excluded worker. Default to be 1.5 *
`celeborn.master.heartbeat.worker.timeout`to cover worker heartbeat timeout
check period | 0.3.0 |
| celeborn.client.fetch.excludeWorkerOnFailure.enabled | false | Whether to
enable shuffle client-side fetch exclude workers on failure. | 0.3.0 |
| celeborn.client.fetch.excludedWorker.expireTimeout | <value of
celeborn.client.excludedWorker.expireTimeout> | ShuffleClient is a static
object, it will be used in the whole lifecycle of Executor,We give a expire
time for blacklisted worker to avoid a transient worker issues. | 0.3.0 |
| celeborn.client.fetch.maxReqsInFlight | 3 | Amount of in-flight chunk fetch
request. | 0.3.0 |