errose28 commented on code in PR #7474:
URL: https://github.com/apache/ozone/pull/7474#discussion_r2029332825


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
                                  Set<DatanodeDetails> peers) throws IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+    long dataChecksum = 0;
+    ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+    if (optionalChecksumInfo.isPresent()) {
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not present.
+      optionalChecksumInfo = createContainerMerkleTree(container);
+      if (!optionalChecksumInfo.isPresent()) {
+        throw new StorageContainerException("Failed to reconcile container " + containerData.getContainerID()
+            + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+      }
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    }
+
+    for (DatanodeDetails peer : peers) {
+      Instant start = Instant.now();
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      ContainerDiffReport diffReport = checksumManager.diff(checksumInfo, peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      Pipeline pipeline = createSingleNodePipeline(peer);
+
+      // Handle missing blocks
+      for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
+        try {
+          handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient, missingBlock);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
+              containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle missing chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle corrupt chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+      // Update checksum based on RocksDB metadata. The read chunk validates the checksum of the data
+      // we read. So we can update the checksum only based on the RocksDB metadata.
+      ContainerProtos.ContainerChecksumInfo updatedChecksumInfo = updateContainerChecksum(containerData);
+      dataChecksum = updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+      long duration = Duration.between(start, Instant.now()).toMillis();
+      if (dataChecksum == oldDataChecksum) {
+        metrics.incContainerReconciledWithoutChanges();
+        LOG.info("Container {} reconciled with peer {}. No change in checksum. Current checksum {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), checksumToString(dataChecksum), duration);
+      } else {
+        metrics.incContainerReconciledWithChanges();
+        LOG.warn("Container {} reconciled with peer {}. Checksum updated from {} to {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), checksumToString(oldDataChecksum),
+            checksumToString(dataChecksum), duration);
+      }
+      ContainerLogger.logReconciled(container.getContainerData(), oldDataChecksum, peer);
+    }
+
+    // Trigger a manual on-demand scan
+    OnDemandContainerDataScanner.scanContainer(container);
     sendICR(container);
   }
 
+  /**
+   * Updates the container merkle tree based on RocksDB's block metadata and returns the updated checksum info.
+   * @param containerData - Container data for which the container merkle tree needs to be updated.
+   */
+  private ContainerProtos.ContainerChecksumInfo updateContainerChecksum(KeyValueContainerData containerData)
+      throws IOException {
+    ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+        .writeContainerDataTree(containerData, merkleTree);
+    containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+    return checksumInfo;
+  }
+
+  /**
+   * Handle missing block. It reads the missing block data from the peer datanode and writes it to the local container.
+   * If the block write fails, the block commit sequence id is not updated.
+   */
+  private void handleMissingBlock(KeyValueContainer container, TokenHelper tokenHelper,
+                                  Pipeline pipeline, DNContainerOperationClient dnClient,
+                                  ContainerProtos.BlockMerkleTree missingBlock)
+          throws IOException {
+    ContainerData containerData = container.getContainerData();
+    BlockID blockID = new BlockID(containerData.getContainerID(), missingBlock.getBlockID());
+    // The length of the block is not known, so we pass 0 instead of the default block length, as the length
+    // is not used to validate the token for the getBlock call.
+    Token<OzoneBlockTokenIdentifier> blockToken = tokenHelper.getBlockToken(blockID, 0L);
+    if (getBlockManager().blockExists(container, blockID)) {
+      LOG.warn("Block {} already exists in container {}. This block {} is expected to not exist. The container " +
+          "merkle tree for container {} is stale. Skipping reconciliation for block.", blockID,
+          containerData.getContainerID(), blockID, containerData.getContainerID());
+      return;
+    }
+
+    List<ContainerProtos.ChunkInfo> successfulChunksList = new ArrayList<>();
+    // Update BcsId only if all chunks are successfully written.
+    boolean overwriteBcsId = true;
+
+    BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+        .setBlockID(blockID)
+        .setPipeline(pipeline)
+        .setToken(blockToken)
+        .build();
+    // Under construction is set here; during BlockInputStream#initialize() it is used to update the block length.
+    blkInfo.setUnderConstruction(true);
+    try (BlockInputStream blockInputStream = (BlockInputStream) blockInputStreamFactory.create(
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+        blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+        null, conf.getObject(OzoneClientConfig.class))) {
+      // Initialize the BlockInputStream. Initializes the blockData and ChunkInputStream for each chunk.
+      blockInputStream.initialize();
+
+      // BlockInputStream#initialize() should be called before this, as it gets the BlockData for the block
+      // and sets the block length.
+      ContainerProtos.BlockData peerBlockData = blockInputStream.getStreamBlockData();
+      // The maxBcsId is the peer's bcsId as there is no block for this blockID in the local container.
+      long maxBcsId = peerBlockData.getBlockID().getBlockCommitSequenceId();
+      List<ContainerProtos.ChunkInfo> peerChunksList = peerBlockData.getChunksList();
+      int chunkSize = (int) conf.getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, OZONE_SCM_CHUNK_SIZE_DEFAULT,
+          StorageUnit.BYTES);
+      ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);
+
+      // Don't update bcsId if a chunk read fails.
+      for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+        try {
+          // Seek to the offset of the chunk. Seek updates the chunkIndex in the BlockInputStream.
+          blockInputStream.seek(chunkInfoProto.getOffset());
+
+          // Read the chunk data from the BlockInputStream and write it to the container.
+          int chunkLength = (int) chunkInfoProto.getLen();
+          if (chunkByteBuffer.capacity() < chunkLength) {
+            chunkByteBuffer = ByteBuffer.allocate(chunkLength);
+          }
+
+          chunkByteBuffer.clear();
+          chunkByteBuffer.limit(chunkLength);
+          int bytesRead = blockInputStream.read(chunkByteBuffer);
+          if (bytesRead != chunkLength) {
+            throw new IOException("Error while reading chunk data from block input stream. Expected length: " +
+                chunkLength + ", Actual length: " + bytesRead);
+          }
+
+          chunkByteBuffer.flip();
+          ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkByteBuffer);
+          ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+          chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+          writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, container);
+          successfulChunksList.add(chunkInfoProto);
+        } catch (IOException ex) {
+          overwriteBcsId = false;
+          LOG.error("Error while reconciling missing block {} for offset {} in container {}",
+              blockID, chunkInfoProto.getOffset(), containerData.getContainerID(), ex);

Review Comment:
   Latest code comment explains this.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1505,295 @@ public void deleteContainer(Container container, boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
                                  Set<DatanodeDetails> peers) throws IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+    ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+    if (optionalChecksumInfo.isPresent()) {
+      checksumInfo = optionalChecksumInfo.get();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not present.
+      checksumInfo = updateAndGetContainerChecksum(containerData);
+    }
+    oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+
+    for (DatanodeDetails peer : peers) {
+      long start = Instant.now().toEpochMilli();
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      ContainerDiffReport diffReport = checksumManager.diff(checksumInfo, peerChecksumInfo);
+      Pipeline pipeline = createSingleNodePipeline(peer);
+      ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);
+
+      // Handle missing blocks
+      for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
+        try {
+          handleMissingBlock(kvContainer, pipeline, dnClient, missingBlock, chunkByteBuffer);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
+              containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle missing chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), entry.getValue(), chunkByteBuffer);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
+              containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle corrupt chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), entry.getValue(), chunkByteBuffer);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
+              containerData.getContainerID(), e);
+        }
+      }
+      // Update checksum based on RocksDB metadata. The read chunk validates the checksum of the data
+      // we read. So we can update the checksum only based on the RocksDB metadata.
+      ContainerProtos.ContainerChecksumInfo updatedChecksumInfo = updateAndGetContainerChecksum(containerData);
+      long dataChecksum = updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+      long duration = Instant.now().toEpochMilli() - start;
+      if (dataChecksum == oldDataChecksum) {
+        metrics.incContainerReconciledWithoutChanges();
+        LOG.info("Container {} reconciled with peer {}. No change in checksum. Current checksum {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), checksumToString(dataChecksum), duration);
+      } else {
+        metrics.incContainerReconciledWithChanges();
+        LOG.warn("Container {} reconciled with peer {}. Checksum updated from {} to {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), checksumToString(oldDataChecksum),
+            checksumToString(dataChecksum), duration);
+      }
+      ContainerLogger.logReconciled(container.getContainerData(), oldDataChecksum, peer);
+    }
+
+    // Trigger a manual on-demand scan
+    OnDemandContainerDataScanner.scanContainer(container);
     sendICR(container);
   }
 
+  /**
+   * Updates the container merkle tree based on RocksDB's block metadata and returns the updated checksum info.
+   * @param containerData - Container data for which the container merkle tree needs to be updated.
+   */
+  private ContainerProtos.ContainerChecksumInfo updateAndGetContainerChecksum(KeyValueContainerData containerData)
+      throws IOException {
+    ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        // TODO: Add empty blocks to the merkle tree. Done in HDDS-10374, needs to be backported.
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+        .writeContainerDataTree(containerData, merkleTree);
+    // TODO: Remove this as this is being set in writeContainerDataTree by HDDS-12745
+    containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+    return checksumInfo;
+  }
+
+  /**
+   * Handle missing block. It reads the missing block data from the peer datanode and writes it to the local container.
+   * If the block write fails, the block commit sequence id of the container and the block are not updated.
+   */
+  private void handleMissingBlock(KeyValueContainer container, Pipeline pipeline, DNContainerOperationClient dnClient,
+                                  ContainerProtos.BlockMerkleTree missingBlock, ByteBuffer chunkByteBuffer)
+          throws IOException {
+    ContainerData containerData = container.getContainerData();
+    BlockID blockID = new BlockID(containerData.getContainerID(), missingBlock.getBlockID());
+    // The length of the block is not known, so we pass 0 instead of the default block length, as the length
+    // is not used to validate the token for the getBlock call.
+    Token<OzoneBlockTokenIdentifier> blockToken = dnClient.getTokenHelper().getBlockToken(blockID, 0L);
+    if (getBlockManager().blockExists(container, blockID)) {
+      LOG.warn("Block {} already exists in container {}. The block should not exist and our container merkle tree" +
+              " is stale. Skipping reconciliation for this block.", blockID, containerData.getContainerID());
+      return;
+    }
+
+    List<ContainerProtos.ChunkInfo> successfulChunksList = new ArrayList<>();
+    boolean overwriteBcsId = true;
+
+    BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+        .setBlockID(blockID)
+        .setPipeline(pipeline)
+        .setToken(blockToken)
+        .build();
+    // Under construction is set here; during BlockInputStream#initialize() it is used to update the block length.
+    blkInfo.setUnderConstruction(true);
+    try (BlockInputStream blockInputStream = (BlockInputStream) blockInputStreamFactory.create(
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+        blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+        null, conf.getObject(OzoneClientConfig.class))) {
+      // Initialize the BlockInputStream. Gets the blockData from the peer, sets the block length and
+      // initializes a ChunkInputStream for each chunk.
+      blockInputStream.initialize();
+      ContainerProtos.BlockData peerBlockData = blockInputStream.getStreamBlockData();
+      // The maxBcsId is the peer's bcsId as there is no block for this blockID in the local container.
+      long maxBcsId = peerBlockData.getBlockID().getBlockCommitSequenceId();
+      List<ContainerProtos.ChunkInfo> peerChunksList = peerBlockData.getChunksList();
+
+      // Don't update bcsId if a chunk read fails.
+      for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+        try {
+          // Seek to the offset of the chunk. Seek updates the chunkIndex in the BlockInputStream.
+          blockInputStream.seek(chunkInfoProto.getOffset());
+
+          // Read the chunk data from the BlockInputStream and write it to the container.
+          int chunkLength = (int) chunkInfoProto.getLen();
+          if (chunkByteBuffer.capacity() < chunkLength) {
+            chunkByteBuffer = ByteBuffer.allocate(chunkLength);
+          }
+
+          chunkByteBuffer.clear();
+          chunkByteBuffer.limit(chunkLength);
+          int bytesRead = blockInputStream.read(chunkByteBuffer);
+          if (bytesRead != chunkLength) {
+            throw new IOException("Error while reading chunk data from block input stream. Expected length: " +
+                chunkLength + ", Actual length: " + bytesRead);
+          }
+
+          chunkByteBuffer.flip();
+          ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkByteBuffer);
+          ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+          chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+          writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, container);
+          // If a chunk read/write fails, we are expected to have holes in the blockData's chunk list.
+          // That is okay: if the read fails, there might be a hole in the peer datanode as well.
+          // If the chunk write fails, we don't want to add the metadata without the actual data, as there is
+          // no data to verify the chunk checksum against.
+          successfulChunksList.add(chunkInfoProto);
+        } catch (IOException ex) {
+          overwriteBcsId = false;
+          LOG.error("Error while reconciling missing block {} for offset {} in container {}",
+              blockID, chunkInfoProto.getOffset(), containerData.getContainerID(), ex);
+        }
+      }
+
+      BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+      putBlockData.setChunks(successfulChunksList);
+      putBlockForClosedContainer(container, putBlockData, maxBcsId, overwriteBcsId);
+    }
+  }
+
+  /**
+   * This method reconciles chunks per block. It reads the missing/corrupt chunk data from the peer
+   * datanode and writes it to the local container. If the chunk write fails, the block commit sequence
+   * id is not updated.
+   */
+  private void reconcileChunksPerBlock(KeyValueContainer container, Pipeline pipeline,
+                                       DNContainerOperationClient dnClient, long blockId,
+                                       List<ContainerProtos.ChunkMerkleTree> chunkList, ByteBuffer chunkByteBuffer)
+      throws IOException {
+
+    ContainerData containerData = container.getContainerData();
+    BlockID blockID = new BlockID(containerData.getContainerID(), blockId);
+    // The length of the block is not known, so we pass 0 instead of the default block length, as the length
+    // is not used to validate the token for the getBlock call.
+    Token<OzoneBlockTokenIdentifier> blockToken = dnClient.getTokenHelper().getBlockToken(blockID, 0L);
+    BlockData localBlockData = getBlockManager().getBlock(container, blockID);
+
+    SortedMap<Long, ContainerProtos.ChunkInfo> localChunksMap = localBlockData.getChunks().stream()
+        .collect(Collectors.toMap(ContainerProtos.ChunkInfo::getOffset,
+            Function.identity(), (chunk1, chunk2) -> chunk1, TreeMap::new));
+    boolean overwriteBcsId = true;
+
+    BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+        .setBlockID(blockID)
+        .setPipeline(pipeline)
+        .setToken(blockToken)
+        .build();
+    // Under construction is set here; during BlockInputStream#initialize() it is used to update the block length.
+    blkInfo.setUnderConstruction(true);
+    try (BlockInputStream blockInputStream = (BlockInputStream) blockInputStreamFactory.create(
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+        blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+        null, conf.getObject(OzoneClientConfig.class))) {
+      // Initialize the BlockInputStream. Gets the blockData from the peer, sets the block length and
+      // initializes a ChunkInputStream for each chunk.
+      blockInputStream.initialize();
+      ContainerProtos.BlockData peerBlockData = blockInputStream.getStreamBlockData();
+      // Compare the local bcsId with the bcsId from the peer datanode.
+      long maxBcsId = Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(),
+          localBlockData.getBlockCommitSequenceId());
+
+      for (ContainerProtos.ChunkMerkleTree chunkMerkleTree : chunkList) {
+        long chunkOffset = chunkMerkleTree.getOffset();
+        try {
+          // Seek to the offset of the chunk. Seek updates the chunkIndex in the BlockInputStream.
+          blockInputStream.seek(chunkOffset);
+          ChunkInputStream currentChunkStream = blockInputStream.getChunkStreams().get(
+              blockInputStream.getChunkIndex());
+          ContainerProtos.ChunkInfo chunkInfoProto = currentChunkStream.getChunkInfo();
+          ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+          chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+          verifyChunksLength(chunkInfoProto, localChunksMap.get(chunkOffset));
+
+          // Read the chunk data from the BlockInputStream and write it to the container.
+          int chunkLength = (int) chunkInfoProto.getLen();
+          if (chunkByteBuffer.capacity() < chunkLength) {
+            chunkByteBuffer = ByteBuffer.allocate(chunkLength);
+          }
+
+          chunkByteBuffer.clear();
+          chunkByteBuffer.limit(chunkLength);
+          int bytesRead = blockInputStream.read(chunkByteBuffer);
+          if (bytesRead != chunkLength) {
+            throw new IOException("Error while reading chunk data from block input stream. Expected length: " +
+                chunkLength + ", Actual length: " + bytesRead);
+          }
+
+          chunkByteBuffer.flip();
+          ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkByteBuffer);
+          writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, container);

Review Comment:
   We also need a comment here explaining that if we are missing a few chunks at the end and one update fails, we may end up with holes in the block.
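
   A possible wording, as a sketch only (it assumes the catch block below clears overwriteBcsId the same way handleMissingBlock does; exact phrasing is up to the author):

   // A failed chunk write can leave a hole in the block's chunk list: earlier
   // chunks in this batch may already be written and later ones may still
   // succeed. That is acceptable because the hole will reappear in the next
   // merkle tree diff and be retried, and overwriteBcsId is cleared on failure
   // so the block's bcsId is not advanced past data we do not actually have.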



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1473,24 +1476,210 @@ public void deleteContainer(Container container, boolean force)
     deleteInternal(container, force);
   }
 
+  // Update Java Doc steps
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
                                  Set<DatanodeDetails> peers) throws IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo = checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+
+    if (checksumInfo.isPresent()) {
+      oldDataChecksum = checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not present.
+      createContainerMerkleTree(container);
+      checksumInfo = checksumManager.read(containerData);
+      if (checksumInfo.isPresent()) {
+        oldDataChecksum = checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+      }
+    }
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      // Check block token usage. How it is used in DN
+      ContainerDiffReport diffReport = checksumManager.diff(containerData, peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+          .acquireClient(createSingleNodePipeline(peer));
+
+      try {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
+          try {
+            handleMissingBlock(kvContainer, containerData, tokenHelper, xceiverClient, missingBlock);
+          } catch (IOException e) {
+            LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
+                containerData.getContainerID(), e);
+          }
+        }
+
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
+          try {
+            reconcileChunk(kvContainer, containerData, tokenHelper, xceiverClient, entry.getKey(), entry.getValue());
+          } catch (IOException e) {
+            LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
+                    containerData.getContainerID(), e);
+          }
+        }
+
+        // Handle corrupt chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
+          try {
+            reconcileChunk(kvContainer, containerData, tokenHelper, xceiverClient, entry.getKey(), entry.getValue());
+          } catch (IOException e) {
+            LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
+                    containerData.getContainerID(), e);
+          }
+        }
+        updateContainerChecksum(containerData);
+      } finally {
+        dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+      }
+    }
+

Review Comment:
   > If the diff returns null, we should match container BCSIDs to the max of two.

   Let's look into this in a follow-up task. There is a case where we update all the data but the BCSID update fails afterwards. To get the BCSID we would need to make a readContainer call to the peer since we don't want to put that in our merkle tree.
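
   For reference, a minimal sketch of what that follow-up could look like (hypothetical, not part of this PR; it assumes ContainerData's getBlockCommitSequenceId()/updateBlockCommitSequenceId() accessors, and peerBcsId would have to come from that readContainer call to the peer):

   // Sketch only: after a no-op diff, align our container BCSID with the peer's.
   private void alignBcsIdWithPeer(KeyValueContainerData containerData, long peerBcsId) {
     // Only move forward; never lower our own BCSID.
     if (peerBcsId > containerData.getBlockCommitSequenceId()) {
       containerData.updateBlockCommitSequenceId(peerBcsId);
     }
   }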



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
                                  Set<DatanodeDetails> peers) throws IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+    long dataChecksum = 0;
+    ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+    if (optionalChecksumInfo.isPresent()) {
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not present.
+      optionalChecksumInfo = createContainerMerkleTree(container);
+      if (!optionalChecksumInfo.isPresent()) {
+        throw new StorageContainerException("Failed to reconcile container " + containerData.getContainerID()
+            + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+      }
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    }
+
+    for (DatanodeDetails peer : peers) {
+      Instant start = Instant.now();
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      ContainerDiffReport diffReport = checksumManager.diff(checksumInfo, peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      Pipeline pipeline = createSingleNodePipeline(peer);
+
+      // Handle missing blocks
+      for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
+        try {
+          handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient, missingBlock);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
+              containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle missing chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle corrupt chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
+                  containerData.getContainerID(), e);
+        }
+      }
+      // Update checksum based on RocksDB metadata. The read chunk validates the checksum of the data
+      // we read. So we can update the checksum only based on the RocksDB metadata.
+      ContainerProtos.ContainerChecksumInfo updatedChecksumInfo = updateContainerChecksum(containerData);
+      dataChecksum = updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+      long duration = Duration.between(start, Instant.now()).toMillis();
+      if (dataChecksum == oldDataChecksum) {
+        metrics.incContainerReconciledWithoutChanges();
+        LOG.info("Container {} reconciled with peer {}. No change in checksum. Current checksum {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), checksumToString(dataChecksum), duration);
+      } else {
+        metrics.incContainerReconciledWithChanges();
+        LOG.warn("Container {} reconciled with peer {}. Checksum updated from {} to {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), checksumToString(oldDataChecksum),
+            checksumToString(dataChecksum), duration);
+      }
+      ContainerLogger.logReconciled(container.getContainerData(), oldDataChecksum, peer);
+    }
+
+    // Trigger a manual on-demand scan
+    OnDemandContainerDataScanner.scanContainer(container);
     sendICR(container);
   }
 
+  /**
+   * Updates the container merkle tree based on RocksDB's block metadata and returns the updated checksum info.
+   * @param containerData - Container data for which the container merkle tree needs to be updated.
+   */
+  private ContainerProtos.ContainerChecksumInfo updateContainerChecksum(KeyValueContainerData containerData)
+      throws IOException {
+    ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+        .writeContainerDataTree(containerData, merkleTree);
+    containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+    return checksumInfo;
+  }
+
+  /**
+   * Handle missing block. It reads the missing block data from the peer datanode and writes it to the local container.
+   * If the block write fails, the block commit sequence id is not updated.
+   */
+  private void handleMissingBlock(KeyValueContainer container, TokenHelper tokenHelper,
+                                  Pipeline pipeline, DNContainerOperationClient dnClient,
+                                  ContainerProtos.BlockMerkleTree missingBlock)
+          throws IOException {
+    ContainerData containerData = container.getContainerData();
+    BlockID blockID = new BlockID(containerData.getContainerID(), missingBlock.getBlockID());
+    // The length of the block is not known, so we pass 0 instead of the default block length, as the length
+    // is not used to validate the token for the getBlock call.
+    Token<OzoneBlockTokenIdentifier> blockToken = tokenHelper.getBlockToken(blockID, 0L);
+    if (getBlockManager().blockExists(container, blockID)) {
+      LOG.warn("Block {} already exists in container {}. This block {} is expected to not exist. The container " +
+          "merkle tree for container {} is stale. Skipping reconciliation for block.", blockID,
+          containerData.getContainerID(), blockID, containerData.getContainerID());
+      return;
+    }
+
+    List<ContainerProtos.ChunkInfo> successfulChunksList = new ArrayList<>();
+    // Update BcsId only if all chunks are successfully written.
+    boolean overwriteBcsId = true;
+
+    BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+        .setBlockID(blockID)
+        .setPipeline(pipeline)
+        .setToken(blockToken)
+        .build();
+    // Under construction is set here; during BlockInputStream#initialize() it is used to update the block length.
+    blkInfo.setUnderConstruction(true);
+    try (BlockInputStream blockInputStream = (BlockInputStream) blockInputStreamFactory.create(
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+        blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+        null, conf.getObject(OzoneClientConfig.class))) {
+      // Initialize the BlockInputStream. Initializes the blockData and ChunkInputStream for each chunk.
+      blockInputStream.initialize();
+
+      // BlockInputStream#initialize() should be called before this, as it gets the BlockData for the block
+      // and sets the block length.
+      ContainerProtos.BlockData peerBlockData = blockInputStream.getStreamBlockData();
+      // The maxBcsId is the peer's bcsId as there is no block for this blockID in the local container.
+      long maxBcsId = peerBlockData.getBlockID().getBlockCommitSequenceId();
+      List<ContainerProtos.ChunkInfo> peerChunksList = peerBlockData.getChunksList();
+      int chunkSize = (int) conf.getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, OZONE_SCM_CHUNK_SIZE_DEFAULT,
+          StorageUnit.BYTES);
+      ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);
+
+      // Don't update bcsId if a chunk read fails.
+      for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+        try {
+          // Seek to the offset of the chunk. Seek updates the chunkIndex in the BlockInputStream.
+          blockInputStream.seek(chunkInfoProto.getOffset());
+
+          // Read the chunk data from the BlockInputStream and write it to the container.
+          int chunkLength = (int) chunkInfoProto.getLen();
+          if (chunkByteBuffer.capacity() < chunkLength) {
+            chunkByteBuffer = ByteBuffer.allocate(chunkLength);
+          }
+
+          chunkByteBuffer.clear();
+          chunkByteBuffer.limit(chunkLength);
+          int bytesRead = blockInputStream.read(chunkByteBuffer);
+          if (bytesRead != chunkLength) {
+            throw new IOException("Error while reading chunk data from block input stream. Expected length: " +
+                chunkLength + ", Actual length: " + bytesRead);
+          }
+
+          chunkByteBuffer.flip();
+          ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkByteBuffer);
+          ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+          chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+          writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, container);
+          successfulChunksList.add(chunkInfoProto);
+        } catch (IOException ex) {
+          overwriteBcsId = false;
+          LOG.error("Error while reconciling missing block {} for offset {} in container {}",
+              blockID, chunkInfoProto.getOffset(), containerData.getContainerID(), ex);
+        }
+      }
+
+      BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+      putBlockData.setChunks(successfulChunksList);
+      putBlockForClosedContainer(container, putBlockData, maxBcsId, overwriteBcsId);

Review Comment:
   Also covered by comment in latest version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

