This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 39394526 [CELEBORN-142] Keep committed partition locations semantically consistent when committing files on HDFS. (#1091)
39394526 is described below
commit 39394526a8b27bdc1f75ed3851702c6be3a2bd29
Author: Ethan Feng <[email protected]>
AuthorDate: Fri Dec 16 19:02:02 2022 +0800
[CELEBORN-142] Keep committed partition locations semantically consistent when committing files on HDFS. (#1091)
---
.../celeborn/client/commit/CommitHandler.scala | 34 +++++++++-------------
.../service/deploy/worker/Controller.scala | 13 +++++----
2 files changed, 21 insertions(+), 26 deletions(-)
diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index 7a4aad30..dd38c965 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -254,32 +254,24 @@ abstract class CommitHandler(
slavePartMap: ConcurrentHashMap[String, PartitionLocation]): Unit = {
val committedPartitions = new util.HashMap[String, PartitionLocation]
masterPartitionUniqueIds.asScala.foreach { id =>
- if (shuffleCommittedInfo.committedMasterStorageInfos.get(id) == null) {
- logDebug(s"$appId-$shuffleId $id storage hint was not returned")
- } else {
- masterPartMap.get(id).setStorageInfo(
- shuffleCommittedInfo.committedMasterStorageInfos.get(id))
-
masterPartMap.get(id).setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id))
- committedPartitions.put(id, masterPartMap.get(id))
- }
+ masterPartMap.get(id).setStorageInfo(
+ shuffleCommittedInfo.committedMasterStorageInfos.get(id))
+
masterPartMap.get(id).setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id))
+ committedPartitions.put(id, masterPartMap.get(id))
}
slavePartitionUniqueIds.asScala.foreach { id =>
val slavePartition = slavePartMap.get(id)
- if (shuffleCommittedInfo.committedSlaveStorageInfos.get(id) == null) {
- logDebug(s"$appId-$shuffleId $id storage hint was not returned")
+
slavePartition.setStorageInfo(shuffleCommittedInfo.committedSlaveStorageInfos.get(id))
+ val masterPartition = committedPartitions.get(id)
+ if (masterPartition ne null) {
+ masterPartition.setPeer(slavePartition)
+ slavePartition.setPeer(masterPartition)
} else {
-
slavePartition.setStorageInfo(shuffleCommittedInfo.committedSlaveStorageInfos.get(id))
- val masterPartition = committedPartitions.get(id)
- if (masterPartition ne null) {
- masterPartition.setPeer(slavePartition)
- slavePartition.setPeer(masterPartition)
- } else {
- logInfo(s"Shuffle $shuffleId partition $id: master lost, " +
- s"use slave $slavePartition.")
-
slavePartition.setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id))
- committedPartitions.put(id, slavePartition)
- }
+ logInfo(s"Shuffle $shuffleId partition $id: master lost, " +
+ s"use slave $slavePartition.")
+
slavePartition.setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id))
+ committedPartitions.put(id, slavePartition)
}
}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 8e2faffe..f4f311cc 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -279,16 +279,19 @@ private[deploy] class Controller(
val fileWriter =
location.asInstanceOf[WorkingPartition].getFileWriter
val bytes = fileWriter.close()
if (bytes > 0L) {
- if (fileWriter.getStorageInfo != null) {
+ if (fileWriter.getStorageInfo == null) {
+ // This branch means that this partition location is
deleted.
+ logDebug(s"Location $uniqueId is deleted.")
+ } else {
committedStorageInfos.put(uniqueId,
fileWriter.getStorageInfo)
if (fileWriter.getMapIdBitMap != null) {
committedMapIdBitMap.put(uniqueId,
fileWriter.getMapIdBitMap)
}
+ if (bytes >= minPartitionSizeToEstimate) {
+ partitionSizeList.add(bytes)
+ }
+ committedIds.add(uniqueId)
}
- if (bytes >= minPartitionSizeToEstimate) {
- partitionSizeList.add(bytes)
- }
- committedIds.add(uniqueId)
}
} catch {
case e: IOException =>