errose28 commented on code in PR #7293:
URL: https://github.com/apache/ozone/pull/7293#discussion_r1842662168


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java:
##########
@@ -143,12 +149,140 @@ public void markBlocksAsDeleted(KeyValueContainerData 
data, Collection<Long> del
     }
   }
 
-  public ContainerDiff diff(KeyValueContainerData thisContainer, 
ContainerProtos.ContainerChecksumInfo otherInfo)
+  public ContainerDiff diff(KeyValueContainerData thisContainer, 
ContainerProtos.ContainerChecksumInfo peerChecksumInfo)
       throws IOException {
-    // TODO HDDS-10928 compare the checksum info of the two containers and 
return a summary.
-    //  Callers can act on this summary to repair their container replica 
using the peer's replica.
-    //  This method will use the read lock, which is unused in the current 
implementation.
-    return new ContainerDiff();
+    ContainerDiff report = new ContainerDiff();
+    try {
+      captureLatencyNs(metrics.getMerkleTreeDiffLatencyNS(), () -> {
+        Preconditions.assertNotNull(thisContainer, "Container data is null");
+        Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is 
null");
+        Optional<ContainerProtos.ContainerChecksumInfo.Builder> 
thisContainerChecksumInfoBuilder =
+            read(thisContainer);
+        if (!thisContainerChecksumInfoBuilder.isPresent()) {
+          throw new IOException("The container #" + 
thisContainer.getContainerID() +
+              " doesn't have container checksum");
+        }
+
+        if (thisContainer.getContainerID() != 
peerChecksumInfo.getContainerID()) {
+          throw new IOException("Container Id does not match for container " + 
thisContainer.getContainerID());
+        }
+
+        ContainerProtos.ContainerChecksumInfo thisChecksumInfo = 
thisContainerChecksumInfoBuilder.get().build();
+        compareContainerMerkleTree(thisChecksumInfo, peerChecksumInfo, report);
+      });
+    } catch (IOException ex) {
+      metrics.incrementMerkleTreeDiffFailures();
+      throw new IOException("Container Diff failed for container #" + 
thisContainer.getContainerID(), ex);
+    }
+
+    // Update Container Diff metrics based on the diff report.
+    if (report.needsRepair()) {
+      metrics.incrementUnhealthyContainerDiffs();
+    } else {
+      metrics.incrementHealthyContainerDiffs();
+    }
+    metrics.incrementMerkleTreeDiffSuccesses();
+    return report;
+  }
+
+  private void 
compareContainerMerkleTree(ContainerProtos.ContainerChecksumInfo 
thisChecksumInfo,
+                                          
ContainerProtos.ContainerChecksumInfo peerChecksumInfo,
+                                          ContainerDiff report) {
+    ContainerProtos.ContainerMerkleTree thisMerkleTree = 
thisChecksumInfo.getContainerMerkleTree();
+    ContainerProtos.ContainerMerkleTree peerMerkleTree = 
peerChecksumInfo.getContainerMerkleTree();
+    Set<Long> thisDeletedBlockSet = new 
HashSet<>(thisChecksumInfo.getDeletedBlocksList());
+    Set<Long> peerDeletedBlockSet = new 
HashSet<>(peerChecksumInfo.getDeletedBlocksList());
+
+    if (thisMerkleTree.getDataChecksum() == peerMerkleTree.getDataChecksum()) {
+      return;
+    }
+
+    List<ContainerProtos.BlockMerkleTree> thisBlockMerkleTreeList = 
thisMerkleTree.getBlockMerkleTreeList();
+    List<ContainerProtos.BlockMerkleTree> peerBlockMerkleTreeList = 
peerMerkleTree.getBlockMerkleTreeList();
+    int thisIdx = 0, peerIdx = 0;
+
+    // Step 1: Process both lists while elements are present in both
+    while (thisIdx < thisBlockMerkleTreeList.size() && peerIdx < 
peerBlockMerkleTreeList.size()) {
+      ContainerProtos.BlockMerkleTree thisBlockMerkleTree = 
thisBlockMerkleTreeList.get(thisIdx);
+      ContainerProtos.BlockMerkleTree peerBlockMerkleTree = 
peerBlockMerkleTreeList.get(peerIdx);
+
+      if (thisBlockMerkleTree.getBlockID() == 
peerBlockMerkleTree.getBlockID()) {
+        // Matching block ID; check if the block is deleted and handle the 
cases;
+        // 1) If the block is deleted in both the block merkle tree, We can 
ignore comparing them.
+        // 2) If the block is only deleted in our merkle tree, The BG service 
should have deleted our
+        //    block and the peer's BG service hasn't run yet. We can ignore 
comparing them.
+        // 3) If the block is only deleted in peer merkle tree, we can't 
reconcile for this block. It might be
+        //    deleted by peer's BG service. We can ignore comparing them.
+        if (!thisDeletedBlockSet.contains(thisBlockMerkleTree.getBlockID()) &&
+            !peerDeletedBlockSet.contains(thisBlockMerkleTree.getBlockID()) &&
+            thisBlockMerkleTree.getBlockChecksum() != 
peerBlockMerkleTree.getBlockChecksum()) {
+          compareBlockMerkleTree(thisBlockMerkleTree, peerBlockMerkleTree, 
report);
+        }
+        thisIdx++;
+        peerIdx++;
+      } else if (thisBlockMerkleTree.getBlockID() < 
peerBlockMerkleTree.getBlockID()) {
+        // This block's ID is smaller; advance thisIdx to catch up

Review Comment:
   Better to comment what this means vs what the code does. This means that we 
have blocks that the peer does not have, so we cannot use this peer to check 
them.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java:
##########
@@ -143,12 +149,140 @@ public void markBlocksAsDeleted(KeyValueContainerData 
data, Collection<Long> del
     }
   }
 
-  public ContainerDiff diff(KeyValueContainerData thisContainer, 
ContainerProtos.ContainerChecksumInfo otherInfo)
+  public ContainerDiff diff(KeyValueContainerData thisContainer, 
ContainerProtos.ContainerChecksumInfo peerChecksumInfo)
       throws IOException {
-    // TODO HDDS-10928 compare the checksum info of the two containers and 
return a summary.
-    //  Callers can act on this summary to repair their container replica 
using the peer's replica.
-    //  This method will use the read lock, which is unused in the current 
implementation.
-    return new ContainerDiff();
+    ContainerDiff report = new ContainerDiff();
+    try {
+      captureLatencyNs(metrics.getMerkleTreeDiffLatencyNS(), () -> {
+        Preconditions.assertNotNull(thisContainer, "Container data is null");
+        Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is 
null");
+        Optional<ContainerProtos.ContainerChecksumInfo.Builder> 
thisContainerChecksumInfoBuilder =
+            read(thisContainer);
+        if (!thisContainerChecksumInfoBuilder.isPresent()) {
+          throw new IOException("The container #" + 
thisContainer.getContainerID() +
+              " doesn't have container checksum");
+        }
+
+        if (thisContainer.getContainerID() != 
peerChecksumInfo.getContainerID()) {
+          throw new IOException("Container Id does not match for container " + 
thisContainer.getContainerID());
+        }
+
+        ContainerProtos.ContainerChecksumInfo thisChecksumInfo = 
thisContainerChecksumInfoBuilder.get().build();
+        compareContainerMerkleTree(thisChecksumInfo, peerChecksumInfo, report);
+      });
+    } catch (IOException ex) {
+      metrics.incrementMerkleTreeDiffFailures();
+      throw new IOException("Container Diff failed for container #" + 
thisContainer.getContainerID(), ex);
+    }
+
+    // Update Container Diff metrics based on the diff report.
+    if (report.needsRepair()) {
+      metrics.incrementUnhealthyContainerDiffs();
+    } else {
+      metrics.incrementHealthyContainerDiffs();
+    }
+    metrics.incrementMerkleTreeDiffSuccesses();
+    return report;
+  }
+
+  private void 
compareContainerMerkleTree(ContainerProtos.ContainerChecksumInfo 
thisChecksumInfo,
+                                          
ContainerProtos.ContainerChecksumInfo peerChecksumInfo,
+                                          ContainerDiff report) {
+    ContainerProtos.ContainerMerkleTree thisMerkleTree = 
thisChecksumInfo.getContainerMerkleTree();
+    ContainerProtos.ContainerMerkleTree peerMerkleTree = 
peerChecksumInfo.getContainerMerkleTree();
+    Set<Long> thisDeletedBlockSet = new 
HashSet<>(thisChecksumInfo.getDeletedBlocksList());
+    Set<Long> peerDeletedBlockSet = new 
HashSet<>(peerChecksumInfo.getDeletedBlocksList());
+
+    if (thisMerkleTree.getDataChecksum() == peerMerkleTree.getDataChecksum()) {
+      return;
+    }
+
+    List<ContainerProtos.BlockMerkleTree> thisBlockMerkleTreeList = 
thisMerkleTree.getBlockMerkleTreeList();
+    List<ContainerProtos.BlockMerkleTree> peerBlockMerkleTreeList = 
peerMerkleTree.getBlockMerkleTreeList();
+    int thisIdx = 0, peerIdx = 0;
+
+    // Step 1: Process both lists while elements are present in both
+    while (thisIdx < thisBlockMerkleTreeList.size() && peerIdx < 
peerBlockMerkleTreeList.size()) {
+      ContainerProtos.BlockMerkleTree thisBlockMerkleTree = 
thisBlockMerkleTreeList.get(thisIdx);
+      ContainerProtos.BlockMerkleTree peerBlockMerkleTree = 
peerBlockMerkleTreeList.get(peerIdx);
+
+      if (thisBlockMerkleTree.getBlockID() == 
peerBlockMerkleTree.getBlockID()) {
+        // Matching block ID; check if the block is deleted and handle the 
cases;
+        // 1) If the block is deleted in both the block merkle tree, We can 
ignore comparing them.
+        // 2) If the block is only deleted in our merkle tree, The BG service 
should have deleted our
+        //    block and the peer's BG service hasn't run yet. We can ignore 
comparing them.
+        // 3) If the block is only deleted in peer merkle tree, we can't 
reconcile for this block. It might be
+        //    deleted by peer's BG service. We can ignore comparing them.
+        if (!thisDeletedBlockSet.contains(thisBlockMerkleTree.getBlockID()) &&
+            !peerDeletedBlockSet.contains(thisBlockMerkleTree.getBlockID()) &&
+            thisBlockMerkleTree.getBlockChecksum() != 
peerBlockMerkleTree.getBlockChecksum()) {
+          compareBlockMerkleTree(thisBlockMerkleTree, peerBlockMerkleTree, 
report);
+        }
+        thisIdx++;
+        peerIdx++;
+      } else if (thisBlockMerkleTree.getBlockID() < 
peerBlockMerkleTree.getBlockID()) {
+        // This block's ID is smaller; advance thisIdx to catch up
+        thisIdx++;
+      } else {
+        // Peer block's ID is smaller; record missing block if 
peerDeletedBlockSet doesn't contain the blockId
+        // and advance peerIdx
+        if (!peerDeletedBlockSet.contains(peerBlockMerkleTree.getBlockID())) {
+          report.addMissingBlock(peerBlockMerkleTree);
+        }
+        peerIdx++;
+      }
+    }
+
+    // Step 2: Process remaining blocks in the peer list
+    while (peerIdx < peerBlockMerkleTreeList.size()) {
+      ContainerProtos.BlockMerkleTree peerBlockMerkleTree = 
peerBlockMerkleTreeList.get(peerIdx);
+      if (!peerDeletedBlockSet.contains(peerBlockMerkleTree.getBlockID())) {
+        report.addMissingBlock(peerBlockMerkleTree);
+      }
+      peerIdx++;
+    }

Review Comment:
   Let's add one more comment that we don't need to process the extra blocks in 
our list since the peer does not have them to compare against.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java:
##########
@@ -143,12 +149,140 @@ public void markBlocksAsDeleted(KeyValueContainerData 
data, Collection<Long> del
     }
   }
 
-  public ContainerDiff diff(KeyValueContainerData thisContainer, 
ContainerProtos.ContainerChecksumInfo otherInfo)
+  public ContainerDiff diff(KeyValueContainerData thisContainer, 
ContainerProtos.ContainerChecksumInfo peerChecksumInfo)
       throws IOException {
-    // TODO HDDS-10928 compare the checksum info of the two containers and 
return a summary.
-    //  Callers can act on this summary to repair their container replica 
using the peer's replica.
-    //  This method will use the read lock, which is unused in the current 
implementation.
-    return new ContainerDiff();
+    ContainerDiff report = new ContainerDiff();
+    try {
+      captureLatencyNs(metrics.getMerkleTreeDiffLatencyNS(), () -> {
+        Preconditions.assertNotNull(thisContainer, "Container data is null");
+        Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is 
null");
+        Optional<ContainerProtos.ContainerChecksumInfo.Builder> 
thisContainerChecksumInfoBuilder =
+            read(thisContainer);
+        if (!thisContainerChecksumInfoBuilder.isPresent()) {
+          throw new IOException("The container #" + 
thisContainer.getContainerID() +
+              " doesn't have container checksum");
+        }
+
+        if (thisContainer.getContainerID() != 
peerChecksumInfo.getContainerID()) {
+          throw new IOException("Container Id does not match for container " + 
thisContainer.getContainerID());
+        }
+
+        ContainerProtos.ContainerChecksumInfo thisChecksumInfo = 
thisContainerChecksumInfoBuilder.get().build();
+        compareContainerMerkleTree(thisChecksumInfo, peerChecksumInfo, report);
+      });
+    } catch (IOException ex) {
+      metrics.incrementMerkleTreeDiffFailures();
+      throw new IOException("Container Diff failed for container #" + 
thisContainer.getContainerID(), ex);
+    }
+
+    // Update Container Diff metrics based on the diff report.
+    if (report.needsRepair()) {
+      metrics.incrementUnhealthyContainerDiffs();
+    } else {
+      metrics.incrementHealthyContainerDiffs();
+    }
+    metrics.incrementMerkleTreeDiffSuccesses();
+    return report;
+  }
+
+  private void 
compareContainerMerkleTree(ContainerProtos.ContainerChecksumInfo 
thisChecksumInfo,
+                                          
ContainerProtos.ContainerChecksumInfo peerChecksumInfo,
+                                          ContainerDiff report) {
+    ContainerProtos.ContainerMerkleTree thisMerkleTree = 
thisChecksumInfo.getContainerMerkleTree();
+    ContainerProtos.ContainerMerkleTree peerMerkleTree = 
peerChecksumInfo.getContainerMerkleTree();
+    Set<Long> thisDeletedBlockSet = new 
HashSet<>(thisChecksumInfo.getDeletedBlocksList());
+    Set<Long> peerDeletedBlockSet = new 
HashSet<>(peerChecksumInfo.getDeletedBlocksList());
+
+    if (thisMerkleTree.getDataChecksum() == peerMerkleTree.getDataChecksum()) {
+      return;
+    }
+
+    List<ContainerProtos.BlockMerkleTree> thisBlockMerkleTreeList = 
thisMerkleTree.getBlockMerkleTreeList();
+    List<ContainerProtos.BlockMerkleTree> peerBlockMerkleTreeList = 
peerMerkleTree.getBlockMerkleTreeList();
+    int thisIdx = 0, peerIdx = 0;
+
+    // Step 1: Process both lists while elements are present in both
+    while (thisIdx < thisBlockMerkleTreeList.size() && peerIdx < 
peerBlockMerkleTreeList.size()) {
+      ContainerProtos.BlockMerkleTree thisBlockMerkleTree = 
thisBlockMerkleTreeList.get(thisIdx);
+      ContainerProtos.BlockMerkleTree peerBlockMerkleTree = 
peerBlockMerkleTreeList.get(peerIdx);
+
+      if (thisBlockMerkleTree.getBlockID() == 
peerBlockMerkleTree.getBlockID()) {
+        // Matching block ID; check if the block is deleted and handle the 
cases;
+        // 1) If the block is deleted in both the block merkle tree, We can 
ignore comparing them.
+        // 2) If the block is only deleted in our merkle tree, The BG service 
should have deleted our
+        //    block and the peer's BG service hasn't run yet. We can ignore 
comparing them.
+        // 3) If the block is only deleted in peer merkle tree, we can't 
reconcile for this block. It might be

Review Comment:
   I would add a TODO and a Jira if we don't have it already for reconciliation 
to handle missed block deletions here as well. Right now we are not merging the 
deleted block lists but when we decide how to handle missed deletes that may be 
part of the process.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java:
##########
@@ -245,11 +372,48 @@ public static boolean checksumFileExist(Container 
container) {
    * This class represents the difference between our replica of a container 
and a peer's replica of a container.
    * It summarizes the operations we need to do to reconcile our replica with 
the peer replica it was compared to.
    *
-   * TODO HDDS-10928
    */
   public static class ContainerDiff {
+    private final List<ContainerProtos.BlockMerkleTree> missingBlocks;
+    private final Map<Long, List<ContainerProtos.ChunkMerkleTree>> 
missingChunks;
+    private final Map<Long, List<ContainerProtos.ChunkMerkleTree>> 
corruptChunks;
+
     public ContainerDiff() {
+      this.missingBlocks = new ArrayList<>();
+      this.missingChunks = new HashMap<>();
+      this.corruptChunks = new HashMap<>();
+    }
+
+    public void addMissingBlock(ContainerProtos.BlockMerkleTree 
missingBlockMerkleTree) {
+      this.missingBlocks.add(missingBlockMerkleTree);
+    }
+
+    public void addMissingChunk(long blockId, ContainerProtos.ChunkMerkleTree 
missingChunkMerkleTree) {
+      this.missingChunks.computeIfAbsent(blockId, any -> new 
ArrayList<>()).add(missingChunkMerkleTree);
+    }
+
+    public void addCorruptChunk(long blockId, ContainerProtos.ChunkMerkleTree 
corruptChunk) {
+      this.corruptChunks.computeIfAbsent(blockId, any -> new 
ArrayList<>()).add(corruptChunk);
+    }
+
+    public List<ContainerProtos.BlockMerkleTree> getMissingBlocks() {
+      return missingBlocks;
+    }
+
+    public Map<Long, List<ContainerProtos.ChunkMerkleTree>> getMissingChunks() 
{
+      return missingChunks;
+    }
+
+    public Map<Long, List<ContainerProtos.ChunkMerkleTree>> getCorruptChunks() 
{
+      return corruptChunks;
+    }
 
+    /**
+     * If isHealthy is true, It means current replica is healthy. The peer 
replica still may have corruption,
+     * which it will fix when it reconciles with other peers.
+     */
+    public boolean isHealthy() {

Review Comment:
   Technically our replica may not be healthy either. It could be that both our 
replica and our peer have the same corrupted chunk or are missing the same 
data. Maybe we should call this something else, like `needsRepair`?



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java:
##########
@@ -143,12 +149,140 @@ public void markBlocksAsDeleted(KeyValueContainerData 
data, Collection<Long> del
     }
   }
 
-  public ContainerDiff diff(KeyValueContainerData thisContainer, 
ContainerProtos.ContainerChecksumInfo otherInfo)
+  public ContainerDiff diff(KeyValueContainerData thisContainer, 
ContainerProtos.ContainerChecksumInfo peerChecksumInfo)
       throws IOException {
-    // TODO HDDS-10928 compare the checksum info of the two containers and 
return a summary.
-    //  Callers can act on this summary to repair their container replica 
using the peer's replica.
-    //  This method will use the read lock, which is unused in the current 
implementation.
-    return new ContainerDiff();
+    ContainerDiff report = new ContainerDiff();
+    try {
+      captureLatencyNs(metrics.getMerkleTreeDiffLatencyNS(), () -> {
+        Preconditions.assertNotNull(thisContainer, "Container data is null");
+        Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is 
null");
+        Optional<ContainerProtos.ContainerChecksumInfo.Builder> 
thisContainerChecksumInfoBuilder =
+            read(thisContainer);
+        if (!thisContainerChecksumInfoBuilder.isPresent()) {
+          throw new IOException("The container #" + 
thisContainer.getContainerID() +
+              " doesn't have container checksum");
+        }
+
+        if (thisContainer.getContainerID() != 
peerChecksumInfo.getContainerID()) {
+          throw new IOException("Container Id does not match for container " + 
thisContainer.getContainerID());
+        }
+
+        ContainerProtos.ContainerChecksumInfo thisChecksumInfo = 
thisContainerChecksumInfoBuilder.get().build();
+        compareContainerMerkleTree(thisChecksumInfo, peerChecksumInfo, report);
+      });
+    } catch (IOException ex) {
+      metrics.incrementMerkleTreeDiffFailures();
+      throw new IOException("Container Diff failed for container #" + 
thisContainer.getContainerID(), ex);

Review Comment:
   Let's add a test for this case. We can use a mocked peer checksum info 
object that throws an exception when a method is called.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java:
##########
@@ -143,12 +149,140 @@ public void markBlocksAsDeleted(KeyValueContainerData 
data, Collection<Long> del
     }
   }
 
-  public ContainerDiff diff(KeyValueContainerData thisContainer, 
ContainerProtos.ContainerChecksumInfo otherInfo)
+  public ContainerDiff diff(KeyValueContainerData thisContainer, 
ContainerProtos.ContainerChecksumInfo peerChecksumInfo)
       throws IOException {
-    // TODO HDDS-10928 compare the checksum info of the two containers and 
return a summary.
-    //  Callers can act on this summary to repair their container replica 
using the peer's replica.
-    //  This method will use the read lock, which is unused in the current 
implementation.
-    return new ContainerDiff();
+    ContainerDiff report = new ContainerDiff();
+    try {
+      captureLatencyNs(metrics.getMerkleTreeDiffLatencyNS(), () -> {
+        Preconditions.assertNotNull(thisContainer, "Container data is null");
+        Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is 
null");
+        Optional<ContainerProtos.ContainerChecksumInfo.Builder> 
thisContainerChecksumInfoBuilder =
+            read(thisContainer);
+        if (!thisContainerChecksumInfoBuilder.isPresent()) {
+          throw new IOException("The container #" + 
thisContainer.getContainerID() +
+              " doesn't have container checksum");
+        }
+
+        if (thisContainer.getContainerID() != 
peerChecksumInfo.getContainerID()) {
+          throw new IOException("Container Id does not match for container " + 
thisContainer.getContainerID());
+        }
+
+        ContainerProtos.ContainerChecksumInfo thisChecksumInfo = 
thisContainerChecksumInfoBuilder.get().build();
+        compareContainerMerkleTree(thisChecksumInfo, peerChecksumInfo, report);
+      });
+    } catch (IOException ex) {
+      metrics.incrementMerkleTreeDiffFailures();
+      throw new IOException("Container Diff failed for container #" + 
thisContainer.getContainerID(), ex);
+    }
+
+    // Update Container Diff metrics based on the diff report.
+    if (report.needsRepair()) {
+      metrics.incrementUnhealthyContainerDiffs();
+    } else {
+      metrics.incrementHealthyContainerDiffs();
+    }
+    metrics.incrementMerkleTreeDiffSuccesses();
+    return report;
+  }
+
+  private void 
compareContainerMerkleTree(ContainerProtos.ContainerChecksumInfo 
thisChecksumInfo,
+                                          
ContainerProtos.ContainerChecksumInfo peerChecksumInfo,
+                                          ContainerDiff report) {
+    ContainerProtos.ContainerMerkleTree thisMerkleTree = 
thisChecksumInfo.getContainerMerkleTree();
+    ContainerProtos.ContainerMerkleTree peerMerkleTree = 
peerChecksumInfo.getContainerMerkleTree();
+    Set<Long> thisDeletedBlockSet = new 
HashSet<>(thisChecksumInfo.getDeletedBlocksList());
+    Set<Long> peerDeletedBlockSet = new 
HashSet<>(peerChecksumInfo.getDeletedBlocksList());
+
+    if (thisMerkleTree.getDataChecksum() == peerMerkleTree.getDataChecksum()) {
+      return;
+    }
+
+    List<ContainerProtos.BlockMerkleTree> thisBlockMerkleTreeList = 
thisMerkleTree.getBlockMerkleTreeList();
+    List<ContainerProtos.BlockMerkleTree> peerBlockMerkleTreeList = 
peerMerkleTree.getBlockMerkleTreeList();
+    int thisIdx = 0, peerIdx = 0;
+
+    // Step 1: Process both lists while elements are present in both
+    while (thisIdx < thisBlockMerkleTreeList.size() && peerIdx < 
peerBlockMerkleTreeList.size()) {
+      ContainerProtos.BlockMerkleTree thisBlockMerkleTree = 
thisBlockMerkleTreeList.get(thisIdx);
+      ContainerProtos.BlockMerkleTree peerBlockMerkleTree = 
peerBlockMerkleTreeList.get(peerIdx);
+
+      if (thisBlockMerkleTree.getBlockID() == 
peerBlockMerkleTree.getBlockID()) {
+        // Matching block ID; check if the block is deleted and handle the 
cases;
+        // 1) If the block is deleted in both the block merkle tree, We can 
ignore comparing them.
+        // 2) If the block is only deleted in our merkle tree, The BG service 
should have deleted our
+        //    block and the peer's BG service hasn't run yet. We can ignore 
comparing them.
+        // 3) If the block is only deleted in peer merkle tree, we can't 
reconcile for this block. It might be
+        //    deleted by peer's BG service. We can ignore comparing them.
+        if (!thisDeletedBlockSet.contains(thisBlockMerkleTree.getBlockID()) &&
+            !peerDeletedBlockSet.contains(thisBlockMerkleTree.getBlockID()) &&
+            thisBlockMerkleTree.getBlockChecksum() != 
peerBlockMerkleTree.getBlockChecksum()) {
+          compareBlockMerkleTree(thisBlockMerkleTree, peerBlockMerkleTree, 
report);
+        }
+        thisIdx++;
+        peerIdx++;
+      } else if (thisBlockMerkleTree.getBlockID() < 
peerBlockMerkleTree.getBlockID()) {
+        // This block's ID is smaller; advance thisIdx to catch up
+        thisIdx++;
+      } else {
+        // Peer block's ID is smaller; record missing block if 
peerDeletedBlockSet doesn't contain the blockId
+        // and advance peerIdx
+        if (!peerDeletedBlockSet.contains(peerBlockMerkleTree.getBlockID())) {
+          report.addMissingBlock(peerBlockMerkleTree);
+        }
+        peerIdx++;
+      }
+    }
+
+    // Step 2: Process remaining blocks in the peer list
+    while (peerIdx < peerBlockMerkleTreeList.size()) {
+      ContainerProtos.BlockMerkleTree peerBlockMerkleTree = 
peerBlockMerkleTreeList.get(peerIdx);
+      if (!peerDeletedBlockSet.contains(peerBlockMerkleTree.getBlockID())) {
+        report.addMissingBlock(peerBlockMerkleTree);
+      }
+      peerIdx++;
+    }
+  }
+
+  private void compareBlockMerkleTree(ContainerProtos.BlockMerkleTree 
thisBlockMerkleTree,
+                                      ContainerProtos.BlockMerkleTree 
peerBlockMerkleTree,
+                                      ContainerDiff report) {
+
+    List<ContainerProtos.ChunkMerkleTree> thisChunkMerkleTreeList = 
thisBlockMerkleTree.getChunkMerkleTreeList();
+    List<ContainerProtos.ChunkMerkleTree> peerChunkMerkleTreeList = 
peerBlockMerkleTree.getChunkMerkleTreeList();
+    int thisIdx = 0, peerIdx = 0;
+
+    // Step 1: Process both lists while elements are present in both
+    while (thisIdx < thisChunkMerkleTreeList.size() && peerIdx < 
peerChunkMerkleTreeList.size()) {
+      ContainerProtos.ChunkMerkleTree thisChunkMerkleTree = 
thisChunkMerkleTreeList.get(thisIdx);
+      ContainerProtos.ChunkMerkleTree peerChunkMerkleTree = 
peerChunkMerkleTreeList.get(peerIdx);
+
+      if (thisChunkMerkleTree.getOffset() == peerChunkMerkleTree.getOffset()) {
+        // Possible state when this Checksum != peer Checksum:
+        // thisTree = Healthy, peerTree = Healthy -> We don't know what is 
healthy. Skip.

Review Comment:
   We know both trees have not had corruption detected so we can consider them 
healthy/not needing repair. Not sure if the comment is trying to convey this.



##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java:
##########
@@ -299,6 +330,62 @@ public void testEmptyFile() throws Exception {
     assertEquals(CONTAINER_ID, info.getContainerID());
   }
 
+  @Test
+  public void testContainerWithNoDiff() throws IOException {
+    ContainerMerkleTree ourMerkleTree = buildTestTree(config);
+    ContainerMerkleTree peerMerkleTree = buildTestTree(config);
+    checksumManager.writeContainerDataTree(container, ourMerkleTree);
+    ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
ContainerProtos.ContainerChecksumInfo.newBuilder()
+            .setContainerID(container.getContainerID())
+            .setContainerMerkleTree(peerMerkleTree.toProto()).build();
+    ContainerChecksumTreeManager.ContainerDiff diff = 
checksumManager.diff(container, peerChecksumInfo);
+    
assertTrue(checksumManager.getMetrics().getMerkleTreeDiffLatencyNS().lastStat().total()
 > 0);
+    assertFalse(diff.needsRepair());

Review Comment:
   We should assert the success count metric here too.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java:
##########
@@ -245,11 +379,49 @@ public static boolean checksumFileExist(Container 
container) {
    * This class represents the difference between our replica of a container 
and a peer's replica of a container.
    * It summarizes the operations we need to do to reconcile our replica with 
the peer replica it was compared to.
    *
-   * TODO HDDS-10928
    */
   public static class ContainerDiff {
+    private final List<ContainerProtos.BlockMerkleTree> missingBlocks;
+    private final Map<Long, List<ContainerProtos.ChunkMerkleTree>> 
missingChunks;
+    private final Map<Long, List<ContainerProtos.ChunkMerkleTree>> 
corruptChunks;
+
     public ContainerDiff() {
+      this.missingBlocks = new ArrayList<>();
+      this.missingChunks = new HashMap<>();
+      this.corruptChunks = new HashMap<>();
+    }
+
+    public void addMissingBlock(ContainerProtos.BlockMerkleTree 
missingBlockMerkleTree) {
+      this.missingBlocks.add(missingBlockMerkleTree);
+    }
+
+    public void addMissingChunk(long blockId, ContainerProtos.ChunkMerkleTree 
missingChunkMerkleTree) {
+      this.missingChunks.computeIfAbsent(blockId, any -> new 
ArrayList<>()).add(missingChunkMerkleTree);
+    }
+
+    public void addCorruptChunk(long blockId, ContainerProtos.ChunkMerkleTree 
corruptChunk) {
+      this.corruptChunks.computeIfAbsent(blockId, any -> new 
ArrayList<>()).add(corruptChunk);
+    }
+
+    public List<ContainerProtos.BlockMerkleTree> getMissingBlocks() {
+      return missingBlocks;
+    }
+
+    public Map<Long, List<ContainerProtos.ChunkMerkleTree>> getMissingChunks() 
{
+      return missingChunks;
+    }
+
+    public Map<Long, List<ContainerProtos.ChunkMerkleTree>> getCorruptChunks() 
{
+      return corruptChunks;
+    }
 
+    /**
+     * If needRepair is true, It means current replica needs blocks/chunks 
from the peer to repair
+     * it's container replica. The peer replica still may have corruption, 
which it will fix when

Review Comment:
   nit.
   ```suggestion
        * its container replica. The peer replica still may have corruption, 
which it will fix when
   ```



##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java:
##########
@@ -131,21 +141,221 @@ public static ContainerMerkleTree 
buildTestTree(ConfigurationSource conf) {
     final long blockID1 = 1;
     final long blockID2 = 2;
     final long blockID3 = 3;
+    final long blockID4 = 4;
+    final long blockID5 = 5;
     ContainerProtos.ChunkInfo b1c1 = buildChunk(conf, 0, ByteBuffer.wrap(new 
byte[]{1, 2, 3}));
     ContainerProtos.ChunkInfo b1c2 = buildChunk(conf, 1, ByteBuffer.wrap(new 
byte[]{4, 5, 6}));
-    ContainerProtos.ChunkInfo b2c1 = buildChunk(conf, 0, ByteBuffer.wrap(new 
byte[]{7, 8, 9}));
-    ContainerProtos.ChunkInfo b2c2 = buildChunk(conf, 1, ByteBuffer.wrap(new 
byte[]{12, 11, 10}));
-    ContainerProtos.ChunkInfo b3c1 = buildChunk(conf, 0, ByteBuffer.wrap(new 
byte[]{13, 14, 15}));
-    ContainerProtos.ChunkInfo b3c2 = buildChunk(conf, 1, ByteBuffer.wrap(new 
byte[]{16, 17, 18}));
+    ContainerProtos.ChunkInfo b1c3 = buildChunk(conf, 2, ByteBuffer.wrap(new 
byte[]{7, 8, 9}));
+    ContainerProtos.ChunkInfo b1c4 = buildChunk(conf, 3, ByteBuffer.wrap(new 
byte[]{12, 11, 10}));
+    ContainerProtos.ChunkInfo b2c1 = buildChunk(conf, 0, ByteBuffer.wrap(new 
byte[]{13, 14, 15}));
+    ContainerProtos.ChunkInfo b2c2 = buildChunk(conf, 1, ByteBuffer.wrap(new 
byte[]{16, 17, 18}));
+    ContainerProtos.ChunkInfo b2c3 = buildChunk(conf, 2, ByteBuffer.wrap(new 
byte[]{19, 20, 21}));
+    ContainerProtos.ChunkInfo b2c4 = buildChunk(conf, 3, ByteBuffer.wrap(new 
byte[]{22, 23, 24}));
+    ContainerProtos.ChunkInfo b3c1 = buildChunk(conf, 0, ByteBuffer.wrap(new 
byte[]{25, 26, 27}));
+    ContainerProtos.ChunkInfo b3c2 = buildChunk(conf, 1, ByteBuffer.wrap(new 
byte[]{28, 29, 30}));
+    ContainerProtos.ChunkInfo b3c3 = buildChunk(conf, 2, ByteBuffer.wrap(new 
byte[]{31, 32, 33}));
+    ContainerProtos.ChunkInfo b3c4 = buildChunk(conf, 3, ByteBuffer.wrap(new 
byte[]{34, 35, 36}));
+    ContainerProtos.ChunkInfo b4c1 = buildChunk(conf, 0, ByteBuffer.wrap(new 
byte[]{37, 38, 29}));
+    ContainerProtos.ChunkInfo b4c2 = buildChunk(conf, 1, ByteBuffer.wrap(new 
byte[]{40, 41, 42}));
+    ContainerProtos.ChunkInfo b4c3 = buildChunk(conf, 2, ByteBuffer.wrap(new 
byte[]{43, 44, 45}));
+    ContainerProtos.ChunkInfo b4c4 = buildChunk(conf, 3, ByteBuffer.wrap(new 
byte[]{46, 47, 48}));
+    ContainerProtos.ChunkInfo b5c1 = buildChunk(conf, 0, ByteBuffer.wrap(new 
byte[]{49, 50, 51}));
+    ContainerProtos.ChunkInfo b5c2 = buildChunk(conf, 1, ByteBuffer.wrap(new 
byte[]{52, 53, 54}));
+    ContainerProtos.ChunkInfo b5c3 = buildChunk(conf, 2, ByteBuffer.wrap(new 
byte[]{55, 56, 57}));
+    ContainerProtos.ChunkInfo b5c4 = buildChunk(conf, 3, ByteBuffer.wrap(new 
byte[]{58, 59, 60}));
 
     ContainerMerkleTree tree = new ContainerMerkleTree();
-    tree.addChunks(blockID1, Arrays.asList(b1c1, b1c2));
-    tree.addChunks(blockID2, Arrays.asList(b2c1, b2c2));
-    tree.addChunks(blockID3, Arrays.asList(b3c1, b3c2));
+    tree.addChunks(blockID1, Arrays.asList(b1c1, b1c2, b1c3, b1c4));
+    tree.addChunks(blockID2, Arrays.asList(b2c1, b2c2, b2c3, b2c4));
+    tree.addChunks(blockID3, Arrays.asList(b3c1, b3c2, b3c3, b3c4));
+    tree.addChunks(blockID4, Arrays.asList(b4c1, b4c2, b4c3, b4c4));
+    tree.addChunks(blockID5, Arrays.asList(b5c1, b5c2, b5c3, b5c4));
 
     return tree;
   }
 
+  public static Pair<ContainerMerkleTree, 
ContainerChecksumTreeManager.ContainerDiff> buildTestTreeWithMismatches(
+      ConfigurationSource conf, int numMissingBlocks, int numMissingChunks, 
int numCorruptChunks) {
+
+    ContainerMerkleTree tree = buildTestTree(conf);

Review Comment:
   > I tried to operate on the protobuf, but the problem is that all the Lists 
are UnmodifiableList; hence, it is not possible to modify the protobuf.
   
   We can replace the lists with a new instance.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java:
##########
@@ -143,12 +149,140 @@ public void markBlocksAsDeleted(KeyValueContainerData 
data, Collection<Long> del
     }
   }
 
-  public ContainerDiff diff(KeyValueContainerData thisContainer, 
ContainerProtos.ContainerChecksumInfo otherInfo)
+  public ContainerDiff diff(KeyValueContainerData thisContainer, 
ContainerProtos.ContainerChecksumInfo peerChecksumInfo)
       throws IOException {
-    // TODO HDDS-10928 compare the checksum info of the two containers and 
return a summary.
-    //  Callers can act on this summary to repair their container replica 
using the peer's replica.
-    //  This method will use the read lock, which is unused in the current 
implementation.
-    return new ContainerDiff();
+    ContainerDiff report = new ContainerDiff();
+    try {
+      captureLatencyNs(metrics.getMerkleTreeDiffLatencyNS(), () -> {
+        Preconditions.assertNotNull(thisContainer, "Container data is null");
+        Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is 
null");
+        Optional<ContainerProtos.ContainerChecksumInfo.Builder> 
thisContainerChecksumInfoBuilder =
+            read(thisContainer);
+        if (!thisContainerChecksumInfoBuilder.isPresent()) {
+          throw new IOException("The container #" + 
thisContainer.getContainerID() +
+              " doesn't have container checksum");
+        }
+
+        if (thisContainer.getContainerID() != 
peerChecksumInfo.getContainerID()) {
+          throw new IOException("Container Id does not match for container " + 
thisContainer.getContainerID());
+        }
+
+        ContainerProtos.ContainerChecksumInfo thisChecksumInfo = 
thisContainerChecksumInfoBuilder.get().build();
+        compareContainerMerkleTree(thisChecksumInfo, peerChecksumInfo, report);
+      });
+    } catch (IOException ex) {
+      metrics.incrementMerkleTreeDiffFailures();
+      throw new IOException("Container Diff failed for container #" + 
thisContainer.getContainerID(), ex);
+    }
+
+    // Update Container Diff metrics based on the diff report.
+    if (report.needsRepair()) {
+      metrics.incrementUnhealthyContainerDiffs();
+    } else {
+      metrics.incrementHealthyContainerDiffs();
+    }

Review Comment:
   Let's update the metric names to match the "needs repair" terminology we are 
using in the report.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java:
##########
@@ -143,12 +149,140 @@ public void markBlocksAsDeleted(KeyValueContainerData 
data, Collection<Long> del
     }
   }
 
-  public ContainerDiff diff(KeyValueContainerData thisContainer, 
ContainerProtos.ContainerChecksumInfo otherInfo)
+  public ContainerDiff diff(KeyValueContainerData thisContainer, 
ContainerProtos.ContainerChecksumInfo peerChecksumInfo)
       throws IOException {
-    // TODO HDDS-10928 compare the checksum info of the two containers and 
return a summary.
-    //  Callers can act on this summary to repair their container replica 
using the peer's replica.
-    //  This method will use the read lock, which is unused in the current 
implementation.
-    return new ContainerDiff();
+    ContainerDiff report = new ContainerDiff();
+    try {
+      captureLatencyNs(metrics.getMerkleTreeDiffLatencyNS(), () -> {
+        Preconditions.assertNotNull(thisContainer, "Container data is null");
+        Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is 
null");
+        Optional<ContainerProtos.ContainerChecksumInfo.Builder> 
thisContainerChecksumInfoBuilder =
+            read(thisContainer);
+        if (!thisContainerChecksumInfoBuilder.isPresent()) {
+          throw new IOException("The container #" + 
thisContainer.getContainerID() +
+              " doesn't have container checksum");
+        }
+
+        if (thisContainer.getContainerID() != 
peerChecksumInfo.getContainerID()) {
+          throw new IOException("Container Id does not match for container " + 
thisContainer.getContainerID());
+        }
+
+        ContainerProtos.ContainerChecksumInfo thisChecksumInfo = 
thisContainerChecksumInfoBuilder.get().build();
+        compareContainerMerkleTree(thisChecksumInfo, peerChecksumInfo, report);
+      });
+    } catch (IOException ex) {
+      metrics.incrementMerkleTreeDiffFailures();
+      throw new IOException("Container Diff failed for container #" + 
thisContainer.getContainerID(), ex);
+    }
+
+    // Update Container Diff metrics based on the diff report.
+    if (report.needsRepair()) {
+      metrics.incrementUnhealthyContainerDiffs();
+    } else {
+      metrics.incrementHealthyContainerDiffs();
+    }
+    metrics.incrementMerkleTreeDiffSuccesses();
+    return report;
+  }
+
+  private void 
compareContainerMerkleTree(ContainerProtos.ContainerChecksumInfo 
thisChecksumInfo,
+                                          
ContainerProtos.ContainerChecksumInfo peerChecksumInfo,
+                                          ContainerDiff report) {
+    ContainerProtos.ContainerMerkleTree thisMerkleTree = 
thisChecksumInfo.getContainerMerkleTree();
+    ContainerProtos.ContainerMerkleTree peerMerkleTree = 
peerChecksumInfo.getContainerMerkleTree();
+    Set<Long> thisDeletedBlockSet = new 
HashSet<>(thisChecksumInfo.getDeletedBlocksList());
+    Set<Long> peerDeletedBlockSet = new 
HashSet<>(peerChecksumInfo.getDeletedBlocksList());
+
+    if (thisMerkleTree.getDataChecksum() == peerMerkleTree.getDataChecksum()) {
+      return;
+    }
+
+    List<ContainerProtos.BlockMerkleTree> thisBlockMerkleTreeList = 
thisMerkleTree.getBlockMerkleTreeList();
+    List<ContainerProtos.BlockMerkleTree> peerBlockMerkleTreeList = 
peerMerkleTree.getBlockMerkleTreeList();
+    int thisIdx = 0, peerIdx = 0;
+
+    // Step 1: Process both lists while elements are present in both
+    while (thisIdx < thisBlockMerkleTreeList.size() && peerIdx < 
peerBlockMerkleTreeList.size()) {
+      ContainerProtos.BlockMerkleTree thisBlockMerkleTree = 
thisBlockMerkleTreeList.get(thisIdx);
+      ContainerProtos.BlockMerkleTree peerBlockMerkleTree = 
peerBlockMerkleTreeList.get(peerIdx);
+
+      if (thisBlockMerkleTree.getBlockID() == 
peerBlockMerkleTree.getBlockID()) {
+        // Matching block ID; check if the block is deleted and handle the 
cases;
+        // 1) If the block is deleted in both the block merkle tree, We can 
ignore comparing them.
+        // 2) If the block is only deleted in our merkle tree, The BG service 
should have deleted our
+        //    block and the peer's BG service hasn't run yet. We can ignore 
comparing them.
+        // 3) If the block is only deleted in peer merkle tree, we can't 
reconcile for this block. It might be
+        //    deleted by peer's BG service. We can ignore comparing them.
+        if (!thisDeletedBlockSet.contains(thisBlockMerkleTree.getBlockID()) &&
+            !peerDeletedBlockSet.contains(thisBlockMerkleTree.getBlockID()) &&
+            thisBlockMerkleTree.getBlockChecksum() != 
peerBlockMerkleTree.getBlockChecksum()) {
+          compareBlockMerkleTree(thisBlockMerkleTree, peerBlockMerkleTree, 
report);
+        }
+        thisIdx++;
+        peerIdx++;
+      } else if (thisBlockMerkleTree.getBlockID() < 
peerBlockMerkleTree.getBlockID()) {
+        // This block's ID is smaller; advance thisIdx to catch up
+        thisIdx++;
+      } else {
+        // Peer block's ID is smaller; record missing block if 
peerDeletedBlockSet doesn't contain the blockId
+        // and advance peerIdx
+        if (!peerDeletedBlockSet.contains(peerBlockMerkleTree.getBlockID())) {
+          report.addMissingBlock(peerBlockMerkleTree);
+        }
+        peerIdx++;
+      }
+    }
+
+    // Step 2: Process remaining blocks in the peer list
+    while (peerIdx < peerBlockMerkleTreeList.size()) {
+      ContainerProtos.BlockMerkleTree peerBlockMerkleTree = 
peerBlockMerkleTreeList.get(peerIdx);
+      if (!peerDeletedBlockSet.contains(peerBlockMerkleTree.getBlockID())) {
+        report.addMissingBlock(peerBlockMerkleTree);
+      }
+      peerIdx++;
+    }

Review Comment:
   Same for the chunk comparison as well.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java:
##########
@@ -143,12 +149,140 @@ public void markBlocksAsDeleted(KeyValueContainerData 
data, Collection<Long> del
     }
   }
 
-  public ContainerDiff diff(KeyValueContainerData thisContainer, 
ContainerProtos.ContainerChecksumInfo otherInfo)
+  public ContainerDiff diff(KeyValueContainerData thisContainer, 
ContainerProtos.ContainerChecksumInfo peerChecksumInfo)
       throws IOException {
-    // TODO HDDS-10928 compare the checksum info of the two containers and 
return a summary.
-    //  Callers can act on this summary to repair their container replica 
using the peer's replica.
-    //  This method will use the read lock, which is unused in the current 
implementation.
-    return new ContainerDiff();
+    ContainerDiff report = new ContainerDiff();
+    try {
+      captureLatencyNs(metrics.getMerkleTreeDiffLatencyNS(), () -> {
+        Preconditions.assertNotNull(thisContainer, "Container data is null");
+        Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is 
null");
+        Optional<ContainerProtos.ContainerChecksumInfo.Builder> 
thisContainerChecksumInfoBuilder =
+            read(thisContainer);
+        if (!thisContainerChecksumInfoBuilder.isPresent()) {
+          throw new IOException("The container #" + 
thisContainer.getContainerID() +
+              " doesn't have container checksum");
+        }
+
+        if (thisContainer.getContainerID() != 
peerChecksumInfo.getContainerID()) {
+          throw new IOException("Container Id does not match for container " + 
thisContainer.getContainerID());
+        }
+
+        ContainerProtos.ContainerChecksumInfo thisChecksumInfo = 
thisContainerChecksumInfoBuilder.get().build();
+        compareContainerMerkleTree(thisChecksumInfo, peerChecksumInfo, report);
+      });
+    } catch (IOException ex) {
+      metrics.incrementMerkleTreeDiffFailures();
+      throw new IOException("Container Diff failed for container #" + 
thisContainer.getContainerID(), ex);
+    }
+
+    // Update Container Diff metrics based on the diff report.
+    if (report.needsRepair()) {
+      metrics.incrementUnhealthyContainerDiffs();
+    } else {
+      metrics.incrementHealthyContainerDiffs();
+    }
+    metrics.incrementMerkleTreeDiffSuccesses();
+    return report;
+  }
+
+  private void 
compareContainerMerkleTree(ContainerProtos.ContainerChecksumInfo 
thisChecksumInfo,
+                                          
ContainerProtos.ContainerChecksumInfo peerChecksumInfo,
+                                          ContainerDiff report) {
+    ContainerProtos.ContainerMerkleTree thisMerkleTree = 
thisChecksumInfo.getContainerMerkleTree();
+    ContainerProtos.ContainerMerkleTree peerMerkleTree = 
peerChecksumInfo.getContainerMerkleTree();
+    Set<Long> thisDeletedBlockSet = new 
HashSet<>(thisChecksumInfo.getDeletedBlocksList());
+    Set<Long> peerDeletedBlockSet = new 
HashSet<>(peerChecksumInfo.getDeletedBlocksList());
+
+    if (thisMerkleTree.getDataChecksum() == peerMerkleTree.getDataChecksum()) {
+      return;
+    }
+
+    List<ContainerProtos.BlockMerkleTree> thisBlockMerkleTreeList = 
thisMerkleTree.getBlockMerkleTreeList();
+    List<ContainerProtos.BlockMerkleTree> peerBlockMerkleTreeList = 
peerMerkleTree.getBlockMerkleTreeList();
+    int thisIdx = 0, peerIdx = 0;
+
+    // Step 1: Process both lists while elements are present in both
+    while (thisIdx < thisBlockMerkleTreeList.size() && peerIdx < 
peerBlockMerkleTreeList.size()) {
+      ContainerProtos.BlockMerkleTree thisBlockMerkleTree = 
thisBlockMerkleTreeList.get(thisIdx);
+      ContainerProtos.BlockMerkleTree peerBlockMerkleTree = 
peerBlockMerkleTreeList.get(peerIdx);
+
+      if (thisBlockMerkleTree.getBlockID() == 
peerBlockMerkleTree.getBlockID()) {
+        // Matching block ID; check if the block is deleted and handle the 
cases;
+        // 1) If the block is deleted in both the block merkle tree, We can 
ignore comparing them.
+        // 2) If the block is only deleted in our merkle tree, The BG service 
should have deleted our
+        //    block and the peer's BG service hasn't run yet. We can ignore 
comparing them.
+        // 3) If the block is only deleted in peer merkle tree, we can't 
reconcile for this block. It might be
+        //    deleted by peer's BG service. We can ignore comparing them.
+        if (!thisDeletedBlockSet.contains(thisBlockMerkleTree.getBlockID()) &&
+            !peerDeletedBlockSet.contains(thisBlockMerkleTree.getBlockID()) &&
+            thisBlockMerkleTree.getBlockChecksum() != 
peerBlockMerkleTree.getBlockChecksum()) {
+          compareBlockMerkleTree(thisBlockMerkleTree, peerBlockMerkleTree, 
report);
+        }
+        thisIdx++;
+        peerIdx++;
+      } else if (thisBlockMerkleTree.getBlockID() < 
peerBlockMerkleTree.getBlockID()) {
+        // This block's ID is smaller; advance thisIdx to catch up

Review Comment:
   Same for the chunk comparison as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to