waitinfuture commented on code in PR #990:
URL:
https://github.com/apache/incubator-celeborn/pull/990#discussion_r1033621234
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -249,6 +285,98 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
batchHandleChangePartitionRequestInterval,
TimeUnit.MILLISECONDS)
}
+
+ batchHandleCommitPartitionSchedulerThread.foreach {
+ _.scheduleAtFixedRate(
+ new Runnable {
+ override def run(): Unit = {
+ committedPartitionInfo.asScala.foreach { case (shuffleId,
shuffleCommittedInfo) =>
+ batchHandleCommitPartitionExecutors.submit {
+ new Runnable {
+ override def run(): Unit = {
+ if (inProcessStageEndShuffleSet.contains(shuffleId) ||
+ stageEndShuffleSet.contains(shuffleId)) {
+ logWarning(s"Shuffle $shuffleId ended or during
processing stage end.")
+ shuffleCommittedInfo.commitPartitionRequests.clear()
Review Comment:
This line should be placed inside shuffleCommittedInfo.synchronized { ... }
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -1178,6 +1265,94 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
ReleaseSlots(applicationId, shuffleId, List.empty.asJava,
List.empty.asJava))
}
+ private def commitFiles(
+ applicationId: String,
+ shuffleId: Int,
+ shuffleCommittedInfo: ShuffleCommittedInfo,
+ worker: WorkerInfo,
+ masterIds: util.List[String],
+ slaveIds: util.List[String],
+ commitFilesFailedWorkers: ConcurrentHashMap[WorkerInfo, (StatusCode,
Long)]): Unit = {
+
+ val res =
+ if (!testRetryCommitFiles) {
+ val commitFiles = CommitFiles(
+ applicationId,
+ shuffleId,
+ masterIds,
+ slaveIds,
+ shuffleMapperAttempts.get(shuffleId),
+ commitEpoch.incrementAndGet())
+ shuffleCommittedInfo.inFlightCommitRequest.incrementAndGet()
+ val res = requestCommitFilesWithRetry(worker.endpoint, commitFiles)
+ shuffleCommittedInfo.inFlightCommitRequest.decrementAndGet()
+
+ res.status match {
+ case StatusCode.SUCCESS => // do nothing
+ case StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED
| StatusCode.FAILED =>
+ logDebug(s"Request $commitFiles return ${res.status} for " +
+ s"${Utils.makeShuffleKey(applicationId, shuffleId)}")
+ commitFilesFailedWorkers.put(worker, (res.status,
System.currentTimeMillis()))
+ case _ => // won't happen
+ }
+ res
+ } else {
+ // for test
+ val commitFiles1 = CommitFiles(
+ applicationId,
+ shuffleId,
+ masterIds.subList(0, masterIds.size() / 2),
+ slaveIds.subList(0, slaveIds.size() / 2),
+ shuffleMapperAttempts.get(shuffleId),
+ commitEpoch.incrementAndGet())
+ val res1 = requestCommitFilesWithRetry(worker.endpoint, commitFiles1)
+
+ val commitFiles = CommitFiles(
+ applicationId,
+ shuffleId,
+ masterIds.subList(masterIds.size() / 2, masterIds.size()),
+ slaveIds.subList(slaveIds.size() / 2, slaveIds.size()),
+ shuffleMapperAttempts.get(shuffleId),
+ commitEpoch.incrementAndGet())
+ val res2 = requestCommitFilesWithRetry(worker.endpoint, commitFiles)
+
+
res1.committedMasterStorageInfos.putAll(res2.committedMasterStorageInfos)
+ res1.committedSlaveStorageInfos.putAll(res2.committedSlaveStorageInfos)
+ res1.committedMapIdBitMap.putAll(res2.committedMapIdBitMap)
+ CommitFilesResponse(
+ status = if (res1.status == StatusCode.SUCCESS) res2.status else
res1.status,
+ (res1.committedMasterIds.asScala ++
res2.committedMasterIds.asScala).toList.asJava,
+ (res1.committedSlaveIds.asScala ++
res1.committedSlaveIds.asScala).toList.asJava,
+ (res1.failedMasterIds.asScala ++
res1.failedMasterIds.asScala).toList.asJava,
+ (res1.failedSlaveIds.asScala ++
res2.failedSlaveIds.asScala).toList.asJava,
+ res1.committedMasterStorageInfos,
+ res1.committedSlaveStorageInfos,
+ res1.committedMapIdBitMap,
+ res1.totalWritten + res2.totalWritten,
+ res1.fileCount + res2.fileCount)
+ }
+
+ // record committed partitionIds
+ shuffleCommittedInfo.committedMasterIds.addAll(res.committedMasterIds)
Review Comment:
ditto
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -249,6 +285,98 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
batchHandleChangePartitionRequestInterval,
TimeUnit.MILLISECONDS)
}
+
+ batchHandleCommitPartitionSchedulerThread.foreach {
+ _.scheduleAtFixedRate(
+ new Runnable {
+ override def run(): Unit = {
+ committedPartitionInfo.asScala.foreach { case (shuffleId,
shuffleCommittedInfo) =>
+ batchHandleCommitPartitionExecutors.submit {
+ new Runnable {
+ override def run(): Unit = {
+ if (inProcessStageEndShuffleSet.contains(shuffleId) ||
+ stageEndShuffleSet.contains(shuffleId)) {
+ logWarning(s"Shuffle $shuffleId ended or during
processing stage end.")
+ shuffleCommittedInfo.commitPartitionRequests.clear()
+ } else {
+ val currentBatch = shuffleCommittedInfo.synchronized {
+ val batch =
ConcurrentHashMap.newKeySet[CommitPartitionRequest]()
+
batch.addAll(shuffleCommittedInfo.commitPartitionRequests)
+ val currentBatch = batch.asScala.filterNot { request =>
+
shuffleCommittedInfo.handledCommitPartitionRequests.contains(
+ request.partition)
+ }
+
shuffleCommittedInfo.commitPartitionRequests.removeAll(batch)
Review Comment:
This could simply be shuffleCommittedInfo.commitPartitionRequests.clear()
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -692,6 +834,13 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
// check if there exists request for the partition, if do just register
val requests = changePartitionRequests.computeIfAbsent(shuffleId,
rpcContextRegisterFunc)
inBatchPartitions.computeIfAbsent(shuffleId, inBatchShuffleIdRegisterFunc)
+
+ // handle hard split
+ if (batchHandleCommitPartitionEnabled && cause.isDefined && cause.get ==
StatusCode.HARD_SPLIT) {
+ committedPartitionInfo.get(shuffleId).commitPartitionRequests
Review Comment:
I think we should put all code referring to ShuffleCommittedInfo inside
ShuffleCommittedInfo.synchronized {}
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -986,108 +1127,49 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
slavePartMap.put(partition.getUniqueId, partition)
}
- val masterIds = masterParts.asScala.map(_.getUniqueId).asJava
- val slaveIds = slaveParts.asScala.map(_.getUniqueId).asJava
-
- val res =
- if (!testRetryCommitFiles) {
- val commitFiles = CommitFiles(
- applicationId,
- shuffleId,
- masterIds,
- slaveIds,
- shuffleMapperAttempts.get(shuffleId),
- commitEpoch.incrementAndGet())
- val res = requestCommitFilesWithRetry(worker.endpoint, commitFiles)
-
- res.status match {
- case StatusCode.SUCCESS => // do nothing
- case StatusCode.PARTIAL_SUCCESS |
StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.FAILED =>
- logDebug(s"Request $commitFiles return ${res.status} for " +
- s"${Utils.makeShuffleKey(applicationId, shuffleId)}")
- commitFilesFailedWorkers.put(worker, (res.status,
System.currentTimeMillis()))
- case _ => // won't happen
- }
- res
- } else {
- // for test
- val commitFiles1 = CommitFiles(
- applicationId,
- shuffleId,
- masterIds.subList(0, masterIds.size() / 2),
- slaveIds.subList(0, slaveIds.size() / 2),
- shuffleMapperAttempts.get(shuffleId),
- commitEpoch.incrementAndGet())
- val res1 = requestCommitFilesWithRetry(worker.endpoint,
commitFiles1)
-
- val commitFiles = CommitFiles(
- applicationId,
- shuffleId,
- masterIds.subList(masterIds.size() / 2, masterIds.size()),
- slaveIds.subList(slaveIds.size() / 2, slaveIds.size()),
- shuffleMapperAttempts.get(shuffleId),
- commitEpoch.incrementAndGet())
- val res2 = requestCommitFilesWithRetry(worker.endpoint,
commitFiles)
-
-
res1.committedMasterStorageInfos.putAll(res2.committedMasterStorageInfos)
-
res1.committedSlaveStorageInfos.putAll(res2.committedSlaveStorageInfos)
- res1.committedMapIdBitMap.putAll(res2.committedMapIdBitMap)
- CommitFilesResponse(
- status = if (res1.status == StatusCode.SUCCESS) res2.status else
res1.status,
- (res1.committedMasterIds.asScala ++
res2.committedMasterIds.asScala).toList.asJava,
- (res1.committedSlaveIds.asScala ++
res1.committedSlaveIds.asScala).toList.asJava,
- (res1.failedMasterIds.asScala ++
res1.failedMasterIds.asScala).toList.asJava,
- (res1.failedSlaveIds.asScala ++
res2.failedSlaveIds.asScala).toList.asJava,
- res1.committedMasterStorageInfos,
- res1.committedSlaveStorageInfos,
- res1.committedMapIdBitMap,
- res1.totalWritten + res2.totalWritten,
- res1.fileCount + res2.fileCount)
- }
-
- // record committed partitionIds
- committedMasterIds.addAll(res.committedMasterIds)
- committedSlaveIds.addAll(res.committedSlaveIds)
-
- // record committed partitions storage hint and disk hint
- committedMasterStorageInfos.putAll(res.committedMasterStorageInfos)
- committedSlaveStorageInfos.putAll(res.committedSlaveStorageInfos)
-
- // record failed partitions
- failedMasterPartitionIds.putAll(res.failedMasterIds.asScala.map((_,
worker)).toMap.asJava)
- failedSlavePartitionIds.putAll(res.failedSlaveIds.asScala.map((_,
worker)).toMap.asJava)
-
- if (!res.committedMapIdBitMap.isEmpty) {
- committedMapIdBitmap.putAll(res.committedMapIdBitMap)
- }
+ val masterIds = masterParts.asScala
Review Comment:
ditto
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -249,6 +285,98 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
batchHandleChangePartitionRequestInterval,
TimeUnit.MILLISECONDS)
}
+
+ batchHandleCommitPartitionSchedulerThread.foreach {
+ _.scheduleAtFixedRate(
+ new Runnable {
+ override def run(): Unit = {
+ committedPartitionInfo.asScala.foreach { case (shuffleId,
shuffleCommittedInfo) =>
+ batchHandleCommitPartitionExecutors.submit {
+ new Runnable {
+ override def run(): Unit = {
+ if (inProcessStageEndShuffleSet.contains(shuffleId) ||
+ stageEndShuffleSet.contains(shuffleId)) {
+ logWarning(s"Shuffle $shuffleId ended or during
processing stage end.")
+ shuffleCommittedInfo.commitPartitionRequests.clear()
+ } else {
+ val currentBatch = shuffleCommittedInfo.synchronized {
+ val batch =
ConcurrentHashMap.newKeySet[CommitPartitionRequest]()
Review Comment:
Just use HashSet
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]