This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 39394526 [CELEBORN-142]Keep committed partition locations semantic consistent when commit files on HDFS. (#1091)
39394526 is described below

commit 39394526a8b27bdc1f75ed3851702c6be3a2bd29
Author: Ethan Feng <[email protected]>
AuthorDate: Fri Dec 16 19:02:02 2022 +0800

    [CELEBORN-142]Keep committed partition locations semantic consistent when commit files on HDFS. (#1091)
---
 .../celeborn/client/commit/CommitHandler.scala     | 34 +++++++++-------------
 .../service/deploy/worker/Controller.scala         | 13 +++++----
 2 files changed, 21 insertions(+), 26 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index 7a4aad30..dd38c965 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -254,32 +254,24 @@ abstract class CommitHandler(
       slavePartMap: ConcurrentHashMap[String, PartitionLocation]): Unit = {
     val committedPartitions = new util.HashMap[String, PartitionLocation]
     masterPartitionUniqueIds.asScala.foreach { id =>
-      if (shuffleCommittedInfo.committedMasterStorageInfos.get(id) == null) {
-        logDebug(s"$appId-$shuffleId $id storage hint was not returned")
-      } else {
-        masterPartMap.get(id).setStorageInfo(
-          shuffleCommittedInfo.committedMasterStorageInfos.get(id))
-        
masterPartMap.get(id).setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id))
-        committedPartitions.put(id, masterPartMap.get(id))
-      }
+      masterPartMap.get(id).setStorageInfo(
+        shuffleCommittedInfo.committedMasterStorageInfos.get(id))
+      
masterPartMap.get(id).setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id))
+      committedPartitions.put(id, masterPartMap.get(id))
     }
 
     slavePartitionUniqueIds.asScala.foreach { id =>
       val slavePartition = slavePartMap.get(id)
-      if (shuffleCommittedInfo.committedSlaveStorageInfos.get(id) == null) {
-        logDebug(s"$appId-$shuffleId $id storage hint was not returned")
+      
slavePartition.setStorageInfo(shuffleCommittedInfo.committedSlaveStorageInfos.get(id))
+      val masterPartition = committedPartitions.get(id)
+      if (masterPartition ne null) {
+        masterPartition.setPeer(slavePartition)
+        slavePartition.setPeer(masterPartition)
       } else {
-        
slavePartition.setStorageInfo(shuffleCommittedInfo.committedSlaveStorageInfos.get(id))
-        val masterPartition = committedPartitions.get(id)
-        if (masterPartition ne null) {
-          masterPartition.setPeer(slavePartition)
-          slavePartition.setPeer(masterPartition)
-        } else {
-          logInfo(s"Shuffle $shuffleId partition $id: master lost, " +
-            s"use slave $slavePartition.")
-          
slavePartition.setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id))
-          committedPartitions.put(id, slavePartition)
-        }
+        logInfo(s"Shuffle $shuffleId partition $id: master lost, " +
+          s"use slave $slavePartition.")
+        
slavePartition.setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id))
+        committedPartitions.put(id, slavePartition)
       }
     }
 
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 8e2faffe..f4f311cc 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -279,16 +279,19 @@ private[deploy] class Controller(
                 val fileWriter = 
location.asInstanceOf[WorkingPartition].getFileWriter
                 val bytes = fileWriter.close()
                 if (bytes > 0L) {
-                  if (fileWriter.getStorageInfo != null) {
+                  if (fileWriter.getStorageInfo == null) {
+                    // This branch means that this partition location is deleted.
+                    logDebug(s"Location $uniqueId is deleted.")
+                  } else {
                     committedStorageInfos.put(uniqueId, 
fileWriter.getStorageInfo)
                     if (fileWriter.getMapIdBitMap != null) {
                       committedMapIdBitMap.put(uniqueId, 
fileWriter.getMapIdBitMap)
                     }
+                    if (bytes >= minPartitionSizeToEstimate) {
+                      partitionSizeList.add(bytes)
+                    }
+                    committedIds.add(uniqueId)
                   }
-                  if (bytes >= minPartitionSizeToEstimate) {
-                    partitionSizeList.add(bytes)
-                  }
-                  committedIds.add(uniqueId)
                 }
               } catch {
                 case e: IOException =>

Reply via email to