RexXiong commented on code in PR #2924: URL: https://github.com/apache/celeborn/pull/2924#discussion_r1867678647
########## worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala: ########## @@ -747,6 +829,77 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler } } + class PushMergedDataCallback(callback: RpcResponseCallback) { + private val splitPartitionStatuses = new mutable.HashMap[Int, Byte]() + + def addSplitPartition(index: Int, statusCode: StatusCode): Unit = { + splitPartitionStatuses.put(index, statusCode.getValue) + } + + def isHardSplitPartition(index: Int): Boolean = { + splitPartitionStatuses.getOrElse(index, -1) == StatusCode.HARD_SPLIT.getValue + } + + def unionReplicaSplitPartitions( + replicaPartitionIndexes: util.List[Integer], + replicaStatusCodes: util.List[Integer]): Unit = { + if (replicaPartitionIndexes.size() != replicaStatusCodes.size()) { + throw new IllegalArgumentException( + "replicaPartitionIndexes and replicaStatusCodes must have the same size") + } + for (i <- 0 until replicaPartitionIndexes.size()) { + val index = replicaPartitionIndexes.get(i) + // The priority of HARD_SPLIT is higher than that of SOFT_SPLIT. + if (!isHardSplitPartition(index)) { + splitPartitionStatuses.put(index, replicaStatusCodes.get(i).byteValue()) + } + } + } + + /** + * Returns the ordered indexes of partitions that are not writable. + * A partition is considered not writable if it is marked as HARD_SPLIT or failed. 
+ */ + def getHardSplitIndexes: Array[Int] = { + splitPartitionStatuses.collect { + case (partitionIndex, statusCode) if statusCode == StatusCode.HARD_SPLIT.getValue => + partitionIndex + }.toSeq.sorted.toArray + } + + def onSuccess(status: StatusCode): Unit = { + val splitPartitionIndexes = new util.ArrayList[Integer]() + val statusCodes = new util.ArrayList[Integer]() + splitPartitionStatuses.foreach { + case (partitionIndex, statusCode) => + splitPartitionIndexes.add(partitionIndex) + statusCodes.add(statusCode) + } + val reason: Byte = + if (splitPartitionStatuses.isEmpty || status == StatusCode.MAP_ENDED) { + status.getValue + } else { + StatusCode.HARD_SPLIT.getValue + } + val pushMergedDataInfo = PbPushMergedDataSplitPartitionInfo.newBuilder() Review Comment: Does only HARD_SPLIT need to send pushMergedDataInfo? ########## worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala: ########## @@ -562,38 +598,63 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT) logError( s"PushMergedData replication failed caused by unavailable peer for partitionLocation: $location") - callbackWithTimer.onFailure( + pushMergedDataCallback.onFailure( new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA)) return } // Handle the response from replica val wrappedCallback = new RpcResponseCallback() { override def onSuccess(response: ByteBuffer): Unit = { + val replicaReason = response.get() + try { Review Comment: Should we check for HARD_SPLIT before we union the replica partition statuses (unionReplicaSplitPartitions)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@celeborn.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org