This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.2
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.2 by this push:
     new 5957afbf [CELEBORN-192][IMPROVEMENT] Change FAILED status to 
REQUEST_FAILED since it's all used when RPC request failed. (#1139)
5957afbf is described below

commit 5957afbf5ed72832b27ccdb75d4372b1b32c902c
Author: Angerszhuuuu <[email protected]>
AuthorDate: Fri Jan 6 16:53:04 2023 +0800

    [CELEBORN-192][IMPROVEMENT] Change FAILED status to REQUEST_FAILED since 
it's all used when RPC request failed. (#1139)
---
 .../apache/celeborn/client/ShuffleClientImpl.java  |   2 +-
 .../apache/celeborn/client/LifecycleManager.scala  |  16 +-
 .../celeborn/client/commit/CommitHandler.scala     | 486 +++++++++++++++++++++
 .../common/protocol/message/StatusCode.java        |   2 +-
 .../org/apache/celeborn/common/util/Utils.scala    |   2 +-
 .../celeborn/service/deploy/master/Master.scala    |   2 +-
 6 files changed, 498 insertions(+), 12 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 2f740d3c..e32dd1d8 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -325,7 +325,7 @@ public class ShuffleClientImpl extends ShuffleClient {
         } else {
           logger.error(
               "LifecycleManager request slots return {}, retry again, remain 
retry times {}",
-              StatusCode.FAILED,
+              StatusCode.REQUEST_FAILED,
               numRetries - 1);
         }
       } catch (Exception e) {
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 322806a9..5bee87dc 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -552,9 +552,9 @@ class LifecycleManager(appId: String, val conf: 
CelebornConf) extends RpcEndpoin
     val res = requestSlotsWithRetry(applicationId, shuffleId, ids)
 
     res.status match {
-      case StatusCode.FAILED =>
+      case StatusCode.REQUEST_FAILED =>
         logError(s"OfferSlots RPC request failed for $shuffleId!")
-        reply(RegisterShuffleResponse(StatusCode.FAILED, Array.empty))
+        reply(RegisterShuffleResponse(StatusCode.REQUEST_FAILED, Array.empty))
         return
       case StatusCode.SLOT_NOT_AVAILABLE =>
         logError(s"OfferSlots for $shuffleId failed!")
@@ -1570,7 +1570,7 @@ class LifecycleManager(appId: String, val conf: 
CelebornConf) extends RpcEndpoin
     } catch {
       case e: Exception =>
         logError(s"AskSync RegisterShuffle for $shuffleKey failed.", e)
-        RequestSlotsResponse(StatusCode.FAILED, new WorkerResource())
+        RequestSlotsResponse(StatusCode.REQUEST_FAILED, new WorkerResource())
     }
   }
 
@@ -1585,7 +1585,7 @@ class LifecycleManager(appId: String, val conf: 
CelebornConf) extends RpcEndpoin
         val msg = s"Exception when askSync ReserveSlots for $shuffleKey " +
           s"on worker $endpoint."
         logError(msg, e)
-        ReserveSlotsResponse(StatusCode.FAILED, msg + s" ${e.getMessage}")
+        ReserveSlotsResponse(StatusCode.REQUEST_FAILED, msg + s" 
${e.getMessage}")
     }
   }
 
@@ -1595,7 +1595,7 @@ class LifecycleManager(appId: String, val conf: 
CelebornConf) extends RpcEndpoin
     } catch {
       case e: Exception =>
         logError(s"AskSync Destroy for ${message.shuffleKey} failed.", e)
-        DestroyResponse(StatusCode.FAILED, message.masterLocations, 
message.slaveLocations)
+        DestroyResponse(StatusCode.REQUEST_FAILED, message.masterLocations, 
message.slaveLocations)
     }
   }
 
@@ -1638,7 +1638,7 @@ class LifecycleManager(appId: String, val conf: 
CelebornConf) extends RpcEndpoin
     } catch {
       case e: Exception =>
         logError(s"AskSync ReleaseSlots for ${message.shuffleId} failed.", e)
-        ReleaseSlotsResponse(StatusCode.FAILED)
+        ReleaseSlotsResponse(StatusCode.REQUEST_FAILED)
     }
   }
 
