hemantk-12 commented on code in PR #7474:
URL: https://github.com/apache/ozone/pull/7474#discussion_r2027573874


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java:
##########
@@ -146,33 +150,31 @@ public void markBlocksAsDeleted(KeyValueContainerData data, Collection<Long> del
     }
   }
 
-  public ContainerDiffReport diff(KeyValueContainerData thisContainer,
+  /**
+   * Compares the checksum info of the container with the peer's checksum info and returns a report of the differences.
+   * @param thisChecksumInfo The checksum info of the container on this datanode.
+   * @param peerChecksumInfo The checksum info of the container on the peer datanode.
+   */
+  public ContainerDiffReport diff(ContainerProtos.ContainerChecksumInfo thisChecksumInfo,
                                   ContainerProtos.ContainerChecksumInfo peerChecksumInfo) throws
      StorageContainerException {
 
    ContainerDiffReport report = new ContainerDiffReport();
    try {
      captureLatencyNs(metrics.getMerkleTreeDiffLatencyNS(), () -> {
-        Preconditions.assertNotNull(thisContainer, "Container data is null");
+        Preconditions.assertNotNull(thisChecksumInfo, "Our checksum info is null");

Review Comment:
   nit:
   ```suggestion
           Preconditions.assertNotNull(thisChecksumInfo, "Datanode's checksum info is null.");
   ```
   Or `Checksum info is null on this datanode.`.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
                                  Set<DatanodeDetails> peers) throws IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+    long dataChecksum = 0;
+    ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+    if (optionalChecksumInfo.isPresent()) {
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not present.
+      optionalChecksumInfo = createContainerMerkleTree(container);
+      if (!optionalChecksumInfo.isPresent()) {
+        throw new StorageContainerException("Failed to reconcile container " + containerData.getContainerID()
+            + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+      }
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    }
+
+    for (DatanodeDetails peer : peers) {
+      Instant start = Instant.now();
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      ContainerDiffReport diffReport = checksumManager.diff(checksumInfo, peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      Pipeline pipeline = createSingleNodePipeline(peer);
+
+      // Handle missing blocks
+      for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
+        try {
+          handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient, missingBlock);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
+              containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle missing chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle corrupt chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+      // Update checksum based on RocksDB metadata, The read chunk validates the checksum of the data
+      // we read. So we can update the checksum only based on the RocksDB metadata.
+      ContainerProtos.ContainerChecksumInfo updatedChecksumInfo = updateContainerChecksum(containerData);
+      dataChecksum = updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+      long duration = Duration.between(start, Instant.now()).toMillis();
+      if (dataChecksum == oldDataChecksum) {
+        metrics.incContainerReconciledWithoutChanges();
+        LOG.info("Container {} reconciled with peer {}. No change in checksum. Current checksum {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), checksumToString(dataChecksum), duration);
+      } else {
+        metrics.incContainerReconciledWithChanges();
+        LOG.warn("Container {} reconciled with peer {}. Checksum updated from {} to {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), checksumToString(oldDataChecksum),
+            checksumToString(dataChecksum), duration);
+      }
+      ContainerLogger.logReconciled(container.getContainerData(), oldDataChecksum, peer);
+    }
+
+    // Trigger manual on demand scanner
+    OnDemandContainerDataScanner.scanContainer(container);
     sendICR(container);
   }
 
+  /**
+   * Updates the container merkle tree based on the RocksDb's block metadata and returns the updated checksum info.
+   * @param containerData - Container data for which the container merkle tree needs to be updated.
+   */
+  private ContainerProtos.ContainerChecksumInfo updateContainerChecksum(KeyValueContainerData containerData)
+      throws IOException {
+    ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+        .writeContainerDataTree(containerData, merkleTree);
+    containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());

Review Comment:
   IMO, we should not update `containerData` here. The caller should get the `checksumInfo` and apply the update itself, or the function should be named something like `getChecksumInfoAndUpdateContainerData`.
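
   A minimal sketch of the caller-side alternative (hypothetical; names follow the PR's `reconcileContainer`):
   ```java
   // The helper stays side-effect free; the caller applies the update itself.
   ContainerProtos.ContainerChecksumInfo updatedChecksumInfo = updateContainerChecksum(containerData);
   long newDataChecksum = updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
   containerData.setDataChecksum(newDataChecksum);
   ```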



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
                                  Set<DatanodeDetails> peers) throws IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+    long dataChecksum = 0;
+    ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+    if (optionalChecksumInfo.isPresent()) {
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not present.
+      optionalChecksumInfo = createContainerMerkleTree(container);
+      if (!optionalChecksumInfo.isPresent()) {
+        throw new StorageContainerException("Failed to reconcile container " + containerData.getContainerID()
+            + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+      }
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    }
+
+    for (DatanodeDetails peer : peers) {
+      Instant start = Instant.now();
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      ContainerDiffReport diffReport = checksumManager.diff(checksumInfo, peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      Pipeline pipeline = createSingleNodePipeline(peer);
+
+      // Handle missing blocks
+      for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
+        try {
+          handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient, missingBlock);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
+              containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle missing chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);

Review Comment:
   nit: alignment is off.
   
   Why is `reconcileChunksPerBlock` throwing an exception if it is just getting 
logged here? It doesn't make sense to throw an exception from a private helper 
function if we are just logging it everywhere.
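
   A minimal sketch of keeping the catch inside a helper instead (hypothetical `tryReconcileChunksPerBlock`):
   ```java
   // Log once inside the helper and report success, so callers avoid
   // repeating identical catch-and-log blocks.
   private boolean tryReconcileChunksPerBlock(KeyValueContainer container, TokenHelper tokenHelper,
       Pipeline pipeline, DNContainerOperationClient dnClient, long blockId,
       List<ContainerProtos.ChunkMerkleTree> chunks) {
     try {
       reconcileChunksPerBlock(container, tokenHelper, pipeline, dnClient, blockId, chunks);
       return true;
     } catch (IOException e) {
       LOG.error("Error while reconciling chunks for block {} in container {}", blockId,
           container.getContainerData().getContainerID(), e);
       return false;
     }
   }
   ```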



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
                                  Set<DatanodeDetails> peers) throws IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+    long dataChecksum = 0;
+    ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+    if (optionalChecksumInfo.isPresent()) {
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not present.
+      optionalChecksumInfo = createContainerMerkleTree(container);
+      if (!optionalChecksumInfo.isPresent()) {
+        throw new StorageContainerException("Failed to reconcile container " + containerData.getContainerID()
+            + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+      }
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    }

Review Comment:
   ```suggestion
       if (!optionalChecksumInfo.isPresent()) {
         // Try creating the checksum info from RocksDB metadata if it is not present.
         optionalChecksumInfo = createContainerMerkleTree(container);
         if (!optionalChecksumInfo.isPresent()) {
           throw new StorageContainerException("Failed to reconcile container " + containerData.getContainerID()
               + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
         }
       }
       ContainerProtos.ContainerChecksumInfo checksumInfo = optionalChecksumInfo.get();
       long oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
   ```



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java:
##########
@@ -441,6 +441,19 @@ public List<BlockData> listBlock(Container container, long startLocalID, int
     }
   }
 
+  @Override
+  public boolean blockExists(Container container, BlockID blockID) throws IOException {
+    KeyValueContainerData containerData = (KeyValueContainerData) container
+        .getContainerData();
+    try (DBHandle db = BlockUtils.getDB(containerData, config)) {
+      // This is a post condition that acts as a hint to the user.
+      // Should never fail.

Review Comment:
   This comment is useless. Isn't it the reason why the precondition check is 
here?



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java:
##########
@@ -146,33 +150,31 @@ public void markBlocksAsDeleted(KeyValueContainerData data, Collection<Long> del
     }
   }
 
-  public ContainerDiffReport diff(KeyValueContainerData thisContainer,
+  /**
+   * Compares the checksum info of the container with the peer's checksum info and returns a report of the differences.
+   * @param thisChecksumInfo The checksum info of the container on this datanode.
+   * @param peerChecksumInfo The checksum info of the container on the peer datanode.
+   */
+  public ContainerDiffReport diff(ContainerProtos.ContainerChecksumInfo thisChecksumInfo,
                                   ContainerProtos.ContainerChecksumInfo peerChecksumInfo) throws
      StorageContainerException {
 
    ContainerDiffReport report = new ContainerDiffReport();
    try {
      captureLatencyNs(metrics.getMerkleTreeDiffLatencyNS(), () -> {
-        Preconditions.assertNotNull(thisContainer, "Container data is null");
+        Preconditions.assertNotNull(thisChecksumInfo, "Our checksum info is null");
         Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is null");
-        Optional<ContainerProtos.ContainerChecksumInfo> thisContainerChecksumInfo = read(thisContainer);
-        if (!thisContainerChecksumInfo.isPresent()) {
-          throw new StorageContainerException("The container #" + thisContainer.getContainerID() +
-              " doesn't have container checksum", ContainerProtos.Result.IO_EXCEPTION);
-        }
-
-        if (thisContainer.getContainerID() != peerChecksumInfo.getContainerID()) {
+        if (thisChecksumInfo.getContainerID() != peerChecksumInfo.getContainerID()) {
           throw new StorageContainerException("Container Id does not match for container "

Review Comment:
   This exception is caught and wrapped again at line 176. Should we check the exception type and only wrap it in a `StorageContainerException` if it is not already a `StorageContainerException`?
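
   A minimal sketch of that guard in the outer catch block (the rethrown message is illustrative):
   ```java
   } catch (Exception ex) {
     // Rethrow as-is when it is already a StorageContainerException; wrap otherwise.
     if (ex instanceof StorageContainerException) {
       throw (StorageContainerException) ex;
     }
     throw new StorageContainerException("Container diff failed: " + ex.getMessage(),
         ContainerProtos.Result.IO_EXCEPTION);
   }
   ```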



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java:
##########
@@ -145,6 +148,23 @@ public static void logRecovered(ContainerData containerData) {
     LOG.info(getMessage(containerData));
   }
 
+  /**
+   * Logged when a container is reconciled.
+   *
+   * @param containerData The container that was reconciled on this datanode.
+   * @param oldDataChecksum The old data checksum.
+   */
+  public static void logReconciled(ContainerData containerData, long oldDataChecksum, DatanodeDetails peer) {
+    if (containerData.getDataChecksum() == oldDataChecksum) {
+      LOG.info(getMessage(containerData, "Container reconciled with peer " + peer.toString() +
+          ". No change in checksum."));
+    } else {
+      LOG.warn(getMessage(containerData, "Container reconciled with peer " + peer.toString() +

Review Comment:
   Shouldn't it be INFO? WARN means something is wrong. 
   
   Not related to this, but we should use parameterized logging over String 
concatenation. 
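
   A minimal sketch of the parameterized form (assuming the single-argument `getMessage(containerData)` overload used elsewhere in this class):
   ```java
   // Placeholders instead of string concatenation; substitution happens only if the level is enabled.
   LOG.info("{} Container reconciled with peer {}. No change in checksum.",
       getMessage(containerData), peer);
   ```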



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
                                  Set<DatanodeDetails> peers) throws IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+    long dataChecksum = 0;

Review Comment:
   Define it where it is used.
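
   A minimal sketch (declaration moved to first use inside the peer loop):
   ```java
   for (DatanodeDetails peer : peers) {
     // ... diff and repair steps ...
     long dataChecksum = updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
   }
   ```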



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
                                  Set<DatanodeDetails> peers) throws IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+    long dataChecksum = 0;
+    ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+    if (optionalChecksumInfo.isPresent()) {
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not present.
+      optionalChecksumInfo = createContainerMerkleTree(container);
+      if (!optionalChecksumInfo.isPresent()) {
+        throw new StorageContainerException("Failed to reconcile container " + containerData.getContainerID()
+            + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+      }
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    }
+
+    for (DatanodeDetails peer : peers) {
+      Instant start = Instant.now();
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      ContainerDiffReport diffReport = checksumManager.diff(checksumInfo, peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      Pipeline pipeline = createSingleNodePipeline(peer);
+
+      // Handle missing blocks
+      for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
+        try {
+          handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient, missingBlock);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
+              containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle missing chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle corrupt chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+      // Update checksum based on RocksDB metadata, The read chunk validates the checksum of the data
+      // we read. So we can update the checksum only based on the RocksDB metadata.
+      ContainerProtos.ContainerChecksumInfo updatedChecksumInfo = updateContainerChecksum(containerData);
+      dataChecksum = updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+      long duration = Duration.between(start, Instant.now()).toMillis();

Review Comment:
   nit: in general, I prefer `Duration` over raw time values with units, but here it is an unnecessary conversion to `Duration` only to convert back to millis.
   Maybe use `long start = Instant.now().toEpochMilli();`.
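
   A minimal sketch of that version:
   ```java
   long startMs = Instant.now().toEpochMilli();
   // ... reconcile with the peer ...
   long duration = Instant.now().toEpochMilli() - startMs;
   ```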



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
                                  Set<DatanodeDetails> peers) throws IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+    long dataChecksum = 0;
+    ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+    if (optionalChecksumInfo.isPresent()) {
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not present.
+      optionalChecksumInfo = createContainerMerkleTree(container);
+      if (!optionalChecksumInfo.isPresent()) {
+        throw new StorageContainerException("Failed to reconcile container " + containerData.getContainerID()
+            + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+      }
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    }
+
+    for (DatanodeDetails peer : peers) {
+      Instant start = Instant.now();
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      ContainerDiffReport diffReport = checksumManager.diff(checksumInfo, peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      Pipeline pipeline = createSingleNodePipeline(peer);
+
+      // Handle missing blocks
+      for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
+        try {
+          handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient, missingBlock);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
+              containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle missing chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle corrupt chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+      // Update checksum based on RocksDB metadata, The read chunk validates the checksum of the data

Review Comment:
   nit:
   ```suggestion
         // Update checksum based on RocksDB metadata. The read chunk validates the checksum of the data
   ```



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
                                  Set<DatanodeDetails> peers) throws IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+    long dataChecksum = 0;
+    ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+    if (optionalChecksumInfo.isPresent()) {
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not present.
+      optionalChecksumInfo = createContainerMerkleTree(container);
+      if (!optionalChecksumInfo.isPresent()) {
+        throw new StorageContainerException("Failed to reconcile container " + containerData.getContainerID()
+            + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+      }
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    }
+
+    for (DatanodeDetails peer : peers) {
+      Instant start = Instant.now();
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      ContainerDiffReport diffReport = checksumManager.diff(checksumInfo, peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      Pipeline pipeline = createSingleNodePipeline(peer);
+
+      // Handle missing blocks
+      for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
+        try {
+          handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient, missingBlock);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
+              containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle missing chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle corrupt chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+      // Update checksum based on RocksDB metadata, The read chunk validates the checksum of the data
+      // we read. So we can update the checksum only based on the RocksDB metadata.
+      ContainerProtos.ContainerChecksumInfo updatedChecksumInfo = updateContainerChecksum(containerData);
+      dataChecksum = updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+      long duration = Duration.between(start, Instant.now()).toMillis();
+      if (dataChecksum == oldDataChecksum) {
+        metrics.incContainerReconciledWithoutChanges();
+        LOG.info("Container {} reconciled with peer {}. No change in checksum. Current checksum {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), checksumToString(dataChecksum), duration);
+      } else {
+        metrics.incContainerReconciledWithChanges();
+        LOG.warn("Container {} reconciled with peer {}. Checksum updated from {} to {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), checksumToString(oldDataChecksum),
+            checksumToString(dataChecksum), duration);
+      }
+      ContainerLogger.logReconciled(container.getContainerData(), oldDataChecksum, peer);
+    }
+
+    // Trigger manual on demand scanner
+    OnDemandContainerDataScanner.scanContainer(container);
     sendICR(container);
   }
 
+  /**
+   * Updates the container merkle tree based on the RocksDb's block metadata and returns the updated checksum info.
+   * @param containerData - Container data for which the container merkle tree needs to be updated.
+   */
+  private ContainerProtos.ContainerChecksumInfo updateContainerChecksum(KeyValueContainerData containerData)
+      throws IOException {
+    ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);

Review Comment:
   qq: Why doesn't MerkleTree provide a function to add a block?
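
   A minimal sketch of such a convenience method (hypothetical; `addBlock` is not part of `ContainerMerkleTreeWriter` in this PR):
   ```java
   // Hypothetical wrapper so callers can pass a BlockData directly.
   public void addBlock(BlockData blockData) {
     addChunks(blockData.getLocalID(), blockData.getChunks());
   }
   ```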



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
                                  Set<DatanodeDetails> peers) throws IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+    long dataChecksum = 0;
+    ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+    if (optionalChecksumInfo.isPresent()) {
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not present.
+      optionalChecksumInfo = createContainerMerkleTree(container);
+      if (!optionalChecksumInfo.isPresent()) {
+        throw new StorageContainerException("Failed to reconcile container " + containerData.getContainerID()
+            + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+      }
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    }
+
+    for (DatanodeDetails peer : peers) {
+      Instant start = Instant.now();
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      ContainerDiffReport diffReport = checksumManager.diff(checksumInfo, peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      Pipeline pipeline = createSingleNodePipeline(peer);
+
+      // Handle missing blocks
+      for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
+        try {
+          handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient, missingBlock);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
+              containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle missing chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle corrupt chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+      // Update checksum based on RocksDB metadata, The read chunk validates the checksum of the data
+      // we read. So we can update the checksum only based on the RocksDB metadata.
+      ContainerProtos.ContainerChecksumInfo updatedChecksumInfo = updateContainerChecksum(containerData);
+      dataChecksum = updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+      long duration = Duration.between(start, Instant.now()).toMillis();
+      if (dataChecksum == oldDataChecksum) {
+        metrics.incContainerReconciledWithoutChanges();
+        LOG.info("Container {} reconciled with peer {}. No change in checksum. Current checksum {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), checksumToString(dataChecksum), duration);
+      } else {
+        metrics.incContainerReconciledWithChanges();
+        LOG.warn("Container {} reconciled with peer {}. Checksum updated from {} to {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), checksumToString(oldDataChecksum),
+            checksumToString(dataChecksum), duration);
+      }
+      ContainerLogger.logReconciled(container.getContainerData(), oldDataChecksum, peer);
+    }
+
+    // Trigger manual on demand scanner
+    OnDemandContainerDataScanner.scanContainer(container);
     sendICR(container);
   }
 
+  /**
+   * Updates the container merkle tree based on the RocksDb's block metadata and returns the updated checksum info.
+   * @param containerData - Container data for which the container merkle tree needs to be updated.
+   */
+  private ContainerProtos.ContainerChecksumInfo updateContainerChecksum(KeyValueContainerData containerData)
+      throws IOException {
+    ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+        .writeContainerDataTree(containerData, merkleTree);
+    containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+    return checksumInfo;
+  }
+
+  /**
+   * Handle missing block. It reads the missing block data from the peer datanode and writes it to the local container.
+   * If the block write fails, the block commit sequence id is not updated.
+   */
+  private void handleMissingBlock(KeyValueContainer container, TokenHelper tokenHelper,
+                                  Pipeline pipeline, DNContainerOperationClient dnClient,
+                                  ContainerProtos.BlockMerkleTree missingBlock)
+          throws IOException {
+    ContainerData containerData = container.getContainerData();
+    BlockID blockID = new BlockID(containerData.getContainerID(), missingBlock.getBlockID());
+    // The length of the block is not known, so instead of passing the default block length we pass 0. As the length
+    // is not used to validate the token for getBlock call.
+    Token<OzoneBlockTokenIdentifier> blockToken = tokenHelper.getBlockToken(blockID, 0L);
+    if (getBlockManager().blockExists(container, blockID)) {
+      LOG.warn("Block {} already exists in container {}. This block {} is expected to not exist. The container " +
+          "merkle tree for container {} is stale. Skipping reconciliation for block.", blockID,
+          containerData.getContainerID(), blockID, containerData.getContainerID());
+      return;
+    }
+
+    List<ContainerProtos.ChunkInfo> successfulChunksList = new ArrayList<>();
+    // Update BcsId only if all chunks are successfully written.

Review Comment:
   nit: A similar comment is in the javadoc and at line 1665. I think the javadoc comment is enough to tell when the BcsId is getting updated.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
                                  Set<DatanodeDetails> peers) throws IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+    long dataChecksum = 0;
+    ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+    if (optionalChecksumInfo.isPresent()) {
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not present.
+      optionalChecksumInfo = createContainerMerkleTree(container);
+      if (!optionalChecksumInfo.isPresent()) {
+        throw new StorageContainerException("Failed to reconcile container " + containerData.getContainerID()
+            + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+      }
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    }
+
+    for (DatanodeDetails peer : peers) {
+      Instant start = Instant.now();
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      ContainerDiffReport diffReport = checksumManager.diff(checksumInfo, peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      Pipeline pipeline = createSingleNodePipeline(peer);
+
+      // Handle missing blocks
+      for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
+        try {
+          handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient, missingBlock);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
+              containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle missing chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle corrupt chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+      // Update checksum based on RocksDB metadata, The read chunk validates the checksum of the data
+      // we read. So we can update the checksum only based on the RocksDB metadata.
+      ContainerProtos.ContainerChecksumInfo updatedChecksumInfo = updateContainerChecksum(containerData);
+      dataChecksum = updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+      long duration = Duration.between(start, Instant.now()).toMillis();
+      if (dataChecksum == oldDataChecksum) {
+        metrics.incContainerReconciledWithoutChanges();
+        LOG.info("Container {} reconciled with peer {}. No change in checksum. Current checksum {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), checksumToString(dataChecksum), duration);
+      } else {
+        metrics.incContainerReconciledWithChanges();
+        LOG.warn("Container {} reconciled with peer {}. Checksum updated from {} to {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), checksumToString(oldDataChecksum),
+            checksumToString(dataChecksum), duration);
+      }
+      ContainerLogger.logReconciled(container.getContainerData(), oldDataChecksum, peer);
+    }
+
+    // Trigger manual on demand scanner
+    OnDemandContainerDataScanner.scanContainer(container);
     sendICR(container);
   }
 
+  /**
+   * Updates the container merkle tree based on the RocksDb's block metadata and returns the updated checksum info.
+   * @param containerData - Container data for which the container merkle tree needs to be updated.
+   */
+  private ContainerProtos.ContainerChecksumInfo updateContainerChecksum(KeyValueContainerData containerData)
+      throws IOException {
+    ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+        .writeContainerDataTree(containerData, merkleTree);
+    containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+    return checksumInfo;
+  }
+
+  /**
+   * Handle missing block. It reads the missing block data from the peer datanode and writes it to the local container.
+   * If the block write fails, the block commit sequence id is not updated.
+   */
+  private void handleMissingBlock(KeyValueContainer container, TokenHelper tokenHelper,
+                                  Pipeline pipeline, DNContainerOperationClient dnClient,
+                                  ContainerProtos.BlockMerkleTree missingBlock)
+          throws IOException {
+    ContainerData containerData = container.getContainerData();
+    BlockID blockID = new BlockID(containerData.getContainerID(), missingBlock.getBlockID());
+    // The length of the block is not known, so instead of passing the default block length we pass 0. As the length
+    // is not used to validate the token for getBlock call.
+    Token<OzoneBlockTokenIdentifier> blockToken = tokenHelper.getBlockToken(blockID, 0L);
+    if (getBlockManager().blockExists(container, blockID)) {
+      LOG.warn("Block {} already exists in container {}. This block {} is expected to not exist. The container " +
+          "merkle tree for container {} is stale. Skipping reconciliation for block.", blockID,
+          containerData.getContainerID(), blockID, containerData.getContainerID());
+      return;
+    }
+
+    List<ContainerProtos.ChunkInfo> successfulChunksList = new ArrayList<>();
+    // Update BcsId only if all chunks are successfully written.
+    boolean overwriteBcsId = true;
+
+    BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+        .setBlockID(blockID)
+        .setPipeline(pipeline)
+        .setToken(blockToken)
+        .build();
+    // Under construction is set here, during BlockInputStream#initialize() it is used to update the block length.
+    blkInfo.setUnderConstruction(true);
+    try (BlockInputStream blockInputStream = (BlockInputStream) blockInputStreamFactory.create(
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+        blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+        null, conf.getObject(OzoneClientConfig.class))) {
+      // Initialize the BlockInputStream. Initializes the blockData and ChunkInputStream for each chunk

Review Comment:
   ```suggestion
         // Initializes the blockData and ChunkInputStream for each chunk.
   ```
   The comment `BlockInputStream#initialize() should be called before this, as it gets the BlockData for the block. and sets the block length.` is partially redundant. Maybe merge these two comments and explain "why you are doing this" rather than "what this is doing".
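
   A possible merged comment (wording is only a sketch):
   ```java
   // Initialize first: BlockInputStream#initialize() fetches the BlockData from the peer,
   // sets the block length, and creates a ChunkInputStream per chunk, which the
   // chunk reads below depend on.
   ```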



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java:
##########
@@ -98,11 +100,13 @@ public void writeContainerDataTree(ContainerData data, ContainerMerkleTreeWriter
         checksumInfoBuilder = ContainerProtos.ContainerChecksumInfo.newBuilder();
       }

Review Comment:
   nit: Not related to this patch, but we should create a helper function, something like readOrCreate, to handle all the create scenarios for `checksumInfoBuilder`, and reuse it in `markBlocksAsDeleted`.
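
   A minimal sketch of such a helper (hypothetical name and shape):
   ```java
   // Hypothetical helper: return the stored checksum info as a builder,
   // or a fresh builder when nothing has been written yet.
   private ContainerProtos.ContainerChecksumInfo.Builder readOrCreate(KeyValueContainerData data) {
     return read(data)
         .map(ContainerProtos.ContainerChecksumInfo::toBuilder)
         .orElseGet(ContainerProtos.ContainerChecksumInfo::newBuilder);
   }
   ```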



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
                                  Set<DatanodeDetails> peers) throws IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+    long dataChecksum = 0;
+    ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+    if (optionalChecksumInfo.isPresent()) {
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not present.
+      optionalChecksumInfo = createContainerMerkleTree(container);
+      if (!optionalChecksumInfo.isPresent()) {
+        throw new StorageContainerException("Failed to reconcile container " + containerData.getContainerID()
+            + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+      }
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    }
+
+    for (DatanodeDetails peer : peers) {
+      Instant start = Instant.now();
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      ContainerDiffReport diffReport = checksumManager.diff(checksumInfo, peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      Pipeline pipeline = createSingleNodePipeline(peer);
+
+      // Handle missing blocks
+      for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
+        try {
+          handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient, missingBlock);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
+              containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle missing chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle corrupt chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+      // Update checksum based on RocksDB metadata, The read chunk validates the checksum of the data
+      // we read. So we can update the checksum only based on the RocksDB metadata.
+      ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+      dataChecksum = 
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+      long duration = Duration.between(start, Instant.now()).toMillis();
+      if (dataChecksum == oldDataChecksum) {
+        metrics.incContainerReconciledWithoutChanges();
+        LOG.info("Container {} reconciled with peer {}. No change in checksum. 
Current checksum {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), 
checksumToString(dataChecksum), duration);
+      } else {
+        metrics.incContainerReconciledWithChanges();
+        LOG.warn("Container {} reconciled with peer {}. Checksum updated from 
{} to {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), 
checksumToString(oldDataChecksum),
+            checksumToString(dataChecksum), duration);
+      }
+      ContainerLogger.logReconciled(container.getContainerData(), 
oldDataChecksum, peer);
+    }
+
+    // Trigger manual on demand scanner
+    OnDemandContainerDataScanner.scanContainer(container);
     sendICR(container);
   }
 
+  /**
+   * Updates the container merkle tree based on the RocksDb's block metadata 
and returns the updated checksum info.
+   * @param containerData - Container data for which the container merkle tree 
needs to be updated.
+   */
+  private ContainerProtos.ContainerChecksumInfo 
updateContainerChecksum(KeyValueContainerData containerData)
+      throws IOException {
+    ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+        .writeContainerDataTree(containerData, merkleTree);
+    
containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+    return checksumInfo;
+  }
+
+  /**
+   * Handle missing block. It reads the missing block data from the peer 
datanode and writes it to the local container.
+   * If the block write fails, the block commit sequence id is not updated.
+   */
+  private void handleMissingBlock(KeyValueContainer container, TokenHelper 
tokenHelper,
+                                  Pipeline pipeline, 
DNContainerOperationClient dnClient,
+                                  ContainerProtos.BlockMerkleTree missingBlock)
+          throws IOException {
+    ContainerData containerData = container.getContainerData();
+    BlockID blockID = new BlockID(containerData.getContainerID(), 
missingBlock.getBlockID());
+    // The length of the block is not known, so instead of passing the default 
block length we pass 0. As the length
+    // is not used to validate the token for getBlock call.
+    Token<OzoneBlockTokenIdentifier> blockToken = 
tokenHelper.getBlockToken(blockID, 0L);
+    if (getBlockManager().blockExists(container, blockID)) {
+      LOG.warn("Block {} already exists in container {}. This block {} is 
expected to not exist. The container " +
+          "merkle tree for container {} is stale. Skipping reconciliation for 
block.", blockID,
+          containerData.getContainerID(), blockID, 
containerData.getContainerID());
+      return;
+    }
+
+    List<ContainerProtos.ChunkInfo> successfulChunksList = new ArrayList<>();
+    // Update BcsId only if all chunks are successfully written.
+    boolean overwriteBcsId = true;
+
+    BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+        .setBlockID(blockID)
+        .setPipeline(pipeline)
+        .setToken(blockToken)
+        .build();
+    // Under construction is set here, during BlockInputStream#initialize() it 
is used to update the block length.
+    blkInfo.setUnderConstruction(true);
+    try (BlockInputStream blockInputStream = (BlockInputStream) 
blockInputStreamFactory.create(
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+        blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+        null, conf.getObject(OzoneClientConfig.class))) {
+      // Initialize the BlockInputStream. Initializes the blockData and 
ChunkInputStream for each chunk
+      blockInputStream.initialize();
+
+      // BlockInputStream#initialize() should be called before this, as it 
gets the BlockData for the block.
+      // and sets the block length.
+      ContainerProtos.BlockData peerBlockData = 
blockInputStream.getStreamBlockData();
+      // The maxBcsId is the peer's bcsId as there is no block for this 
blockID in the local container.
+      long maxBcsId = peerBlockData.getBlockID().getBlockCommitSequenceId();
+      List<ContainerProtos.ChunkInfo> peerChunksList = 
peerBlockData.getChunksList();
+      int chunkSize = (int) conf.getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, 
OZONE_SCM_CHUNK_SIZE_DEFAULT,
+          StorageUnit.BYTES);

Review Comment:
   Since `chunkSize` is used for each missing/corrupted block and chunk, maybe 
define it at the class level.
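   A minimal sketch of that (the field name and constructor placement are 
assumptions on my part, not part of this patch):
   ```java
   // Hypothetical: resolve the configured chunk size once, instead of
   // re-reading the config for every reconciled block and chunk.
   private final int chunkSize;

   // e.g. in the KeyValueHandler constructor:
   this.chunkSize = (int) conf.getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY,
       OZONE_SCM_CHUNK_SIZE_DEFAULT, StorageUnit.BYTES);
   ```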



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, 
boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = 
checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+    long dataChecksum = 0;
+    ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+    if (optionalChecksumInfo.isPresent()) {
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = 
checksumInfo.getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not 
present.
+      optionalChecksumInfo = createContainerMerkleTree(container);
+      if (!optionalChecksumInfo.isPresent()) {
+        throw new StorageContainerException("Failed to reconcile container " + 
containerData.getContainerID()
+            + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+      }
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = 
checksumInfo.getContainerMerkleTree().getDataChecksum();
+    }
+
+    for (DatanodeDetails peer : peers) {
+      Instant start = Instant.now();
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet 
generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      ContainerDiffReport diffReport = checksumManager.diff(checksumInfo, 
peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      Pipeline pipeline = createSingleNodePipeline(peer);
+
+      // Handle missing blocks
+      for (ContainerProtos.BlockMerkleTree missingBlock : 
diffReport.getMissingBlocks()) {
+        try {
+          handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient, 
missingBlock);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing block for block {} in 
container {}", missingBlock.getBlockID(),
+              containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle missing chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getMissingChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, 
dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing chunk for block {} in 
container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle corrupt chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getCorruptChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, 
dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling corrupt chunk for block {} in 
container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+      // Update checksum based on RocksDB metadata, The read chunk validates 
the checksum of the data
+      // we read. So we can update the checksum only based on the RocksDB 
metadata.
+      ContainerProtos.ContainerChecksumInfo updatedChecksumInfo = 
updateContainerChecksum(containerData);
+      dataChecksum = 
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+      long duration = Duration.between(start, Instant.now()).toMillis();
+      if (dataChecksum == oldDataChecksum) {
+        metrics.incContainerReconciledWithoutChanges();
+        LOG.info("Container {} reconciled with peer {}. No change in checksum. 
Current checksum {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), 
checksumToString(dataChecksum), duration);
+      } else {
+        metrics.incContainerReconciledWithChanges();
+        LOG.warn("Container {} reconciled with peer {}. Checksum updated from 
{} to {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), 
checksumToString(oldDataChecksum),
+            checksumToString(dataChecksum), duration);
+      }
+      ContainerLogger.logReconciled(container.getContainerData(), 
oldDataChecksum, peer);
+    }
+
+    // Trigger manual on demand scanner
+    OnDemandContainerDataScanner.scanContainer(container);
     sendICR(container);
   }
 
+  /**
+   * Updates the container merkle tree based on the RocksDb's block metadata 
and returns the updated checksum info.
+   * @param containerData - Container data for which the container merkle tree 
needs to be updated.
+   */
+  private ContainerProtos.ContainerChecksumInfo 
updateContainerChecksum(KeyValueContainerData containerData)
+      throws IOException {
+    ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+        .writeContainerDataTree(containerData, merkleTree);
+    
containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+    return checksumInfo;
+  }
+
+  /**
+   * Handle missing block. It reads the missing block data from the peer 
datanode and writes it to the local container.
+   * If the block write fails, the block commit sequence id is not updated.
+   */
+  private void handleMissingBlock(KeyValueContainer container, TokenHelper 
tokenHelper,
+                                  Pipeline pipeline, 
DNContainerOperationClient dnClient,
+                                  ContainerProtos.BlockMerkleTree missingBlock)
+          throws IOException {
+    ContainerData containerData = container.getContainerData();
+    BlockID blockID = new BlockID(containerData.getContainerID(), 
missingBlock.getBlockID());
+    // The length of the block is not known, so instead of passing the default 
block length we pass 0. As the length
+    // is not used to validate the token for getBlock call.
+    Token<OzoneBlockTokenIdentifier> blockToken = 
tokenHelper.getBlockToken(blockID, 0L);
+    if (getBlockManager().blockExists(container, blockID)) {
+      LOG.warn("Block {} already exists in container {}. This block {} is 
expected to not exist. The container " +
+          "merkle tree for container {} is stale. Skipping reconciliation for 
block.", blockID,
+          containerData.getContainerID(), blockID, 
containerData.getContainerID());
+      return;
+    }
+
+    List<ContainerProtos.ChunkInfo> successfulChunksList = new ArrayList<>();
+    // Update BcsId only if all chunks are successfully written.
+    boolean overwriteBcsId = true;
+
+    BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+        .setBlockID(blockID)
+        .setPipeline(pipeline)
+        .setToken(blockToken)
+        .build();
+    // Under construction is set here, during BlockInputStream#initialize() it 
is used to update the block length.
+    blkInfo.setUnderConstruction(true);
+    try (BlockInputStream blockInputStream = (BlockInputStream) 
blockInputStreamFactory.create(
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+        blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+        null, conf.getObject(OzoneClientConfig.class))) {
+      // Initialize the BlockInputStream. Initializes the blockData and 
ChunkInputStream for each chunk
+      blockInputStream.initialize();
+
+      // BlockInputStream#initialize() should be called before this, as it 
gets the BlockData for the block.
+      // and sets the block length.
+      ContainerProtos.BlockData peerBlockData = 
blockInputStream.getStreamBlockData();
+      // The maxBcsId is the peer's bcsId as there is no block for this 
blockID in the local container.
+      long maxBcsId = peerBlockData.getBlockID().getBlockCommitSequenceId();
+      List<ContainerProtos.ChunkInfo> peerChunksList = 
peerBlockData.getChunksList();
+      int chunkSize = (int) conf.getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, 
OZONE_SCM_CHUNK_SIZE_DEFAULT,
+          StorageUnit.BYTES);
+      ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);
+
+      // Don't update bcsId if chunk read fails
+      for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+        try {
+          // Seek to the offset of the chunk. Seek updates the chunkIndex in 
the BlockInputStream.
+          blockInputStream.seek(chunkInfoProto.getOffset());
+
+          // Read the chunk data from the BlockInputStream and write it to the 
container.
+          int chunkLength = (int) chunkInfoProto.getLen();
+          if (chunkByteBuffer.capacity() < chunkLength) {
+            chunkByteBuffer = ByteBuffer.allocate(chunkLength);
+          }
+
+          chunkByteBuffer.clear();
+          chunkByteBuffer.limit(chunkLength);
+          int bytesRead = blockInputStream.read(chunkByteBuffer);
+          if (bytesRead != chunkLength) {
+            throw new IOException("Error while reading chunk data from block 
input stream. Expected length: " +
+                chunkLength + ", Actual length: " + bytesRead);
+          }
+
+          chunkByteBuffer.flip();
+          ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkByteBuffer);
+          ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+          chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+          writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, 
container);
+          successfulChunksList.add(chunkInfoProto);
+        } catch (IOException ex) {
+          overwriteBcsId = false;
+          LOG.error("Error while reconciling missing block {} for offset {} in 
container {}",
+              blockID, chunkInfoProto.getOffset(), 
containerData.getContainerID(), ex);
+        }
+      }
+
+      BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+      putBlockData.setChunks(successfulChunksList);
+      putBlockForClosedContainer(container, putBlockData, maxBcsId, 
overwriteBcsId);
+    }
+  }
+
+  /**
+   * This method reconciles chunks per block. It reads the missing/corrupt 
chunk data from the peer
+   * datanode and writes it to the local container. If the chunk write fails, 
the block commit sequence
+   * id is not updated.
+   */
+  private void reconcileChunksPerBlock(KeyValueContainer container, 
TokenHelper tokenHelper, Pipeline pipeline,
+                                       DNContainerOperationClient dnClient, 
long blockId,
+                                       List<ContainerProtos.ChunkMerkleTree> 
chunkList) throws IOException {
+
+    ContainerData containerData = container.getContainerData();
+    BlockID blockID = new BlockID(containerData.getContainerID(), blockId);
+    // The length of the block is not known, so instead of passing the default 
block length we pass 0. As the length
+    // is not used to validate the token for getBlock call.
+    Token<OzoneBlockTokenIdentifier> blockToken = 
tokenHelper.getBlockToken(blockID, 0L);
+    BlockData localBlockData = getBlockManager().getBlock(container, blockID);
+
+    SortedMap<Long, ContainerProtos.ChunkInfo> localChunksMap = 
localBlockData.getChunks().stream()
+        .collect(Collectors.toMap(ContainerProtos.ChunkInfo::getOffset,
+            Function.identity(), (chunk1, chunk2) -> chunk1, TreeMap::new));
+    boolean overwriteBcsId = true;
+
+    BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+        .setBlockID(blockID)
+        .setPipeline(pipeline)
+        .setToken(blockToken)
+        .build();
+    // Under construction is set here, during BlockInputStream#initialize() it 
is used to update the block length.
+    blkInfo.setUnderConstruction(true);
+    try (BlockInputStream blockInputStream = (BlockInputStream) 
blockInputStreamFactory.create(
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+        blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+        null, conf.getObject(OzoneClientConfig.class))) {
+      // Initialize the BlockInputStream. Initializes the blockData and 
ChunkInputStream for each chunk
+      blockInputStream.initialize();
+
+      // BlockInputStream#initialize() should be called before this, as it 
gets the BlockData for the block
+      // and sets the block length.
+      ContainerProtos.BlockData peerBlockData = 
blockInputStream.getStreamBlockData();
+      // Check the local bcsId with the one from the bcsId from the peer 
datanode.
+      long maxBcsId = 
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(),
+          localBlockData.getBlockCommitSequenceId());
+
+      int chunkSize = (int) conf.getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, 
OZONE_SCM_CHUNK_SIZE_DEFAULT,
+          StorageUnit.BYTES);
+      ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);
+
+      for (ContainerProtos.ChunkMerkleTree chunkMerkleTree : chunkList) {
+        long chunkOffset = chunkMerkleTree.getOffset();
+        try {
+          // Seek to the offset of the chunk. Seek updates the chunkIndex in 
the BlockInputStream.
+          blockInputStream.seek(chunkOffset);
+          ChunkInputStream currentChunkStream = 
blockInputStream.getChunkStreams().get(
+              blockInputStream.getChunkIndex());
+          ContainerProtos.ChunkInfo chunkInfoProto = 
currentChunkStream.getChunkInfo();
+          ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+          chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+
+          // Verify the chunk offset and length.

Review Comment:
   nit: please remove this comment.
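   (An empty suggestion block would drop the line:)
   ```suggestion
   ```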


