RexXiong commented on code in PR #2924:
URL: https://github.com/apache/celeborn/pull/2924#discussion_r1867678647


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -747,6 +829,77 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     }
   }
 
+  class PushMergedDataCallback(callback: RpcResponseCallback) {
+    private val splitPartitionStatuses = new mutable.HashMap[Int, Byte]()
+
+    def addSplitPartition(index: Int, statusCode: StatusCode): Unit = {
+      splitPartitionStatuses.put(index, statusCode.getValue)
+    }
+
+    def isHardSplitPartition(index: Int): Boolean = {
+      splitPartitionStatuses.getOrElse(index, -1) == 
StatusCode.HARD_SPLIT.getValue
+    }
+
+    def unionReplicaSplitPartitions(
+        replicaPartitionIndexes: util.List[Integer],
+        replicaStatusCodes: util.List[Integer]): Unit = {
+      if (replicaPartitionIndexes.size() != replicaStatusCodes.size()) {
+        throw new IllegalArgumentException(
+          "replicaPartitionIndexes and replicaStatusCodes must have the same 
size")
+      }
+      for (i <- 0 until replicaPartitionIndexes.size()) {
+        val index = replicaPartitionIndexes.get(i)
+        // The priority of HARD_SPLIT is higher than that of SOFT_SPLIT.
+        if (!isHardSplitPartition(index)) {
+          splitPartitionStatuses.put(index, 
replicaStatusCodes.get(i).byteValue())
+        }
+      }
+    }
+
+    /**
+     * Returns the ordered indexes of partitions that are not writable.
+     * A partition is considered not writable if it is marked as HARD_SPLIT or 
failed.
+     */
+    def getHardSplitIndexes: Array[Int] = {
+      splitPartitionStatuses.collect {
+        case (partitionIndex, statusCode) if statusCode == 
StatusCode.HARD_SPLIT.getValue =>
+          partitionIndex
+      }.toSeq.sorted.toArray
+    }
+
+    def onSuccess(status: StatusCode): Unit = {
+      val splitPartitionIndexes = new util.ArrayList[Integer]()
+      val statusCodes = new util.ArrayList[Integer]()
+      splitPartitionStatuses.foreach {
+        case (partitionIndex, statusCode) =>
+          splitPartitionIndexes.add(partitionIndex)
+          statusCodes.add(statusCode)
+      }
+      val reason: Byte =
+        if (splitPartitionStatuses.isEmpty || status == StatusCode.MAP_ENDED) {
+          status.getValue
+        } else {
+          StatusCode.HARD_SPLIT.getValue
+        }
+      val pushMergedDataInfo = PbPushMergedDataSplitPartitionInfo.newBuilder()

Review Comment:
   Does pushMergedDataInfo only need to be sent when the result is HARD_SPLIT?



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -562,38 +598,63 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
             
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
             logError(
               s"PushMergedData replication failed caused by unavailable peer 
for partitionLocation: $location")
-            callbackWithTimer.onFailure(
+            pushMergedDataCallback.onFailure(
               new 
CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
             return
           }
 
           // Handle the response from replica
           val wrappedCallback = new RpcResponseCallback() {
             override def onSuccess(response: ByteBuffer): Unit = {
+              val replicaReason = response.get()
+              try {

Review Comment:
   Should we check for HARD_SPLIT before we union the replica partition statuses?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to