This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new de436f715 [CELEBORN-1783] Fix Pending task in commitThreadPool won't be
canceled
de436f715 is described below
commit de436f7151ee4859e4ab62efa41aae3122086506
Author: zhengtao <[email protected]>
AuthorDate: Thu Dec 19 14:24:36 2024 +0800
[CELEBORN-1783] Fix Pending task in commitThreadPool won't be canceled
1. Cancel all commit file jobs when handleCommitFiles times out.
2. Fix that timed-out commit jobs won't be set to `CommitInfo.COMMIT_FINISHED`:
   1. Pending tasks in commitThreadPool won't be canceled.
3. Timed-out commit jobs should set `commitInfo.status`.

No.
UT test.
The `commitInfo.status` should be `COMMIT_FINISHED` when commitFile jobs
time out.
Cluster test.

Closes #3004 from zaynt4606/clb1783.
Authored-by: zhengtao <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit 67971df68fd87fb90f60b17ca70e943abb3036c7)
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 8 +--
.../client/LifecycleManagerCommitFilesSuite.scala | 74 +++++++++++++++++++++-
.../service/deploy/worker/Controller.scala | 34 ++++++----
3 files changed, 96 insertions(+), 20 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ea0147d5d..203b08c1a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1092,7 +1092,7 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
// //////////////////////////////////////////////////////
def testFetchFailure: Boolean = get(TEST_CLIENT_FETCH_FAILURE)
def testMockDestroySlotsFailure: Boolean =
get(TEST_CLIENT_MOCK_DESTROY_SLOTS_FAILURE)
- def testMockCommitFilesFailure: Boolean =
get(TEST_CLIENT_MOCK_COMMIT_FILES_FAILURE)
+ def testMockCommitFilesFailure: Boolean = get(TEST_MOCK_COMMIT_FILES_FAILURE)
def testPushPrimaryDataTimeout: Boolean =
get(TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT)
def testPushReplicaDataTimeout: Boolean =
get(TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT)
def testRetryRevive: Boolean = get(TEST_CLIENT_RETRY_REVIVE)
@@ -3181,10 +3181,10 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
- val TEST_CLIENT_MOCK_COMMIT_FILES_FAILURE: ConfigEntry[Boolean] =
- buildConf("celeborn.test.client.mockCommitFilesFailure")
+ val TEST_MOCK_COMMIT_FILES_FAILURE: ConfigEntry[Boolean] =
+ buildConf("celeborn.test.mockCommitFilesFailure")
.internal
- .categories("test", "client")
+ .categories("test", "client", "worker")
.doc("Fail commit files request for test")
.version("0.3.2")
.booleanConf
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerCommitFilesSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerCommitFilesSuite.scala
index 23452bafa..7ccf6c74a 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerCommitFilesSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerCommitFilesSuite.scala
@@ -52,7 +52,7 @@ class LifecycleManagerCommitFilesSuite extends
WithShuffleClientSuite with MiniC
test("test commit files without mocking failure") {
val shuffleId = nextShuffleId
val conf = celebornConf.clone
- conf.set(CelebornConf.TEST_CLIENT_MOCK_COMMIT_FILES_FAILURE.key, "false")
+ conf.set(CelebornConf.TEST_MOCK_COMMIT_FILES_FAILURE.key, "false")
val lifecycleManager: LifecycleManager = new LifecycleManager(APP, conf)
val ids = new util.ArrayList[Integer](10)
0 until 10 foreach {
@@ -102,7 +102,7 @@ class LifecycleManagerCommitFilesSuite extends
WithShuffleClientSuite with MiniC
test("test commit files with mocking failure") {
val shuffleId = nextShuffleId
val conf = celebornConf.clone
- conf.set(CelebornConf.TEST_CLIENT_MOCK_COMMIT_FILES_FAILURE.key, "true")
+ conf.set(CelebornConf.TEST_MOCK_COMMIT_FILES_FAILURE.key, "true")
val lifecycleManager: LifecycleManager = new LifecycleManager(APP, conf)
val ids = new util.ArrayList[Integer](10)
0 until 10 foreach {
@@ -149,6 +149,76 @@ class LifecycleManagerCommitFilesSuite extends
WithShuffleClientSuite with MiniC
lifecycleManager.stop()
}
+ test("test commit files with timeout failure") {
+ if (workerInfos.nonEmpty) {
+ shutdownMiniCluster()
+ }
+ celebornConf
+ .set(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key, "false")
+ val workerConf0 = Map(
+ s"${CelebornConf.WORKER_SHUFFLE_COMMIT_TIMEOUT.key}" -> "100",
+ s"${CelebornConf.WORKER_COMMIT_THREADS.key}" -> "1",
+ s"${CelebornConf.TEST_MOCK_COMMIT_FILES_FAILURE.key}" -> "true")
+ val (master, _) = setupMiniClusterWithRandomPorts(workerConf = workerConf0)
+ celebornConf.set(
+ CelebornConf.MASTER_ENDPOINTS.key,
+ master.conf.get(CelebornConf.MASTER_ENDPOINTS.key))
+
+ val shuffleId = nextShuffleId
+ val conf = celebornConf.clone
+ conf.set(CelebornConf.TEST_MOCK_COMMIT_FILES_FAILURE.key, "true")
+ val lifecycleManager: LifecycleManager = new LifecycleManager(APP, conf)
+ val ids = new util.ArrayList[Integer](1000)
+ 0 until 1000 foreach {
+ ids.add(_)
+ }
+ val res = lifecycleManager.requestMasterRequestSlotsWithRetry(shuffleId,
ids)
+ assert(res.status == StatusCode.SUCCESS)
+
+ lifecycleManager.setupEndpoints(
+ res.workerResource.keySet(),
+ shuffleId,
+ new ShuffleFailedWorkers())
+
+ lifecycleManager.reserveSlotsWithRetry(
+ shuffleId,
+ new util.HashSet(res.workerResource.keySet()),
+ res.workerResource,
+ updateEpoch = false)
+
+ lifecycleManager.commitManager.registerShuffle(shuffleId, 1, false)
+ 0 until 1000 foreach { partitionId =>
+ lifecycleManager.commitManager.finishMapperAttempt(shuffleId, 0, 0, 1,
partitionId)
+ }
+
+ val commitHandler =
lifecycleManager.commitManager.getCommitHandler(shuffleId)
+ val params = new ArrayBuffer[CommitFilesParam](res.workerResource.size())
+ res.workerResource.asScala.foreach { case (workerInfo, (primaryIds,
replicaIds)) =>
+ params += (CommitFilesParam(
+ workerInfo,
+ primaryIds.asScala.map(_.getUniqueId).toList.asJava,
+ replicaIds.asScala.map(_.getUniqueId).toList.asJava))
+ }
+ commitHandler.doParallelCommitFiles(
+ shuffleId,
+ lifecycleManager.commitManager.committedPartitionInfo.get(shuffleId),
+ params,
+ new ShuffleFailedWorkers)
+
+ workerInfos.keySet.foreach { worker =>
+ val commitInfoList =
+ worker.controller.shuffleCommitInfos.get(Utils.makeShuffleKey(APP,
shuffleId))
+ assert(worker.controller.commitThreadPool.getQueue.size() == 0)
+ if (commitInfoList != null) {
+ commitInfoList.values().asScala.foreach { commitInfo =>
+ assert(commitInfo.status == CommitInfo.COMMIT_FINISHED)
+ assert(commitInfo.response.status ==
StatusCode.COMMIT_FILE_EXCEPTION)
+ }
+ }
+ }
+ lifecycleManager.stop()
+ }
+
override def afterAll(): Unit = {
logInfo("all test complete , stop celeborn mini cluster")
shutdownMiniCluster()
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 4bc8599c2..0e96944d8 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.{AtomicBoolean,
AtomicIntegerArray, AtomicRef
import java.util.function.BiFunction
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
import io.netty.util.{HashedWheelTimer, Timeout, TimerTask}
import org.roaringbitmap.RoaringBitmap
@@ -61,6 +62,7 @@ private[deploy] class Controller(
val minPartitionSizeToEstimate = conf.minPartitionSizeToEstimate
var shutdown: AtomicBoolean = _
val defaultPushdataTimeout = conf.pushDataTimeoutMs
+ val mockCommitFilesFailure = conf.testMockCommitFilesFailure
def init(worker: Worker): Unit = {
storageManager = worker.storageManager
@@ -278,9 +280,9 @@ private[deploy] class Controller(
committedStorageInfos: ConcurrentHashMap[String, StorageInfo],
committedMapIdBitMap: ConcurrentHashMap[String, RoaringBitmap],
partitionSizeList: LinkedBlockingQueue[Long],
- isPrimary: Boolean = true): CompletableFuture[Void] = {
- var future: CompletableFuture[Void] = null
-
+ isPrimary: Boolean = true)
+ : (CompletableFuture[Void], ArrayBuffer[CompletableFuture[Void]]) = {
+ val tasks = ArrayBuffer[CompletableFuture[Void]]()
if (uniqueIds != null) {
uniqueIds.asScala.foreach { uniqueId =>
val task = CompletableFuture.runAsync(
@@ -320,6 +322,9 @@ private[deploy] class Controller(
} else {
emptyFileIds.add(uniqueId)
}
+ if (mockCommitFilesFailure) {
+ Thread.sleep(10)
+ }
} catch {
case e: IOException =>
logError(s"Commit file for $shuffleKey $uniqueId failed.", e)
@@ -328,16 +333,12 @@ private[deploy] class Controller(
}
},
commitThreadPool)
-
- if (future == null) {
- future = task
- } else {
- future = CompletableFuture.allOf(future, task)
- }
+ tasks.append(task)
}
}
-
- future
+ val future: CompletableFuture[Void] =
+ if (tasks.isEmpty) null else CompletableFuture.allOf(tasks.toSeq: _*)
+ (future, tasks)
}
private def waitMapPartitionRegionFinished(fileWriter: FileWriter,
waitTimeout: Long): Unit = {
@@ -467,7 +468,7 @@ private[deploy] class Controller(
val committedMapIdBitMap = JavaUtils.newConcurrentHashMap[String,
RoaringBitmap]()
val partitionSizeList = new LinkedBlockingQueue[Long]()
- val primaryFuture =
+ val (primaryFuture, primaryTasks) =
commitFiles(
shuffleKey,
primaryIds,
@@ -477,7 +478,7 @@ private[deploy] class Controller(
committedPrimaryStorageInfos,
committedMapIdBitMap,
partitionSizeList)
- val replicaFuture = commitFiles(
+ val (replicaFuture, replicaTasks) = commitFiles(
shuffleKey,
replicaIds,
committedReplicaIds,
@@ -499,6 +500,8 @@ private[deploy] class Controller(
null
}
+ val tasks = primaryTasks ++ replicaTasks
+
def reply(): Unit = {
// release slots before reply.
val releasePrimaryLocations =
@@ -582,7 +585,10 @@ private[deploy] class Controller(
new TimerTask {
override def run(timeout: Timeout): Unit = {
if (result.get() != null) {
- result.get().cancel(true)
+ future.cancel(true)
+ tasks.foreach { task =>
+ task.cancel(true)
+ }
logWarning(s"After waiting $shuffleCommitTimeout ms, cancel all
commit file jobs.")
}
}