This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.2
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.2 by this push:
new 5957afbf [CELEBORN-192][IMPROVEMENT] Change FAILED status to REQUEST_FAILED since it is used only when an RPC request fails. (#1139)
5957afbf is described below
commit 5957afbf5ed72832b27ccdb75d4372b1b32c902c
Author: Angerszhuuuu <[email protected]>
AuthorDate: Fri Jan 6 16:53:04 2023 +0800
[CELEBORN-192][IMPROVEMENT] Change FAILED status to REQUEST_FAILED since it is used only when an RPC request fails. (#1139)
---
.../apache/celeborn/client/ShuffleClientImpl.java | 2 +-
.../apache/celeborn/client/LifecycleManager.scala | 16 +-
.../celeborn/client/commit/CommitHandler.scala | 486 +++++++++++++++++++++
.../common/protocol/message/StatusCode.java | 2 +-
.../org/apache/celeborn/common/util/Utils.scala | 2 +-
.../celeborn/service/deploy/master/Master.scala | 2 +-
6 files changed, 498 insertions(+), 12 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 2f740d3c..e32dd1d8 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -325,7 +325,7 @@ public class ShuffleClientImpl extends ShuffleClient {
} else {
logger.error(
"LifecycleManager request slots return {}, retry again, remain
retry times {}",
- StatusCode.FAILED,
+ StatusCode.REQUEST_FAILED,
numRetries - 1);
}
} catch (Exception e) {
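
The retry loop around this log keeps re-requesting slots while the reply is REQUEST_FAILED. A minimal sketch of that loop, with illustrative names only (requestSlots and maxRetries are stand-ins, not Celeborn API):

    import org.apache.celeborn.common.protocol.message.StatusCode

    // Sketch only: keep asking for slots while the reply is REQUEST_FAILED.
    def retryRequestSlots(requestSlots: () => StatusCode, maxRetries: Int): StatusCode = {
      var remaining = maxRetries
      var status = requestSlots()
      while (status == StatusCode.REQUEST_FAILED && remaining > 1) {
        remaining -= 1
        status = requestSlots()
      }
      status
    }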
diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 322806a9..5bee87dc 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -552,9 +552,9 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
val res = requestSlotsWithRetry(applicationId, shuffleId, ids)
res.status match {
- case StatusCode.FAILED =>
+ case StatusCode.REQUEST_FAILED =>
logError(s"OfferSlots RPC request failed for $shuffleId!")
- reply(RegisterShuffleResponse(StatusCode.FAILED, Array.empty))
+ reply(RegisterShuffleResponse(StatusCode.REQUEST_FAILED, Array.empty))
return
case StatusCode.SLOT_NOT_AVAILABLE =>
logError(s"OfferSlots for $shuffleId failed!")
@@ -1570,7 +1570,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
} catch {
case e: Exception =>
logError(s"AskSync RegisterShuffle for $shuffleKey failed.", e)
- RequestSlotsResponse(StatusCode.FAILED, new WorkerResource())
+ RequestSlotsResponse(StatusCode.REQUEST_FAILED, new WorkerResource())
}
}
@@ -1585,7 +1585,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
val msg = s"Exception when askSync ReserveSlots for $shuffleKey " +
s"on worker $endpoint."
logError(msg, e)
- ReserveSlotsResponse(StatusCode.FAILED, msg + s" ${e.getMessage}")
+ ReserveSlotsResponse(StatusCode.REQUEST_FAILED, msg + s" ${e.getMessage}")
}
}
@@ -1595,7 +1595,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
} catch {
case e: Exception =>
logError(s"AskSync Destroy for ${message.shuffleKey} failed.", e)
- DestroyResponse(StatusCode.FAILED, message.masterLocations, message.slaveLocations)
+ DestroyResponse(StatusCode.REQUEST_FAILED, message.masterLocations, message.slaveLocations)
}
}
@@ -1638,7 +1638,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
} catch {
case e: Exception =>
logError(s"AskSync ReleaseSlots for ${message.shuffleId} failed.", e)
- ReleaseSlotsResponse(StatusCode.FAILED)
+ ReleaseSlotsResponse(StatusCode.REQUEST_FAILED)
}
}
@@ -1652,7 +1652,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
} catch {
case e: Exception =>
logError(s"AskSync UnregisterShuffle for ${message.getShuffleId}
failed.", e)
- UnregisterShuffleResponse(StatusCode.FAILED)
+ UnregisterShuffleResponse(StatusCode.REQUEST_FAILED)
}
}
@@ -1664,7 +1664,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
} catch {
case e: Exception =>
logError(s"AskSync GetBlacklist failed.", e)
- GetBlacklistResponse(StatusCode.FAILED, List.empty.asJava, List.empty.asJava)
+ GetBlacklistResponse(StatusCode.REQUEST_FAILED, List.empty.asJava, List.empty.asJava)
}
}
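
Aside from the first hunk, the LifecycleManager changes above share one shape: a synchronous ask wrapped in try/catch whose failure path now replies with StatusCode.REQUEST_FAILED rather than the old catch-all FAILED. A minimal sketch of that shape, assuming a hypothetical helper (askOrRequestFailed is not Celeborn API):

    import scala.util.{Failure, Success, Try}
    import org.apache.celeborn.common.protocol.message.StatusCode

    // Run a synchronous ask; on any exception build a REQUEST_FAILED response.
    def askOrRequestFailed[T](call: => T)(fallback: StatusCode => T): T =
      Try(call) match {
        case Success(response) => response
        case Failure(_) => fallback(StatusCode.REQUEST_FAILED)
      }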
diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
new file mode 100644
index 00000000..e9ace39c
--- /dev/null
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.commit
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.concurrent.{Callable, ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.atomic.{AtomicLong, LongAdder}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import com.google.common.cache.{Cache, CacheBuilder}
+
+import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
+import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, ShuffleFailedWorkers, ShuffleFileGroups}
+import org.apache.celeborn.client.ShuffleCommittedInfo
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.{PartitionLocationInfo, WorkerInfo}
+import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionType}
+import org.apache.celeborn.common.protocol.message.ControlMessages.{CommitFiles, CommitFilesResponse, GetReducerFileGroupResponse}
+import org.apache.celeborn.common.protocol.message.StatusCode
+import org.apache.celeborn.common.rpc.{RpcCallContext, RpcEndpointRef}
+import org.apache.celeborn.common.rpc.netty.{LocalNettyRpcCallContext, RemoteNettyRpcCallContext}
+import org.apache.celeborn.common.util.{ThreadUtils, Utils}
+// Can remove this if Celeborn drops Scala 2.11 support in the future
+import org.apache.celeborn.common.util.FunctionConverter._
+
+case class CommitResult(
+ masterPartitionLocationMap: ConcurrentHashMap[String, PartitionLocation],
+ slavePartitionLocationMap: ConcurrentHashMap[String, PartitionLocation],
+ commitFilesFailedWorkers: ShuffleFailedWorkers)
+
+abstract class CommitHandler(
+ appId: String,
+ conf: CelebornConf,
+ allocatedWorkers: ShuffleAllocatedWorkers,
+ committedPartitionInfo: CommittedPartitionInfo) extends Logging {
+
+ private val pushReplicateEnabled = conf.pushReplicateEnabled
+ private val testRetryCommitFiles = conf.testRetryCommitFiles
+ private val rpcCacheSize = conf.rpcCacheSize
+ private val rpcCacheConcurrencyLevel = conf.rpcCacheConcurrencyLevel
+ private val rpcCacheExpireTime = conf.rpcCacheExpireTime
+
+ private val commitEpoch = new AtomicLong()
+ private val totalWritten = new LongAdder
+ private val fileCount = new LongAdder
+ private val reducerFileGroupsMap = new ShuffleFileGroups
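+ // Cache the serialized GetReducerFileGroupResponse per shuffle id so that
+ // concurrent reducers asking for the same file group reuse one ByteBuffer
+ // instead of re-serializing the response for every RPC (see
+ // handleGetReducerFileGroup below).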
+ // noinspection UnstableApiUsage
+ private val getReducerFileGroupRpcCache: Cache[Int, ByteBuffer] = CacheBuilder.newBuilder()
+ .concurrencyLevel(rpcCacheConcurrencyLevel)
+ .expireAfterWrite(rpcCacheExpireTime, TimeUnit.MILLISECONDS)
+ .maximumSize(rpcCacheSize)
+ .build().asInstanceOf[Cache[Int, ByteBuffer]]
+
+ def getPartitionType(): PartitionType
+
+ def isStageEnd(shuffleId: Int): Boolean = false
+
+ def isStageEndOrInProcess(shuffleId: Int): Boolean = false
+
+ def isStageDataLost(shuffleId: Int): Boolean = false
+
+ def setStageEnd(shuffleId: Int): Unit
+
+ /**
+ * return (whether waiting for stage end timed out, wait time)
+ */
+ def waitStageEnd(shuffleId: Int): (Boolean, Long) = (true, 0)
+
+ def isPartitionInProcess(shuffleId: Int, partitionId: Int): Boolean = false
+
+ def batchUnHandledRequests(shuffleId: Int, shuffleCommittedInfo: ShuffleCommittedInfo)
+ : Map[WorkerInfo, collection.Set[PartitionLocation]] = {
+ // If handleStageEnd acquired the lock first and already ran commitFiles,
+ // then by the time this batch takes the lock, commitPartitionRequests may
+ // contain partitions that the stage-end process has already committed.
+ // In that case inProcessStageEndShuffleSet should contain this shuffle id,
+ // so we can directly return empty.
+ if (this.isStageEndOrInProcess(shuffleId)) {
+ logWarning(s"Shuffle $shuffleId ended or during processing stage end.")
+ shuffleCommittedInfo.unHandledPartitionLocations.clear()
+ Map.empty[WorkerInfo, Set[PartitionLocation]]
+ } else {
+ val currentBatch = this.getUnHandledPartitionLocations(shuffleId, shuffleCommittedInfo)
+ shuffleCommittedInfo.unHandledPartitionLocations.clear()
+ currentBatch.foreach { partitionLocation =>
+ shuffleCommittedInfo.handledPartitionLocations.add(partitionLocation)
+ if (partitionLocation.getPeer != null) {
+ shuffleCommittedInfo.handledPartitionLocations.add(partitionLocation.getPeer)
+ }
+ }
+
+ if (currentBatch.nonEmpty) {
+ logWarning(s"Commit current batch HARD_SPLIT partitions for
$shuffleId: " +
+ s"${currentBatch.map(_.getUniqueId).mkString("[", ",", "]")}")
+ val workerToRequests = currentBatch.flatMap { partitionLocation =>
+ if (partitionLocation.getPeer != null) {
+ Seq(partitionLocation, partitionLocation.getPeer)
+ } else {
+ Seq(partitionLocation)
+ }
+ }.groupBy(_.getWorker)
+ workerToRequests
+ } else {
+ Map.empty[WorkerInfo, Set[PartitionLocation]]
+ }
+ }
+ }
+
+ protected def getUnHandledPartitionLocations(
+ shuffleId: Int,
+ shuffleCommittedInfo: ShuffleCommittedInfo): mutable.Set[PartitionLocation]
+
+ def incrementInFlightNum(
+ shuffleCommittedInfo: ShuffleCommittedInfo,
+ workerToRequests: Map[WorkerInfo, collection.Set[PartitionLocation]]): Unit = {
+ shuffleCommittedInfo.allInFlightCommitRequestNum.addAndGet(workerToRequests.size)
+ }
+
+ def decrementInFlightNum(
+ shuffleCommittedInfo: ShuffleCommittedInfo,
+ workerToRequests: Map[WorkerInfo, collection.Set[PartitionLocation]]): Unit = {
+ shuffleCommittedInfo.allInFlightCommitRequestNum.addAndGet(-workerToRequests.size)
+ }
+
+ /**
+ * When tryFinalCommit is called, it returns true if no final commit has ever
+ * been performed before; otherwise it returns false.
+ *
+ * @return
+ */
+ def tryFinalCommit(
+ shuffleId: Int,
+ recordWorkerFailure: ShuffleFailedWorkers => Unit): Boolean
+
+ def handleGetReducerFileGroup(context: RpcCallContext, shuffleId: Int): Unit = {
+ if (isStageDataLost(shuffleId)) {
+ context.reply(
+ GetReducerFileGroupResponse(
+ StatusCode.SHUFFLE_DATA_LOST,
+ new ConcurrentHashMap(),
+ Array.empty))
+ } else {
+ if (context.isInstanceOf[LocalNettyRpcCallContext]) {
+ // This branch is for the UTs
+ context.reply(GetReducerFileGroupResponse(
+ StatusCode.SUCCESS,
+ reducerFileGroupsMap.getOrDefault(shuffleId, new ConcurrentHashMap()),
+ getMapperAttempts(shuffleId)))
+ } else {
+ val cachedMsg = getReducerFileGroupRpcCache.get(
+ shuffleId,
+ new Callable[ByteBuffer]() {
+ override def call(): ByteBuffer = {
+ val returnedMsg = GetReducerFileGroupResponse(
+ StatusCode.SUCCESS,
+ reducerFileGroupsMap.getOrDefault(shuffleId, new ConcurrentHashMap()),
+ getMapperAttempts(shuffleId))
+ context.asInstanceOf[RemoteNettyRpcCallContext].nettyEnv.serialize(returnedMsg)
+ }
+ })
+ context.asInstanceOf[RemoteNettyRpcCallContext].callback.onSuccess(cachedMsg)
+ }
+ }
+ }
+
+ def removeExpiredShuffle(shuffleId: Int): Unit = {
+ reducerFileGroupsMap.remove(shuffleId)
+ }
+
+ /**
+ * For a reduce partition, replies true if the shuffle is registered and the
+ * corresponding mapper has finished.
+ * For a map partition, always returns false, since one finished mapper attempt
+ * does not mean the mapper has ended.
+ */
+ def isMapperEnded(shuffleId: Int, mapId: Int): Boolean = false
+
+ def getMapperAttempts(shuffleId: Int): Array[Int]
+
+ /**
+ * return (whether this mapper attempt finished successfully, whether all mappers finished)
+ */
+ def finishMapperAttempt(
+ shuffleId: Int,
+ mapId: Int,
+ attemptId: Int,
+ numMappers: Int,
+ partitionId: Int,
+ recordWorkerFailure: ShuffleFailedWorkers => Unit): (Boolean, Boolean)
+
+ def registerShuffle(shuffleId: Int, numMappers: Int): Unit = {
+ reducerFileGroupsMap.put(shuffleId, new ConcurrentHashMap())
+ }
+
+ def parallelCommitFiles(
+ shuffleId: Int,
+ allocatedWorkers: util.Map[WorkerInfo, PartitionLocationInfo],
+ partitionIdOpt: Option[Int] = None): CommitResult = {
+ val shuffleCommittedInfo = committedPartitionInfo.get(shuffleId)
+ val masterPartMap = new ConcurrentHashMap[String, PartitionLocation]
+ val slavePartMap = new ConcurrentHashMap[String, PartitionLocation]
+ val commitFilesFailedWorkers = new ShuffleFailedWorkers()
+ val commitFileStartTime = System.nanoTime()
+ val parallelism = Math.min(allocatedWorkers.size(), conf.rpcMaxParallelism)
+ ThreadUtils.parmap(
+ allocatedWorkers.asScala.to,
+ "CommitFiles",
+ parallelism) { case (worker, partitionLocationInfo) =>
+ if (partitionLocationInfo.containsShuffle(shuffleId.toString)) {
+ val masterParts =
+ partitionLocationInfo.getMasterLocations(shuffleId.toString, partitionIdOpt)
+ val slaveParts = partitionLocationInfo.getSlaveLocations(shuffleId.toString, partitionIdOpt)
+ masterParts.asScala.foreach { p =>
+ val partition = new PartitionLocation(p)
+ partition.setFetchPort(worker.fetchPort)
+ partition.setPeer(null)
+ masterPartMap.put(partition.getUniqueId, partition)
+ }
+ slaveParts.asScala.foreach { p =>
+ val partition = new PartitionLocation(p)
+ partition.setFetchPort(worker.fetchPort)
+ partition.setPeer(null)
+ slavePartMap.put(partition.getUniqueId, partition)
+ }
+
+ val (masterIds, slaveIds) = shuffleCommittedInfo.synchronized {
+ (
+ masterParts.asScala
+ .filterNot(shuffleCommittedInfo.handledPartitionLocations.contains)
+ .map(_.getUniqueId).asJava,
+ slaveParts.asScala
+ .filterNot(shuffleCommittedInfo.handledPartitionLocations.contains)
+ .map(_.getUniqueId).asJava)
+ }
+
+ commitFiles(
+ appId,
+ shuffleId,
+ shuffleCommittedInfo,
+ worker,
+ masterIds,
+ slaveIds,
+ commitFilesFailedWorkers)
+ }
+ }
+
+ logInfo(s"Shuffle $shuffleId " +
+ s"commit files complete. File count
${shuffleCommittedInfo.currentShuffleFileCount.sum()} " +
+ s"using ${(System.nanoTime() - commitFileStartTime) / 1000000} ms")
+
+ CommitResult(masterPartMap, slavePartMap, commitFilesFailedWorkers)
+ }
+
+ def commitFiles(
+ applicationId: String,
+ shuffleId: Int,
+ shuffleCommittedInfo: ShuffleCommittedInfo,
+ worker: WorkerInfo,
+ masterIds: util.List[String],
+ slaveIds: util.List[String],
+ commitFilesFailedWorkers: ShuffleFailedWorkers): Unit = {
+
+ val res =
+ if (!testRetryCommitFiles) {
+ val commitFiles = CommitFiles(
+ applicationId,
+ shuffleId,
+ masterIds,
+ slaveIds,
+ getMapperAttempts(shuffleId),
+ commitEpoch.incrementAndGet())
+ val res = requestCommitFilesWithRetry(worker.endpoint, commitFiles)
+
+ res.status match {
+ case StatusCode.SUCCESS => // do nothing
+ case StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.REQUEST_FAILED =>
+ logDebug(s"Request $commitFiles return ${res.status} for " +
+ s"${Utils.makeShuffleKey(applicationId, shuffleId)}")
+ commitFilesFailedWorkers.put(worker, (res.status, System.currentTimeMillis()))
+ case _ => // won't happen
+ }
+ res
+ } else {
+ // for test
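+ // Split masterIds/slaveIds in half and send two CommitFiles RPCs so the
+ // retry path is exercised; the two responses are merged into one below.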
+ val commitFiles1 = CommitFiles(
+ applicationId,
+ shuffleId,
+ masterIds.subList(0, masterIds.size() / 2),
+ slaveIds.subList(0, slaveIds.size() / 2),
+ getMapperAttempts(shuffleId),
+ commitEpoch.incrementAndGet())
+ val res1 = requestCommitFilesWithRetry(worker.endpoint, commitFiles1)
+
+ val commitFiles = CommitFiles(
+ applicationId,
+ shuffleId,
+ masterIds.subList(masterIds.size() / 2, masterIds.size()),
+ slaveIds.subList(slaveIds.size() / 2, slaveIds.size()),
+ getMapperAttempts(shuffleId),
+ commitEpoch.incrementAndGet())
+ val res2 = requestCommitFilesWithRetry(worker.endpoint, commitFiles)
+
+ res1.committedMasterStorageInfos.putAll(res2.committedMasterStorageInfos)
+ res1.committedSlaveStorageInfos.putAll(res2.committedSlaveStorageInfos)
+ res1.committedMapIdBitMap.putAll(res2.committedMapIdBitMap)
+ CommitFilesResponse(
+ status = if (res1.status == StatusCode.SUCCESS) res2.status else res1.status,
+ (res1.committedMasterIds.asScala ++ res2.committedMasterIds.asScala).toList.asJava,
+ (res1.committedSlaveIds.asScala ++ res2.committedSlaveIds.asScala).toList.asJava,
+ (res1.failedMasterIds.asScala ++ res2.failedMasterIds.asScala).toList.asJava,
+ (res1.failedSlaveIds.asScala ++ res2.failedSlaveIds.asScala).toList.asJava,
+ res1.committedMasterStorageInfos,
+ res1.committedSlaveStorageInfos,
+ res1.committedMapIdBitMap,
+ res1.totalWritten + res2.totalWritten,
+ res1.fileCount + res2.fileCount)
+ }
+
+ shuffleCommittedInfo.synchronized {
+ // record committed partitionIds
+ res.committedMasterIds.asScala.foreach({
+ case commitMasterId =>
+ val partitionUniqueIdList = shuffleCommittedInfo.committedMasterIds.computeIfAbsent(
+ Utils.splitPartitionLocationUniqueId(commitMasterId)._1,
+ (k: Int) => new util.ArrayList[String]())
+ partitionUniqueIdList.add(commitMasterId)
+ })
+
+ res.committedSlaveIds.asScala.foreach({
+ case commitSlaveId =>
+ val partitionUniqueIdList = shuffleCommittedInfo.committedSlaveIds.computeIfAbsent(
+ Utils.splitPartitionLocationUniqueId(commitSlaveId)._1,
+ (k: Int) => new util.ArrayList[String]())
+ partitionUniqueIdList.add(commitSlaveId)
+ })
+
+ // record committed partitions storage hint and disk hint
+ shuffleCommittedInfo.committedMasterStorageInfos.putAll(res.committedMasterStorageInfos)
+ shuffleCommittedInfo.committedSlaveStorageInfos.putAll(res.committedSlaveStorageInfos)
+
+ // record failed partitions
+ shuffleCommittedInfo.failedMasterPartitionIds.putAll(
+ res.failedMasterIds.asScala.map((_, worker)).toMap.asJava)
+ shuffleCommittedInfo.failedSlavePartitionIds.putAll(
+ res.failedSlaveIds.asScala.map((_, worker)).toMap.asJava)
+
+ shuffleCommittedInfo.committedMapIdBitmap.putAll(res.committedMapIdBitMap)
+
+ totalWritten.add(res.totalWritten)
+ fileCount.add(res.fileCount)
+ shuffleCommittedInfo.currentShuffleFileCount.add(res.fileCount)
+ }
+ }
+
+ def collectResult(
+ shuffleId: Int,
+ shuffleCommittedInfo: ShuffleCommittedInfo,
+ masterPartitionUniqueIds: util.Iterator[String],
+ slavePartitionUniqueIds: util.Iterator[String],
+ masterPartMap: ConcurrentHashMap[String, PartitionLocation],
+ slavePartMap: ConcurrentHashMap[String, PartitionLocation]): Unit = {
+ val committedPartitions = new util.HashMap[String, PartitionLocation]
+ masterPartitionUniqueIds.asScala.foreach { id =>
+ masterPartMap.get(id).setStorageInfo(
+ shuffleCommittedInfo.committedMasterStorageInfos.get(id))
+ masterPartMap.get(id).setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id))
+ committedPartitions.put(id, masterPartMap.get(id))
+ }
+
+ slavePartitionUniqueIds.asScala.foreach { id =>
+ val slavePartition = slavePartMap.get(id)
+ slavePartition.setStorageInfo(shuffleCommittedInfo.committedSlaveStorageInfos.get(id))
+ val masterPartition = committedPartitions.get(id)
+ if (masterPartition ne null) {
+ masterPartition.setPeer(slavePartition)
+ slavePartition.setPeer(masterPartition)
+ } else {
+ logInfo(s"Shuffle $shuffleId partition $id: master lost, " +
+ s"use slave $slavePartition.")
+ slavePartition.setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id))
+ committedPartitions.put(id, slavePartition)
+ }
+ }
+
+ committedPartitions.values().asScala.foreach { partition =>
+ val partitionLocations = reducerFileGroupsMap.get(shuffleId).computeIfAbsent(
+ partition.getId,
+ (k: Integer) => new util.HashSet[PartitionLocation]())
+ partitionLocations.add(partition)
+ }
+ }
+
+ private def requestCommitFilesWithRetry(
+ endpoint: RpcEndpointRef,
+ message: CommitFiles): CommitFilesResponse = {
+ val maxRetries = conf.requestCommitFilesMaxRetries
+ var retryTimes = 0
+ while (retryTimes < maxRetries) {
+ try {
+ if (testRetryCommitFiles && retryTimes < maxRetries - 1) {
+ endpoint.ask[CommitFilesResponse](message)
+ Thread.sleep(1000)
+ throw new Exception("Mock fail for CommitFiles")
+ } else {
+ return endpoint.askSync[CommitFilesResponse](message)
+ }
+ } catch {
+ case e: Throwable =>
+ retryTimes += 1
+ logError(
+ s"AskSync CommitFiles for ${message.shuffleId} failed (attempt
$retryTimes/$maxRetries).",
+ e)
+ }
+ }
+
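+ // All retries failed: reply REQUEST_FAILED and report every requested
+ // master/slave id as failed so the caller records this worker's failure.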
+ CommitFilesResponse(
+ StatusCode.REQUEST_FAILED,
+ List.empty.asJava,
+ List.empty.asJava,
+ message.masterIds,
+ message.slaveIds)
+ }
+
+ def checkDataLost(
+ shuffleId: Int,
+ masterPartitionUniqueIdMap: util.Map[String, WorkerInfo],
+ slavePartitionUniqueIdMap: util.Map[String, WorkerInfo]): Boolean = {
+ val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
+ if (!pushReplicateEnabled && masterPartitionUniqueIdMap.size() != 0) {
+ val msg =
+ masterPartitionUniqueIdMap.asScala.map {
+ case (partitionUniqueId, workerInfo) =>
+ s"Lost partition $partitionUniqueId in worker
[${workerInfo.readableAddress()}]"
+ }.mkString("\n")
+ logError(
+ s"""
+ |For shuffle $shuffleKey partition data lost:
+ |$msg
+ |""".stripMargin)
+ true
+ } else {
+ val failedBothPartitionIdsToWorker = masterPartitionUniqueIdMap.asScala.flatMap {
+ case (partitionUniqueId, worker) =>
+ if (slavePartitionUniqueIdMap.asScala.contains(partitionUniqueId)) {
+ Some(partitionUniqueId -> (worker, slavePartitionUniqueIdMap.get(partitionUniqueId)))
+ } else {
+ None
+ }
+ }
+ if (failedBothPartitionIdsToWorker.nonEmpty) {
+ val msg = failedBothPartitionIdsToWorker.map {
+ case (partitionUniqueId, (masterWorker, slaveWorker)) =>
+ s"Lost partition $partitionUniqueId " +
+ s"in master worker [${masterWorker.readableAddress()}] and slave
worker [$slaveWorker]"
+ }.mkString("\n")
+ logError(
+ s"""
+ |For shuffle $shuffleKey partition data lost:
+ |$msg
+ |""".stripMargin)
+ true
+ } else {
+ false
+ }
+ }
+ }
+
+ def commitMetrics(): (Long, Long) = (totalWritten.sumThenReset(), fileCount.sumThenReset())
+}
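
The getReducerFileGroupRpcCache above is plain Guava caching: Cache.get(key, loader) serializes the response once per shuffle id and returns the cached ByteBuffer to later callers. A self-contained sketch of the same technique, with illustrative capacity and expiry values (Celeborn reads its own from CelebornConf):

    import java.nio.ByteBuffer
    import java.util.concurrent.{Callable, TimeUnit}
    import com.google.common.cache.{Cache, CacheBuilder}

    // Illustrative values only, not Celeborn's configured ones.
    val rpcCache: Cache[Int, ByteBuffer] = CacheBuilder.newBuilder()
      .concurrencyLevel(4)
      .expireAfterWrite(5000, TimeUnit.MILLISECONDS)
      .maximumSize(256)
      .build().asInstanceOf[Cache[Int, ByteBuffer]]

    def cachedResponse(shuffleId: Int, serialize: () => ByteBuffer): ByteBuffer =
      rpcCache.get(shuffleId, new Callable[ByteBuffer] {
        override def call(): ByteBuffer = serialize() // runs once per shuffle id
      })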
diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
index 0fb46765..75068a87 100644
--- a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
+++ b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
@@ -21,7 +21,7 @@ public enum StatusCode {
// 1/0 Status
SUCCESS(0),
PARTIAL_SUCCESS(1),
- FAILED(2),
+ REQUEST_FAILED(2),
// Specific Status
SHUFFLE_ALREADY_REGISTERED(3),
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 09b68947..ac6d5f05 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -837,7 +837,7 @@ object Utils extends Logging {
case 1 =>
StatusCode.PARTIAL_SUCCESS
case 2 =>
- StatusCode.FAILED
+ StatusCode.REQUEST_FAILED
case 3 =>
StatusCode.SHUFFLE_ALREADY_REGISTERED
case 4 =>
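
This match is the wire-decoding side of the rename: value 2 now decodes to REQUEST_FAILED, so the numeric protocol is unchanged. A self-contained sketch of that decoding, restricted to the values visible in this commit:

    import org.apache.celeborn.common.protocol.message.StatusCode

    // Sketch of the int-to-enum mapping; values 0-3 are taken from the
    // StatusCode and Utils hunks above, the remaining cases are elided.
    def decodeStatus(value: Int): StatusCode = value match {
      case 0 => StatusCode.SUCCESS
      case 1 => StatusCode.PARTIAL_SUCCESS
      case 2 => StatusCode.REQUEST_FAILED
      case 3 => StatusCode.SHUFFLE_ALREADY_REGISTERED
      case other => throw new IllegalArgumentException(s"Unknown StatusCode value: $other")
    }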
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index a930c98f..ba30c934 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -717,7 +717,7 @@ private[celeborn] class Master(
new util.HashMap[String, DiskInfo](),
new ConcurrentHashMap[UserIdentifier, ResourceConsumption](),
null))
- GetWorkerInfosResponse(StatusCode.FAILED, result.asScala: _*)
+ GetWorkerInfosResponse(StatusCode.REQUEST_FAILED, result.asScala: _*)
}
}