gaoyajun02 commented on code in PR #3261:
URL: https://github.com/apache/celeborn/pull/3261#discussion_r2161652366


##########
client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala:
##########
@@ -422,6 +426,16 @@ abstract class CommitHandler(
     }
   }
 
+  /*
+  Invoked when a reduce partition finishes reading data to perform end to end 
integrity check validation

Review Comment:
   ```suggestion
     /**
      * Invoked when a reduce partition finishes reading data to perform end to 
end integrity check validation
      */
   ```



##########
client/src/main/scala/org/apache/celeborn/client/commit/LegacySkewHandlingPartitionValidator.scala:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.commit
+
+import java.util
+import java.util.Comparator
+import java.util.function.BiFunction
+
+import com.google.common.base.Preconditions.{checkArgument, checkState}
+
+import org.apache.celeborn.common.CommitMetadata
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.util.JavaUtils
+
+class LegacySkewHandlingPartitionValidator extends 
AbstractPartitionCompletenessValidator
+  with Logging {
+  private val subRangeToCommitMetadataPerReducer = {
+    JavaUtils.newConcurrentHashMap[Int, java.util.TreeMap[(Int, Int), 
CommitMetadata]]()
+  }
+  private val partitionToSubPartitionCount = 
JavaUtils.newConcurrentHashMap[Int, Int]()
+  private val currentCommitMetadataForReducer =
+    JavaUtils.newConcurrentHashMap[Int, CommitMetadata]()
+  private val currentTotalMapIdCountForReducer = 
JavaUtils.newConcurrentHashMap[Int, Int]()
+  private val comparator: java.util.Comparator[(Int, Int)] = new 
Comparator[(Int, Int)] {
+    override def compare(o1: (Int, Int), o2: (Int, Int)): Int = {
+      val comparator = Integer.compare(o1._1, o2._1)
+      if (comparator != 0)
+        comparator
+      else
+        Integer.compare(o1._2, o2._2)
+    }
+  }
+
+  private def checkOverlappingRange(
+      treeMap: java.util.TreeMap[(Int, Int), CommitMetadata],
+      startMapIndex: Int,
+      endMapIndex: Int): (Boolean, ((Int, Int), CommitMetadata)) = {
+    val floorEntry: util.Map.Entry[(Int, Int), CommitMetadata] =
+      treeMap.floorEntry((startMapIndex, endMapIndex))
+    val ceilingEntry: util.Map.Entry[(Int, Int), CommitMetadata] =
+      treeMap.ceilingEntry((startMapIndex, endMapIndex))
+
+    if (floorEntry != null) {
+      if (startMapIndex < floorEntry.getKey._2) {
+        return (true, ((floorEntry.getKey._1, floorEntry.getKey._2), 
floorEntry.getValue))
+      }
+    }
+    if (ceilingEntry != null) {
+      if (endMapIndex > ceilingEntry.getKey._1) {
+        return (true, ((ceilingEntry.getKey._1, ceilingEntry.getKey._2), 
ceilingEntry.getValue))
+      }
+    }
+    (false, ((0, 0), new CommitMetadata()))
+  }
+
+  override def processSubPartition(
+      partitionId: Int,
+      startMapIndex: Int,
+      endMapIndex: Int,
+      actualCommitMetadata: CommitMetadata,
+      expectedTotalMapperCount: Int): (Boolean, String) = {
+    checkArgument(
+      startMapIndex < endMapIndex,
+      "startMapIndex %s must be less than endMapIndex %s",
+      startMapIndex,
+      endMapIndex)
+    logDebug(
+      s"Validate partition invoked for partitionId $partitionId startMapIndex 
$startMapIndex endMapIndex $endMapIndex")
+    partitionToSubPartitionCount.put(partitionId, expectedTotalMapperCount)
+    val subRangeToCommitMetadataMap = 
subRangeToCommitMetadataPerReducer.computeIfAbsent(
+      partitionId,
+      new java.util.function.Function[Int, util.TreeMap[(Int, Int), 
CommitMetadata]] {
+        override def apply(key: Int): util.TreeMap[(Int, Int), CommitMetadata] 
=
+          new util.TreeMap[(Int, Int), CommitMetadata](comparator)
+      })
+    subRangeToCommitMetadataMap.synchronized {
+      val existingMetadata = subRangeToCommitMetadataMap.get((startMapIndex, 
endMapIndex))
+      if (existingMetadata == null) {
+        val (isOverlapping, overlappingEntry) =
+          checkOverlappingRange(subRangeToCommitMetadataMap, startMapIndex, 
endMapIndex)
+        if (isOverlapping) {
+          val errorMessage = s"Encountered overlapping map range for 
partitionId: $partitionId " +
+            s" while processing range with startMapIndex: $startMapIndex and 
endMapIndex: $endMapIndex " +
+            s"existing range map: $subRangeToCommitMetadataMap " +
+            s"overlapped with Entry((startMapIndex, endMapIndex), count): 
$overlappingEntry"
+          logError(errorMessage)
+          return (false, errorMessage)
+        } else {
+          subRangeToCommitMetadataMap.put((startMapIndex, endMapIndex), 
actualCommitMetadata)
+          currentCommitMetadataForReducer.merge(
+            partitionId,
+            actualCommitMetadata,
+            new java.util.function.BiFunction[CommitMetadata, CommitMetadata, 
CommitMetadata] {

Review Comment:
   We can extract the BiFunction instances into class members (e.g. private vals) to avoid allocating a new one on every call.



##########
client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala:
##########
@@ -301,19 +334,71 @@ class ReducePartitionCommitHandler(
   override def registerShuffle(
       shuffleId: Int,
       numMappers: Int,
-      isSegmentGranularityVisible: Boolean): Unit = {
-    super.registerShuffle(shuffleId, numMappers, isSegmentGranularityVisible)
+      isSegmentGranularityVisible: Boolean,
+      numPartitions: Int): Unit = {
+    super.registerShuffle(shuffleId, numMappers, isSegmentGranularityVisible, 
numPartitions)
     getReducerFileGroupRequest.put(shuffleId, new 
util.HashSet[MultiSerdeVersionRpcContext]())
-    initMapperAttempts(shuffleId, numMappers)
+    initMapperAttempts(shuffleId, numMappers, numPartitions)
+  }
+
+  override def finishPartition(
+      shuffleId: Int,
+      partitionId: Int,
+      startMapIndex: Int,
+      endMapIndex: Int,
+      actualCommitMetadata: CommitMetadata): (Boolean, String) = {
+    logDebug(s"finish Partition call: shuffleId: $shuffleId, " +
+      s"partitionId: $partitionId, " +
+      s"startMapIndex: $startMapIndex " +
+      s"endMapIndex: $endMapIndex, " +
+      s"actualCommitMetadata: $actualCommitMetadata")
+    val map = commitMetadataForReducer.get(shuffleId);

Review Comment:
   nit: drop the trailing semicolon, which is unnecessary in Scala.
   ```suggestion
       val map = commitMetadataForReducer.get(shuffleId)
   ```



##########
client/src/main/scala/org/apache/celeborn/client/commit/PartitionCompletenessValidator.scala:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.commit
+
+import org.apache.celeborn.common.CommitMetadata
+import org.apache.celeborn.common.internal.Logging
+
+abstract class AbstractPartitionCompletenessValidator {
+  def processSubPartition(
+      partitionId: Int,
+      startMapIndex: Int,
+      endMapIndex: Int,
+      actualCommitMetadata: CommitMetadata,
+      expectedTotalMapperCount: Int): (Boolean, String)
+
+  def currentCommitMetadata(partitionId: Int): CommitMetadata
+
+  def isPartitionComplete(partitionId: Int): Boolean
+}
+
+class PartitionCompletenessValidator extends Logging {
+
+  private val skewHandlingValidator: AbstractPartitionCompletenessValidator =
+    new SkewHandlingWithoutMapRangeValidator
+  private val legacySkewHandlingValidator: 
AbstractPartitionCompletenessValidator =
+    new LegacySkewHandlingPartitionValidator
+
+  def validateSubPartition(
+      partitionId: Int,
+      startMapIndex: Int,
+      endMapIndex: Int,
+      actualCommitMetadata: CommitMetadata,
+      expectedCommitMetadata: CommitMetadata,
+      expectedTotalMapperCountForParent: Int,
+      skewPartitionHandlingWithoutMapRange: Boolean): (Boolean, String) = {

Review Comment:
   Ditto. I suggest using guard clauses to reduce nesting, e.g.:
   ```suggestion
         skewPartitionHandlingWithoutMapRange: Boolean): (Boolean, String) = {
     
     val validator = if (skewPartitionHandlingWithoutMapRange) {
       skewHandlingValidator
     } else {
       legacySkewHandlingValidator
     }
     
     // Early returns for error conditions
     val (successfullyProcessed, error) = validator.processSubPartition(
       partitionId, startMapIndex, endMapIndex, actualCommitMetadata, 
expectedTotalMapperCountForParent)
     if (!successfullyProcessed) return (false, error)
     
     if (!validator.isPartitionComplete(partitionId)) {
       return (true, "Partition is valid but still waiting for more data")
     }
     
     val currentCommitMetadata = validator.currentCommitMetadata(partitionId)
     if (!CommitMetadata.checkCommitMetadata(expectedCommitMetadata, 
currentCommitMetadata)) {
       val errorMsg = s"AQE Partition $partitionId failed validation check " +
         s"while processing range startMapIndex: $startMapIndex endMapIndex: 
$endMapIndex " +
         s"ExpectedCommitMetadata $expectedCommitMetadata, ActualCommitMetadata 
$currentCommitMetadata"
       logError(errorMsg)
       return (false, errorMsg)
     }
     
     logInfo(s"AQE Partition $partitionId completed validation check, " +
       s"expectedCommitMetadata $expectedCommitMetadata, actualCommitMetadata 
$currentCommitMetadata")
     (true, "Partition is complete")
   }
   ```



##########
client/src/main/scala/org/apache/celeborn/client/commit/LegacySkewHandlingPartitionValidator.scala:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.commit
+
+import java.util
+import java.util.Comparator
+import java.util.function.BiFunction
+
+import com.google.common.base.Preconditions.{checkArgument, checkState}
+
+import org.apache.celeborn.common.CommitMetadata
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.util.JavaUtils
+
+class LegacySkewHandlingPartitionValidator extends 
AbstractPartitionCompletenessValidator
+  with Logging {
+  private val subRangeToCommitMetadataPerReducer = {
+    JavaUtils.newConcurrentHashMap[Int, java.util.TreeMap[(Int, Int), 
CommitMetadata]]()
+  }
+  private val partitionToSubPartitionCount = 
JavaUtils.newConcurrentHashMap[Int, Int]()
+  private val currentCommitMetadataForReducer =
+    JavaUtils.newConcurrentHashMap[Int, CommitMetadata]()
+  private val currentTotalMapIdCountForReducer = 
JavaUtils.newConcurrentHashMap[Int, Int]()
+  private val comparator: java.util.Comparator[(Int, Int)] = new 
Comparator[(Int, Int)] {
+    override def compare(o1: (Int, Int), o2: (Int, Int)): Int = {
+      val comparator = Integer.compare(o1._1, o2._1)
+      if (comparator != 0)
+        comparator
+      else
+        Integer.compare(o1._2, o2._2)
+    }
+  }
+
+  private def checkOverlappingRange(
+      treeMap: java.util.TreeMap[(Int, Int), CommitMetadata],
+      startMapIndex: Int,
+      endMapIndex: Int): (Boolean, ((Int, Int), CommitMetadata)) = {
+    val floorEntry: util.Map.Entry[(Int, Int), CommitMetadata] =
+      treeMap.floorEntry((startMapIndex, endMapIndex))
+    val ceilingEntry: util.Map.Entry[(Int, Int), CommitMetadata] =
+      treeMap.ceilingEntry((startMapIndex, endMapIndex))
+
+    if (floorEntry != null) {
+      if (startMapIndex < floorEntry.getKey._2) {
+        return (true, ((floorEntry.getKey._1, floorEntry.getKey._2), 
floorEntry.getValue))
+      }
+    }
+    if (ceilingEntry != null) {
+      if (endMapIndex > ceilingEntry.getKey._1) {
+        return (true, ((ceilingEntry.getKey._1, ceilingEntry.getKey._2), 
ceilingEntry.getValue))
+      }
+    }
+    (false, ((0, 0), new CommitMetadata()))
+  }
+
+  override def processSubPartition(
+      partitionId: Int,
+      startMapIndex: Int,
+      endMapIndex: Int,
+      actualCommitMetadata: CommitMetadata,
+      expectedTotalMapperCount: Int): (Boolean, String) = {
+    checkArgument(
+      startMapIndex < endMapIndex,
+      "startMapIndex %s must be less than endMapIndex %s",
+      startMapIndex,
+      endMapIndex)
+    logDebug(
+      s"Validate partition invoked for partitionId $partitionId startMapIndex 
$startMapIndex endMapIndex $endMapIndex")
+    partitionToSubPartitionCount.put(partitionId, expectedTotalMapperCount)
+    val subRangeToCommitMetadataMap = 
subRangeToCommitMetadataPerReducer.computeIfAbsent(
+      partitionId,
+      new java.util.function.Function[Int, util.TreeMap[(Int, Int), 
CommitMetadata]] {
+        override def apply(key: Int): util.TreeMap[(Int, Int), CommitMetadata] 
=
+          new util.TreeMap[(Int, Int), CommitMetadata](comparator)
+      })
+    subRangeToCommitMetadataMap.synchronized {

Review Comment:
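   Suggest binding the key once, e.g. `val rangeKey = (startMapIndex, endMapIndex)`, and reusing it for the lookup and insert below instead of rebuilding the tuple each time.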
   



##########
client/src/main/scala/org/apache/celeborn/client/commit/LegacySkewHandlingPartitionValidator.scala:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client.commit
+
+import java.util
+import java.util.Comparator
+import java.util.function.BiFunction
+
+import com.google.common.base.Preconditions.{checkArgument, checkState}
+
+import org.apache.celeborn.common.CommitMetadata
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.util.JavaUtils
+
+class LegacySkewHandlingPartitionValidator extends 
AbstractPartitionCompletenessValidator
+  with Logging {
+  private val subRangeToCommitMetadataPerReducer = {
+    JavaUtils.newConcurrentHashMap[Int, java.util.TreeMap[(Int, Int), 
CommitMetadata]]()
+  }
+  private val partitionToSubPartitionCount = 
JavaUtils.newConcurrentHashMap[Int, Int]()
+  private val currentCommitMetadataForReducer =
+    JavaUtils.newConcurrentHashMap[Int, CommitMetadata]()
+  private val currentTotalMapIdCountForReducer = 
JavaUtils.newConcurrentHashMap[Int, Int]()
+  private val comparator: java.util.Comparator[(Int, Int)] = new 
Comparator[(Int, Int)] {
+    override def compare(o1: (Int, Int), o2: (Int, Int)): Int = {
+      val comparator = Integer.compare(o1._1, o2._1)
+      if (comparator != 0)
+        comparator
+      else
+        Integer.compare(o1._2, o2._2)
+    }
+  }
+
+  private def checkOverlappingRange(
+      treeMap: java.util.TreeMap[(Int, Int), CommitMetadata],
+      startMapIndex: Int,
+      endMapIndex: Int): (Boolean, ((Int, Int), CommitMetadata)) = {
+    val floorEntry: util.Map.Entry[(Int, Int), CommitMetadata] =
+      treeMap.floorEntry((startMapIndex, endMapIndex))
+    val ceilingEntry: util.Map.Entry[(Int, Int), CommitMetadata] =
+      treeMap.ceilingEntry((startMapIndex, endMapIndex))
+
+    if (floorEntry != null) {
+      if (startMapIndex < floorEntry.getKey._2) {
+        return (true, ((floorEntry.getKey._1, floorEntry.getKey._2), 
floorEntry.getValue))
+      }
+    }
+    if (ceilingEntry != null) {
+      if (endMapIndex > ceilingEntry.getKey._1) {
+        return (true, ((ceilingEntry.getKey._1, ceilingEntry.getKey._2), 
ceilingEntry.getValue))
+      }
+    }
+    (false, ((0, 0), new CommitMetadata()))
+  }
+
+  override def processSubPartition(
+      partitionId: Int,
+      startMapIndex: Int,
+      endMapIndex: Int,
+      actualCommitMetadata: CommitMetadata,
+      expectedTotalMapperCount: Int): (Boolean, String) = {

Review Comment:
   Consider using guard clauses to reduce nesting complexity and improve readability, e.g.:
   ```suggestion
         expectedTotalMapperCount: Int): (Boolean, String) = {
     
     checkArgument(
       startMapIndex < endMapIndex,
       "startMapIndex %s must be less than endMapIndex %s",
       startMapIndex,
       endMapIndex)
     
     logDebug(
       s"Validate partition invoked for partitionId $partitionId startMapIndex 
$startMapIndex endMapIndex $endMapIndex")
     
     partitionToSubPartitionCount.put(partitionId, expectedTotalMapperCount)
     val subRangeToCommitMetadataMap = 
subRangeToCommitMetadataPerReducer.computeIfAbsent(
       partitionId,
       new java.util.function.Function[Int, util.TreeMap[(Int, Int), 
CommitMetadata]] {
         override def apply(key: Int): util.TreeMap[(Int, Int), CommitMetadata] 
=
           new util.TreeMap[(Int, Int), CommitMetadata](comparator)
       })
     
     subRangeToCommitMetadataMap.synchronized {
       val rangeKey = (startMapIndex, endMapIndex)
       val existingMetadata = subRangeToCommitMetadataMap.get(rangeKey)
       
       if (existingMetadata == null) {
         // Guard clause: Check for overlapping ranges
         val (isOverlapping, overlappingEntry) =
           checkOverlappingRange(subRangeToCommitMetadataMap, startMapIndex, 
endMapIndex)
         if (isOverlapping) {
           val errorMessage = s"Encountered overlapping map range for 
partitionId: $partitionId " +
             s" while processing range with startMapIndex: $startMapIndex and 
endMapIndex: $endMapIndex " +
             s"existing range map: $subRangeToCommitMetadataMap " +
             s"overlapped with Entry((startMapIndex, endMapIndex), count): 
$overlappingEntry"
           logError(errorMessage)
           return (false, errorMessage)
         }
         
         // Process new range
         subRangeToCommitMetadataMap.put(rangeKey, actualCommitMetadata)
         currentCommitMetadataForReducer.merge(partitionId, 
actualCommitMetadata, commitMetadataMerger)
         currentTotalMapIdCountForReducer.merge(partitionId, endMapIndex - 
startMapIndex, integerSumMerger)
         
       } else {
         // Guard clause: Check metadata consistency
         if (existingMetadata != actualCommitMetadata) {
           val errorMessage = s"Commit Metadata for partition: $partitionId " +
             s"not matching for sub-partition with startMapIndex: 
$startMapIndex endMapIndex: $endMapIndex " +
             s"previous count: $existingMetadata new count: 
$actualCommitMetadata"
           logError(errorMessage)
           return (false, errorMessage)
         }
       }
       
       ensureCountersExist(partitionId, startMapIndex, endMapIndex)
       
       // Guard clause: Check if total count exceeds expected
       val sumOfMapRanges = currentTotalMapIdCountForReducer.get(partitionId)
       if (sumOfMapRanges > expectedTotalMapperCount) {
         val currentCommitMetadata = 
currentCommitMetadataForReducer.get(partitionId)
         val errorMsg = s"AQE Partition $partitionId failed validation check " +
           s"while processing startMapIndex: $startMapIndex endMapIndex: 
$endMapIndex " +
           s"ActualCommitMetadata $currentCommitMetadata > 
ExpectedTotalMapperCount $expectedTotalMapperCount"
         logError(errorMsg)
         return (false, errorMsg)
       }
     }
     
     (true, "")
   }
   ```



##########
client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala:
##########
@@ -301,19 +334,71 @@ class ReducePartitionCommitHandler(
   override def registerShuffle(
       shuffleId: Int,
       numMappers: Int,
-      isSegmentGranularityVisible: Boolean): Unit = {
-    super.registerShuffle(shuffleId, numMappers, isSegmentGranularityVisible)
+      isSegmentGranularityVisible: Boolean,
+      numPartitions: Int): Unit = {
+    super.registerShuffle(shuffleId, numMappers, isSegmentGranularityVisible, 
numPartitions)
     getReducerFileGroupRequest.put(shuffleId, new 
util.HashSet[MultiSerdeVersionRpcContext]())
-    initMapperAttempts(shuffleId, numMappers)
+    initMapperAttempts(shuffleId, numMappers, numPartitions)
+  }
+
+  override def finishPartition(
+      shuffleId: Int,
+      partitionId: Int,
+      startMapIndex: Int,
+      endMapIndex: Int,
+      actualCommitMetadata: CommitMetadata): (Boolean, String) = {
+    logDebug(s"finish Partition call: shuffleId: $shuffleId, " +
+      s"partitionId: $partitionId, " +
+      s"startMapIndex: $startMapIndex " +
+      s"endMapIndex: $endMapIndex, " +
+      s"actualCommitMetadata: $actualCommitMetadata")
+    val map = commitMetadataForReducer.get(shuffleId);
+    checkState(
+      map != null,
+      "CommitMetadata map cannot be null for a registered shuffleId: %d",
+      shuffleId)
+    val expectedCommitMetadata = map(partitionId)
+    if (endMapIndex == Integer.MAX_VALUE) {
+      // complete partition available
+      val bool = CommitMetadata.checkCommitMetadata(actualCommitMetadata, 
expectedCommitMetadata)
+      var message = ""
+      if (!bool) {
+        message =
+          s"CommitMetadata mismatch for shuffleId: $shuffleId partitionId: 
$partitionId expected: $expectedCommitMetadata actual: $actualCommitMetadata"
+      } else {
+        logInfo(
+          s"CommitMetadata matched for shuffleID : $shuffleId, partitionId: 
$partitionId expected: $expectedCommitMetadata actual: $actualCommitMetadata")
+      }
+      return (bool, message)
+    }
+
+    val splitSkewPartitionWithoutMapRange =
+      ClientUtils.readSkewPartitionWithoutMapRange(conf, startMapIndex, 
endMapIndex)
+
+    val validator = aqePartitionCompletenessValidator.computeIfAbsent(
+      shuffleId,
+      new java.util.function.Function[Int, PartitionCompletenessValidator] {
+        override def apply(key: Int): PartitionCompletenessValidator =
+          new PartitionCompletenessValidator()
+      })
+    validator.validateSubPartition(
+      partitionId,
+      startMapIndex,
+      endMapIndex,
+      actualCommitMetadata,
+      expectedCommitMetadata,
+      shuffleMapperAttempts.get(shuffleId).length,
+      splitSkewPartitionWithoutMapRange)
   }
 
-  private def initMapperAttempts(shuffleId: Int, numMappers: Int): Unit = {
+  private def initMapperAttempts(shuffleId: Int, numMappers: Int, 
numPartitions: Int): Unit = {
     shuffleMapperAttempts.synchronized {
       if (!shuffleMapperAttempts.containsKey(shuffleId)) {
         val attempts = new Array[Int](numMappers)
         0 until numMappers foreach (idx => attempts(idx) = -1)
         shuffleMapperAttempts.put(shuffleId, attempts)
       }
+      commitMetadataForReducer.put(shuffleId, new 
Array[CommitMetadata](numPartitions))

Review Comment:
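   Should we add a shuffleIntegrityCheckEnabled check here, so that the per-partition CommitMetadata array is only allocated when the integrity check is enabled?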
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to