gauravkm commented on code in PR #3261:
URL: https://github.com/apache/celeborn/pull/3261#discussion_r2163179846


##########
client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala:
##########
@@ -301,19 +334,71 @@ class ReducePartitionCommitHandler(
   override def registerShuffle(
       shuffleId: Int,
       numMappers: Int,
-      isSegmentGranularityVisible: Boolean): Unit = {
-    super.registerShuffle(shuffleId, numMappers, isSegmentGranularityVisible)
+      isSegmentGranularityVisible: Boolean,
+      numPartitions: Int): Unit = {
+    super.registerShuffle(shuffleId, numMappers, isSegmentGranularityVisible, 
numPartitions)
     getReducerFileGroupRequest.put(shuffleId, new 
util.HashSet[MultiSerdeVersionRpcContext]())
-    initMapperAttempts(shuffleId, numMappers)
+    initMapperAttempts(shuffleId, numMappers, numPartitions)
+  }
+
+  override def finishPartition(
+      shuffleId: Int,
+      partitionId: Int,
+      startMapIndex: Int,
+      endMapIndex: Int,
+      actualCommitMetadata: CommitMetadata): (Boolean, String) = {
+    logDebug(s"finish Partition call: shuffleId: $shuffleId, " +
+      s"partitionId: $partitionId, " +
+      s"startMapIndex: $startMapIndex " +
+      s"endMapIndex: $endMapIndex, " +
+      s"actualCommitMetadata: $actualCommitMetadata")
+    val map = commitMetadataForReducer.get(shuffleId);
+    checkState(
+      map != null,
+      "CommitMetadata map cannot be null for a registered shuffleId: %d",
+      shuffleId)
+    val expectedCommitMetadata = map(partitionId)
+    if (endMapIndex == Integer.MAX_VALUE) {
+      // complete partition available
+      val bool = CommitMetadata.checkCommitMetadata(actualCommitMetadata, 
expectedCommitMetadata)
+      var message = ""
+      if (!bool) {
+        message =
+          s"CommitMetadata mismatch for shuffleId: $shuffleId partitionId: 
$partitionId expected: $expectedCommitMetadata actual: $actualCommitMetadata"
+      } else {
+        logInfo(
+          s"CommitMetadata matched for shuffleID : $shuffleId, partitionId: 
$partitionId expected: $expectedCommitMetadata actual: $actualCommitMetadata")
+      }
+      return (bool, message)
+    }
+
+    val splitSkewPartitionWithoutMapRange =
+      ClientUtils.readSkewPartitionWithoutMapRange(conf, startMapIndex, 
endMapIndex)
+
+    val validator = aqePartitionCompletenessValidator.computeIfAbsent(
+      shuffleId,
+      new java.util.function.Function[Int, PartitionCompletenessValidator] {
+        override def apply(key: Int): PartitionCompletenessValidator =
+          new PartitionCompletenessValidator()
+      })
+    validator.validateSubPartition(
+      partitionId,
+      startMapIndex,
+      endMapIndex,
+      actualCommitMetadata,
+      expectedCommitMetadata,
+      shuffleMapperAttempts.get(shuffleId).length,
+      splitSkewPartitionWithoutMapRange)
   }
 
-  private def initMapperAttempts(shuffleId: Int, numMappers: Int): Unit = {
+  private def initMapperAttempts(shuffleId: Int, numMappers: Int, 
numPartitions: Int): Unit = {
     shuffleMapperAttempts.synchronized {
       if (!shuffleMapperAttempts.containsKey(shuffleId)) {
         val attempts = new Array[Int](numMappers)
         0 until numMappers foreach (idx => attempts(idx) = -1)
         shuffleMapperAttempts.put(shuffleId, attempts)
       }
+      commitMetadataForReducer.put(shuffleId, new 
Array[CommitMetadata](numPartitions))

Review Comment:
   Added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to