@@ -1652,7 +1652,7 @@ class LifecycleManager(appId: String, val conf: 
CelebornConf) extends RpcEndpoin
     } catch {
       case e: Exception =>
         logError(s"AskSync UnregisterShuffle for ${message.getShuffleId} 
failed.", e)
-        UnregisterShuffleResponse(StatusCode.FAILED)
+        UnregisterShuffleResponse(StatusCode.REQUEST_FAILED)
     }
   }
 
@@ -1664,7 +1664,7 @@ class LifecycleManager(appId: String, val conf: 
CelebornConf) extends RpcEndpoin
     } catch {
       case e: Exception =>
         logError(s"AskSync GetBlacklist failed.", e)
-        GetBlacklistResponse(StatusCode.FAILED, List.empty.asJava, 
List.empty.asJava)
+        GetBlacklistResponse(StatusCode.REQUEST_FAILED, List.empty.asJava, 
List.empty.asJava)
     }
   }
 
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
new file mode 100644
index 00000000..e9ace39c
--- /dev/null
+++ 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.commit
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.concurrent.{Callable, ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.atomic.{AtomicLong, LongAdder}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import com.google.common.cache.{Cache, CacheBuilder}
+
+import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
+import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, 
ShuffleFailedWorkers, ShuffleFileGroups}
+import org.apache.celeborn.client.ShuffleCommittedInfo
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.meta.{PartitionLocationInfo, WorkerInfo}
+import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionType}
+import 
org.apache.celeborn.common.protocol.message.ControlMessages.{CommitFiles, 
CommitFilesResponse, GetReducerFileGroupResponse}
+import org.apache.celeborn.common.protocol.message.StatusCode
+import org.apache.celeborn.common.rpc.{RpcCallContext, RpcEndpointRef}
+import org.apache.celeborn.common.rpc.netty.{LocalNettyRpcCallContext, 
RemoteNettyRpcCallContext}
+import org.apache.celeborn.common.util.{ThreadUtils, Utils}
+// Can Remove this if celeborn don't support scala211 in future
+import org.apache.celeborn.common.util.FunctionConverter._
+
+case class CommitResult(
+    masterPartitionLocationMap: ConcurrentHashMap[String, PartitionLocation],
+    slavePartitionLocationMap: ConcurrentHashMap[String, PartitionLocation],
+    commitFilesFailedWorkers: ShuffleFailedWorkers)
+
+abstract class CommitHandler(
+    appId: String,
+    conf: CelebornConf,
+    allocatedWorkers: ShuffleAllocatedWorkers,
+    committedPartitionInfo: CommittedPartitionInfo) extends Logging {
+
+  private val pushReplicateEnabled = conf.pushReplicateEnabled
+  private val testRetryCommitFiles = conf.testRetryCommitFiles
+  private val rpcCacheSize = conf.rpcCacheSize
+  private val rpcCacheConcurrencyLevel = conf.rpcCacheConcurrencyLevel
+  private val rpcCacheExpireTime = conf.rpcCacheExpireTime
+
+  private val commitEpoch = new AtomicLong()
+  private val totalWritten = new LongAdder
+  private val fileCount = new LongAdder
+  private val reducerFileGroupsMap = new ShuffleFileGroups
+  // noinspection UnstableApiUsage
+  private val getReducerFileGroupRpcCache: Cache[Int, ByteBuffer] = 
CacheBuilder.newBuilder()
+    .concurrencyLevel(rpcCacheConcurrencyLevel)
+    .expireAfterWrite(rpcCacheExpireTime, TimeUnit.MILLISECONDS)
+    .maximumSize(rpcCacheSize)
+    .build().asInstanceOf[Cache[Int, ByteBuffer]]
+
+  def getPartitionType(): PartitionType
+
+  def isStageEnd(shuffleId: Int): Boolean = false
+
+  def isStageEndOrInProcess(shuffleId: Int): Boolean = false
+
+  def isStageDataLost(shuffleId: Int): Boolean = false
+
+  def setStageEnd(shuffleId: Int): Unit
+
+  /**
+   * return (waitStage isTimeOut, waitTime)
+   */
+  def waitStageEnd(shuffleId: Int): (Boolean, Long) = (true, 0)
+
+  def isPartitionInProcess(shuffleId: Int, partitionId: Int): Boolean = false
+
+  def batchUnHandledRequests(shuffleId: Int, shuffleCommittedInfo: 
ShuffleCommittedInfo)
+      : Map[WorkerInfo, collection.Set[PartitionLocation]] = {
+    // When running to here, if handleStageEnd got lock first and commitFiles,
+    // then this batch get this lock, commitPartitionRequests may contains
+    // partitions which are already committed by stageEnd process.
+    // But inProcessStageEndShuffleSet should have contain this shuffle id,
+    // can directly return empty.
+    if (this.isStageEndOrInProcess(shuffleId)) {
+      logWarning(s"Shuffle $shuffleId ended or during processing stage end.")
+      shuffleCommittedInfo.unHandledPartitionLocations.clear()
+      Map.empty[WorkerInfo, Set[PartitionLocation]]
+    } else {
+      val currentBatch = this.getUnHandledPartitionLocations(shuffleId, 
shuffleCommittedInfo)
+      shuffleCommittedInfo.unHandledPartitionLocations.clear()
+      currentBatch.foreach { partitionLocation =>
+        shuffleCommittedInfo.handledPartitionLocations.add(partitionLocation)
+        if (partitionLocation.getPeer != null) {
+          
shuffleCommittedInfo.handledPartitionLocations.add(partitionLocation.getPeer)
+        }
+      }
+
+      if (currentBatch.nonEmpty) {
+        logWarning(s"Commit current batch HARD_SPLIT partitions for 
$shuffleId: " +
+          s"${currentBatch.map(_.getUniqueId).mkString("[", ",", "]")}")
+        val workerToRequests = currentBatch.flatMap { partitionLocation =>
+          if (partitionLocation.getPeer != null) {
+            Seq(partitionLocation, partitionLocation.getPeer)
+          } else {
+            Seq(partitionLocation)
+          }
+        }.groupBy(_.getWorker)
+        workerToRequests
+      } else {
+        Map.empty[WorkerInfo, Set[PartitionLocation]]
+      }
+    }
+  }
+
+  protected def getUnHandledPartitionLocations(
+      shuffleId: Int,
+      shuffleCommittedInfo: ShuffleCommittedInfo): 
mutable.Set[PartitionLocation]
+
+  def incrementInFlightNum(
+      shuffleCommittedInfo: ShuffleCommittedInfo,
+      workerToRequests: Map[WorkerInfo, collection.Set[PartitionLocation]]): 
Unit = {
+    
shuffleCommittedInfo.allInFlightCommitRequestNum.addAndGet(workerToRequests.size)
+  }
+
+  def decrementInFlightNum(
+      shuffleCommittedInfo: ShuffleCommittedInfo,
+      workerToRequests: Map[WorkerInfo, collection.Set[PartitionLocation]]): 
Unit = {
+    
shuffleCommittedInfo.allInFlightCommitRequestNum.addAndGet(-workerToRequests.size)
+  }
+
+  /**
+   * when someone calls tryFinalCommit, the function will return true if there 
is no one ever do final commit before,
+   * otherwise it will return false.
+   *
+   * @return
+   */
+  def tryFinalCommit(
+      shuffleId: Int,
+      recordWorkerFailure: ShuffleFailedWorkers => Unit): Boolean
+
+  def handleGetReducerFileGroup(context: RpcCallContext, shuffleId: Int): Unit 
= {
+    if (isStageDataLost(shuffleId)) {
+      context.reply(
+        GetReducerFileGroupResponse(
+          StatusCode.SHUFFLE_DATA_LOST,
+          new ConcurrentHashMap(),
+          Array.empty))
+    } else {
+      if (context.isInstanceOf[LocalNettyRpcCallContext]) {
+        // This branch is for the UTs
+        context.reply(GetReducerFileGroupResponse(
+          StatusCode.SUCCESS,
+          reducerFileGroupsMap.getOrDefault(shuffleId, new 
ConcurrentHashMap()),
+          getMapperAttempts(shuffleId)))
+      } else {
+        val cachedMsg = getReducerFileGroupRpcCache.get(
+          shuffleId,
+          new Callable[ByteBuffer]() {
+            override def call(): ByteBuffer = {
+              val returnedMsg = GetReducerFileGroupResponse(
+                StatusCode.SUCCESS,
+                reducerFileGroupsMap.getOrDefault(shuffleId, new 
ConcurrentHashMap()),
+                getMapperAttempts(shuffleId))
+              
context.asInstanceOf[RemoteNettyRpcCallContext].nettyEnv.serialize(returnedMsg)
+            }
+          })
+        
context.asInstanceOf[RemoteNettyRpcCallContext].callback.onSuccess(cachedMsg)
+      }
+    }
+  }
+
+  def removeExpiredShuffle(shuffleId: Int): Unit = {
+    reducerFileGroupsMap.remove(shuffleId)
+  }
+
+  /**
+   * For reduce partition if shuffle registered and corresponding map 
finished, reply true.
+   * For map partition would always return false, as one mapper attempt 
finished don't mean mapper ended.
+   */
+  def isMapperEnded(shuffleId: Int, mapId: Int): Boolean = false
+
+  def getMapperAttempts(shuffleId: Int): Array[Int]
+
+  /**
+   * return (thisMapperAttemptedFinishedSuccessOrNot, allMapperFinishedOrNot)
+   */
+  def finishMapperAttempt(
+      shuffleId: Int,
+      mapId: Int,
+      attemptId: Int,
+      numMappers: Int,
+      partitionId: Int,
+      recordWorkerFailure: ShuffleFailedWorkers => Unit): (Boolean, Boolean)
+
+  def registerShuffle(shuffleId: Int, numMappers: Int): Unit = {
+    reducerFileGroupsMap.put(shuffleId, new ConcurrentHashMap())
+  }
+
+  def parallelCommitFiles(
+      shuffleId: Int,
+      allocatedWorkers: util.Map[WorkerInfo, PartitionLocationInfo],
+      partitionIdOpt: Option[Int] = None): CommitResult = {
+    val shuffleCommittedInfo = committedPartitionInfo.get(shuffleId)
+    val masterPartMap = new ConcurrentHashMap[String, PartitionLocation]
+    val slavePartMap = new ConcurrentHashMap[String, PartitionLocation]
+    val commitFilesFailedWorkers = new ShuffleFailedWorkers()
+    val commitFileStartTime = System.nanoTime()
+    val parallelism = Math.min(allocatedWorkers.size(), conf.rpcMaxParallelism)
+    ThreadUtils.parmap(
+      allocatedWorkers.asScala.to,
+      "CommitFiles",
+      parallelism) { case (worker, partitionLocationInfo) =>
+      if (partitionLocationInfo.containsShuffle(shuffleId.toString)) {
+        val masterParts =
+          partitionLocationInfo.getMasterLocations(shuffleId.toString, 
partitionIdOpt)
+        val slaveParts = 
partitionLocationInfo.getSlaveLocations(shuffleId.toString, partitionIdOpt)
+        masterParts.asScala.foreach { p =>
+          val partition = new PartitionLocation(p)
+          partition.setFetchPort(worker.fetchPort)
+          partition.setPeer(null)
+          masterPartMap.put(partition.getUniqueId, partition)
+        }
+        slaveParts.asScala.foreach { p =>
+          val partition = new PartitionLocation(p)
+          partition.setFetchPort(worker.fetchPort)
+          partition.setPeer(null)
+          slavePartMap.put(partition.getUniqueId, partition)
+        }
+
+        val (masterIds, slaveIds) = shuffleCommittedInfo.synchronized {
+          (
+            masterParts.asScala
+              
.filterNot(shuffleCommittedInfo.handledPartitionLocations.contains)
+              .map(_.getUniqueId).asJava,
+            slaveParts.asScala
+              
.filterNot(shuffleCommittedInfo.handledPartitionLocations.contains)
+              .map(_.getUniqueId).asJava)
+        }
+
+        commitFiles(
+          appId,
+          shuffleId,
+          shuffleCommittedInfo,
+          worker,
+          masterIds,
+          slaveIds,
+          commitFilesFailedWorkers)
+      }
+    }
+
+    logInfo(s"Shuffle $shuffleId " +
+      s"commit files complete. File count 
${shuffleCommittedInfo.currentShuffleFileCount.sum()} " +
+      s"using ${(System.nanoTime() - commitFileStartTime) / 1000000} ms")
+
+    CommitResult(masterPartMap, slavePartMap, commitFilesFailedWorkers)
+  }
+
+  def commitFiles(
+      applicationId: String,
+      shuffleId: Int,
+      shuffleCommittedInfo: ShuffleCommittedInfo,
+      worker: WorkerInfo,
+      masterIds: util.List[String],
+      slaveIds: util.List[String],
+      commitFilesFailedWorkers: ShuffleFailedWorkers): Unit = {
+
+    val res =
+      if (!testRetryCommitFiles) {
+        val commitFiles = CommitFiles(
+          applicationId,
+          shuffleId,
+          masterIds,
+          slaveIds,
+          getMapperAttempts(shuffleId),
+          commitEpoch.incrementAndGet())
+        val res = requestCommitFilesWithRetry(worker.endpoint, commitFiles)
+
+        res.status match {
+          case StatusCode.SUCCESS => // do nothing
+          case StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED 
| StatusCode.REQUEST_FAILED =>
+            logDebug(s"Request $commitFiles return ${res.status} for " +
+              s"${Utils.makeShuffleKey(applicationId, shuffleId)}")
+            commitFilesFailedWorkers.put(worker, (res.status, 
System.currentTimeMillis()))
+          case _ => // won't happen
+        }
+        res
+      } else {
+        // for test
+        val commitFiles1 = CommitFiles(
+          applicationId,
+          shuffleId,
+          masterIds.subList(0, masterIds.size() / 2),
+          slaveIds.subList(0, slaveIds.size() / 2),
+          getMapperAttempts(shuffleId),
+          commitEpoch.incrementAndGet())
+        val res1 = requestCommitFilesWithRetry(worker.endpoint, commitFiles1)
+
+        val commitFiles = CommitFiles(
+          applicationId,
+          shuffleId,
+          masterIds.subList(masterIds.size() / 2, masterIds.size()),
+          slaveIds.subList(slaveIds.size() / 2, slaveIds.size()),
+          getMapperAttempts(shuffleId),
+          commitEpoch.incrementAndGet())
+        val res2 = requestCommitFilesWithRetry(worker.endpoint, commitFiles)
+
+        
res1.committedMasterStorageInfos.putAll(res2.committedMasterStorageInfos)
+        res1.committedSlaveStorageInfos.putAll(res2.committedSlaveStorageInfos)
+        res1.committedMapIdBitMap.putAll(res2.committedMapIdBitMap)
+        CommitFilesResponse(
+          status = if (res1.status == StatusCode.SUCCESS) res2.status else 
res1.status,
+          (res1.committedMasterIds.asScala ++ 
res2.committedMasterIds.asScala).toList.asJava,
+          (res1.committedSlaveIds.asScala ++ 
res1.committedSlaveIds.asScala).toList.asJava,
+          (res1.failedMasterIds.asScala ++ 
res1.failedMasterIds.asScala).toList.asJava,
+          (res1.failedSlaveIds.asScala ++ 
res2.failedSlaveIds.asScala).toList.asJava,
+          res1.committedMasterStorageInfos,
+          res1.committedSlaveStorageInfos,
+          res1.committedMapIdBitMap,
+          res1.totalWritten + res2.totalWritten,
+          res1.fileCount + res2.fileCount)
+      }
+
+    shuffleCommittedInfo.synchronized {
+      // record committed partitionIds
+      res.committedMasterIds.asScala.foreach({
+        case commitMasterId =>
+          val partitionUniqueIdList = 
shuffleCommittedInfo.committedMasterIds.computeIfAbsent(
+            Utils.splitPartitionLocationUniqueId(commitMasterId)._1,
+            (k: Int) => new util.ArrayList[String]())
+          partitionUniqueIdList.add(commitMasterId)
+      })
+
+      res.committedSlaveIds.asScala.foreach({
+        case commitSlaveId =>
+          val partitionUniqueIdList = 
shuffleCommittedInfo.committedSlaveIds.computeIfAbsent(
+            Utils.splitPartitionLocationUniqueId(commitSlaveId)._1,
+            (k: Int) => new util.ArrayList[String]())
+          partitionUniqueIdList.add(commitSlaveId)
+      })
+
+      // record committed partitions storage hint and disk hint
+      
shuffleCommittedInfo.committedMasterStorageInfos.putAll(res.committedMasterStorageInfos)
+      
shuffleCommittedInfo.committedSlaveStorageInfos.putAll(res.committedSlaveStorageInfos)
+
+      // record failed partitions
+      shuffleCommittedInfo.failedMasterPartitionIds.putAll(
+        res.failedMasterIds.asScala.map((_, worker)).toMap.asJava)
+      shuffleCommittedInfo.failedSlavePartitionIds.putAll(
+        res.failedSlaveIds.asScala.map((_, worker)).toMap.asJava)
+
+      
shuffleCommittedInfo.committedMapIdBitmap.putAll(res.committedMapIdBitMap)
+
+      totalWritten.add(res.totalWritten)
+      fileCount.add(res.fileCount)
+      shuffleCommittedInfo.currentShuffleFileCount.add(res.fileCount)
+    }
+  }
+
+  def collectResult(
+      shuffleId: Int,
+      shuffleCommittedInfo: ShuffleCommittedInfo,
+      masterPartitionUniqueIds: util.Iterator[String],
+      slavePartitionUniqueIds: util.Iterator[String],
+      masterPartMap: ConcurrentHashMap[String, PartitionLocation],
+      slavePartMap: ConcurrentHashMap[String, PartitionLocation]): Unit = {
+    val committedPartitions = new util.HashMap[String, PartitionLocation]
+    masterPartitionUniqueIds.asScala.foreach { id =>
+      masterPartMap.get(id).setStorageInfo(
+        shuffleCommittedInfo.committedMasterStorageInfos.get(id))
+      
masterPartMap.get(id).setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id))
+      committedPartitions.put(id, masterPartMap.get(id))
+    }
+
+    slavePartitionUniqueIds.asScala.foreach { id =>
+      val slavePartition = slavePartMap.get(id)
+      
slavePartition.setStorageInfo(shuffleCommittedInfo.committedSlaveStorageInfos.get(id))
+      val masterPartition = committedPartitions.get(id)
+      if (masterPartition ne null) {
+        masterPartition.setPeer(slavePartition)
+        slavePartition.setPeer(masterPartition)
+      } else {
+        logInfo(s"Shuffle $shuffleId partition $id: master lost, " +
+          s"use slave $slavePartition.")
+        
slavePartition.setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id))
+        committedPartitions.put(id, slavePartition)
+      }
+    }
+
+    committedPartitions.values().asScala.foreach { partition =>
+      val partitionLocations = 
reducerFileGroupsMap.get(shuffleId).computeIfAbsent(
+        partition.getId,
+        (k: Integer) => new util.HashSet[PartitionLocation]())
+      partitionLocations.add(partition)
+    }
+  }
+
+  private def requestCommitFilesWithRetry(
+      endpoint: RpcEndpointRef,
+      message: CommitFiles): CommitFilesResponse = {
+    val maxRetries = conf.requestCommitFilesMaxRetries
+    var retryTimes = 0
+    while (retryTimes < maxRetries) {
+      try {
+        if (testRetryCommitFiles && retryTimes < maxRetries - 1) {
+          endpoint.ask[CommitFilesResponse](message)
+          Thread.sleep(1000)
+          throw new Exception("Mock fail for CommitFiles")
+        } else {
+          return endpoint.askSync[CommitFilesResponse](message)
+        }
+      } catch {
+        case e: Throwable =>
+          retryTimes += 1
+          logError(
+            s"AskSync CommitFiles for ${message.shuffleId} failed (attempt 
$retryTimes/$maxRetries).",
+            e)
+      }
+    }
+
+    CommitFilesResponse(
+      StatusCode.REQUEST_FAILED,
+      List.empty.asJava,
+      List.empty.asJava,
+      message.masterIds,
+      message.slaveIds)
+  }
+
+  def checkDataLost(
+      shuffleId: Int,
+      masterPartitionUniqueIdMap: util.Map[String, WorkerInfo],
+      slavePartitionUniqueIdMap: util.Map[String, WorkerInfo]): Boolean = {
+    val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
+    if (!pushReplicateEnabled && masterPartitionUniqueIdMap.size() != 0) {
+      val msg =
+        masterPartitionUniqueIdMap.asScala.map {
+          case (partitionUniqueId, workerInfo) =>
+            s"Lost partition $partitionUniqueId in worker 
[${workerInfo.readableAddress()}]"
+        }.mkString("\n")
+      logError(
+        s"""
+           |For shuffle $shuffleKey partition data lost:
+           |$msg
+           |""".stripMargin)
+      true
+    } else {
+      val failedBothPartitionIdsToWorker = 
masterPartitionUniqueIdMap.asScala.flatMap {
+        case (partitionUniqueId, worker) =>
+          if (slavePartitionUniqueIdMap.asScala.contains(partitionUniqueId)) {
+            Some(partitionUniqueId -> (worker, 
slavePartitionUniqueIdMap.get(partitionUniqueId)))
+          } else {
+            None
+          }
+      }
+      if (failedBothPartitionIdsToWorker.nonEmpty) {
+        val msg = failedBothPartitionIdsToWorker.map {
+          case (partitionUniqueId, (masterWorker, slaveWorker)) =>
+            s"Lost partition $partitionUniqueId " +
+              s"in master worker [${masterWorker.readableAddress()}] and slave 
worker [$slaveWorker]"
+        }.mkString("\n")
+        logError(
+          s"""
+             |For shuffle $shuffleKey partition data lost:
+             |$msg
+             |""".stripMargin)
+        true
+      } else {
+        false
+      }
+    }
+  }
+
+  def commitMetrics(): (Long, Long) = (totalWritten.sumThenReset(), 
fileCount.sumThenReset())
+}
diff --git 
a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
 
b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
index 0fb46765..75068a87 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
@@ -21,7 +21,7 @@ public enum StatusCode {
   // 1/0 Status
   SUCCESS(0),
   PARTIAL_SUCCESS(1),
-  FAILED(2),
+  REQUEST_FAILED(2),
 
   // Specific Status
   SHUFFLE_ALREADY_REGISTERED(3),
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 09b68947..ac6d5f05 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -837,7 +837,7 @@ object Utils extends Logging {
       case 1 =>
         StatusCode.PARTIAL_SUCCESS
       case 2 =>
-        StatusCode.FAILED
+        StatusCode.REQUEST_FAILED
       case 3 =>
         StatusCode.SHUFFLE_ALREADY_REGISTERED
       case 4 =>
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index a930c98f..ba30c934 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -717,7 +717,7 @@ private[celeborn] class Master(
           new util.HashMap[String, DiskInfo](),
           new ConcurrentHashMap[UserIdentifier, ResourceConsumption](),
           null))
-        GetWorkerInfosResponse(StatusCode.FAILED, result.asScala: _*)
+        GetWorkerInfosResponse(StatusCode.REQUEST_FAILED, result.asScala: _*)
     }
   }
 

Reply via email to