aswinshakil commented on code in PR #7474:
URL: https://github.com/apache/ozone/pull/7474#discussion_r2000594357


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1489,249 @@ public void deleteContainer(Container container, boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
                                  Set<DatanodeDetails> peers) throws IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+    long dataChecksum = 0;
+    ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+    if (optionalChecksumInfo.isPresent()) {
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not present.
+      checksumInfo = createContainerMerkleTree(container);
+      if (checksumInfo == null) {
+        LOG.error("Failed to reconcile container {} as checksum info is not available",
+            containerData.getContainerID());
+        return;
+      }
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    }
+
+    for (DatanodeDetails peer : peers) {
+      Instant start = Instant.now();
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      ContainerDiffReport diffReport = checksumManager.diff(checksumInfo, peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+          .acquireClient(createSingleNodePipeline(peer));
+
+      try {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
+          try {
+            handleMissingBlock(kvContainer, containerData, tokenHelper, xceiverClient, missingBlock);
+          } catch (IOException e) {
+            LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
+                containerData.getContainerID(), e);
+          }
+        }
+
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
+          try {
+            reconcileChunksPerBlock(kvContainer, containerData, tokenHelper, xceiverClient, entry.getKey(),
+                entry.getValue());
+          } catch (IOException e) {
+            LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
+                    containerData.getContainerID(), e);
+          }
+        }
+
+        // Handle corrupt chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
+          try {
+            reconcileChunksPerBlock(kvContainer, containerData, tokenHelper, xceiverClient, entry.getKey(),
+                entry.getValue());
+          } catch (IOException e) {
+            LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
+                    containerData.getContainerID(), e);
+          }
+        }
+        // Update checksum based on RocksDB metadata
+        ContainerProtos.ContainerChecksumInfo updatedChecksumInfo = updateContainerChecksum(containerData);
+        dataChecksum = updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+        long duration = Duration.between(start, Instant.now()).toMillis();
+        if (dataChecksum == oldDataChecksum) {
+          metrics.incContainerReconciledWithoutChanges();
+          LOG.info("Container {} reconciled without changes, Current checksum {}. Time taken {} ms",
+              containerData.getContainerID(), checksumToString(dataChecksum), duration);
+        } else {
+          metrics.incContainerReconciledWithChanges();
+          LOG.warn("Container {} reconciled, Checksum updated from {} to {}. Time taken {} ms",
+              containerData.getContainerID(), checksumToString(oldDataChecksum),
+              checksumToString(dataChecksum), duration);
+        }
+        ContainerLogger.logReconciled(container.getContainerData(), oldDataChecksum, peer);
+      } finally {
+        dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+      }
+    }
+
+    // Trigger manual on demand scanner
+    OnDemandContainerDataScanner.scanContainer(container);
     sendICR(container);
   }
 
+  // Return the entire tree instead of just the checksum
+  private ContainerProtos.ContainerChecksumInfo updateContainerChecksum(KeyValueContainerData containerData)
+      throws IOException {
+    ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+        .writeContainerDataTree(containerData, merkleTree);
+    containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+    return checksumInfo;
+  }
+
+  /**
+   * Handle missing block. It reads the missing block data from the peer datanode and writes it to the local container.
+   * If the block write fails, the block commit sequence id is not updated.
+   */
+  private void handleMissingBlock(KeyValueContainer container, ContainerData containerData, TokenHelper tokenHelper,
+                                  XceiverClientSpi xceiverClient, ContainerProtos.BlockMerkleTree missingBlock)
+          throws IOException {
+    BlockID blockID = new BlockID(containerData.getContainerID(), missingBlock.getBlockID());
+    // The length of the block is not known, so instead of passing the default block length we pass 0. As the length
+    // is not used to validate the token for getBlock call.
+    Token<OzoneBlockTokenIdentifier> blockToken = tokenHelper.getBlockToken(blockID, 0L);
+    // TODO: Re-use the blockResponse for the same block again.
+    ContainerProtos.GetBlockResponseProto blockResponse = ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+        blockToken, new HashMap<>());
+    ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+    long bcsId = getBlockManager().blockExists(container, blockID) ?
+        getBlockManager().getBlock(container, blockID).getBlockCommitSequenceId() : 0;
+    // Check the local bcsId with the one from the bcsId from the peer datanode.
+    long maxBcsId = Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(), bcsId);
+    List<ContainerProtos.ChunkInfo> peerChunksList = peerBlockData.getChunksList();
+    List<ContainerProtos.ChunkInfo> successfullChunksList = new ArrayList<>();
+    // Update BcsId only if all chunks are successfully written.
+    boolean overwriteBcsId = true;
+
+    // Don't update bcsId if chunk read fails
+    for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+      try {
+        ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto, blockID, blockToken);
+        ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
+        ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+        chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+        writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, container);
+        successfullChunksList.add(chunkInfoProto);
+      } catch (IOException ex) {
+        overwriteBcsId = false;
+        LOG.error("Error while reconciling missing block {} for offset {} in container {}",
+                blockID, chunkInfoProto.getOffset(), containerData.getContainerID(), ex);
+      }
+    }
+
+    BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+    putBlockData.setChunks(successfullChunksList);
+    putBlockForClosedContainer(container, putBlockData, maxBcsId, overwriteBcsId);
+  }
+
+  private ByteString readChunkData(XceiverClientSpi xceiverClient, ContainerProtos.ChunkInfo chunkInfoProto,
+                                   BlockID blockID, Token<OzoneBlockTokenIdentifier> blockToken) throws IOException {
+    ContainerProtos.ReadChunkResponseProto response =
+        ContainerProtocolCalls.readChunk(xceiverClient, chunkInfoProto, blockID.getDatanodeBlockIDProtobuf(),
+            null, blockToken);
+
+    if (response.hasData()) {
+      return response.getData();
+    } else if (response.hasDataBuffers()) {
+      return BufferUtils.concatByteStrings(response.getDataBuffers().getBuffersList());
+    } else {
+      throw new IOException("Error reading chunk data: No data returned.");
+    }
+  }
+
+  /**
+   * This method reconciles chunks per block. It reads the missing/corrupt chunk data from the peer
+   * datanode and writes it to the local container. If the chunk write fails, the block commit sequence
+   * id is not updated.
+   */
+  private void reconcileChunksPerBlock(KeyValueContainer container, ContainerData containerData,
+                                       TokenHelper tokenHelper, XceiverClientSpi xceiverClient, long blockId,
+                                       List<ContainerProtos.ChunkMerkleTree> chunkList) throws IOException {
+    Set<Long> offsets = chunkList.stream().map(ContainerProtos.ChunkMerkleTree::getOffset)
+        .collect(Collectors.toSet());
+    BlockID blockID = new BlockID(containerData.getContainerID(), blockId);
+    // The length of the block is not known, so instead of passing the default block length we pass 0. As the length
+    // is not used to validate the token for getBlock call.
+    Token<OzoneBlockTokenIdentifier> blockToken = tokenHelper.getBlockToken(blockID, 0L);
+    ContainerProtos.GetBlockResponseProto blockResponse = ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+        blockToken, new HashMap<>());
+    ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+    BlockData localBlockData = getBlockManager().getBlock(container, blockID);
+    // Check the local bcsId with the one from the bcsId from the peer datanode.
+    long maxBcsId = Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(),
+            localBlockData.getBlockCommitSequenceId());
+    List<ContainerProtos.ChunkInfo> chunksListFromPeer = peerBlockData.getChunksList();
+
+    SortedMap<Long, ContainerProtos.ChunkInfo> localChunksMap = localBlockData.getChunks().stream()
+            .collect(Collectors.toMap(ContainerProtos.ChunkInfo::getOffset,
+                Function.identity(), (chunk1, chunk2) -> chunk1, TreeMap::new));
+    boolean overwriteBcsId = true;
+
+    for (ContainerProtos.ChunkInfo chunkInfoProto : chunksListFromPeer) {

Review Comment:
   I agree, but that would only be possible if we had the chunkInfos in the `chunkList`.
   Our only source of truth is `chunksListFromPeer`, which has the chunkInfo along with its
   checksum data. So even if we created a temp `chunkInfo` just to read the chunk, we would
   still need to validate the read data against the chunk checksum, so we'd have to iterate
   through `chunksListFromPeer` anyway.
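   To sketch what I mean (illustration only, not code from this PR; the class/helper name `PeerChunkSelector.chunksToRepair` and the import path are mine): if we drive the loop from `chunksListFromPeer` and filter by the offsets taken from the diff report, every chunk we end up reading still carries its checksum data from the peer, which is what we need to validate the bytes before rewriting them locally.
   
   ```java
   import java.util.List;
   import java.util.Set;
   import java.util.stream.Collectors;
   
   import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
   
   /**
    * Illustration only: pick the peer chunks that match the offsets flagged by the
    * merkle tree diff. Driving the loop from the peer's chunk list keeps the
    * checksum data attached to each ChunkInfo, so the read data can be verified.
    */
   final class PeerChunkSelector {
   
     private PeerChunkSelector() {
     }
   
     static List<ContainerProtos.ChunkInfo> chunksToRepair(
         List<ContainerProtos.ChunkInfo> chunksListFromPeer,
         List<ContainerProtos.ChunkMerkleTree> chunkList) {
       // Offsets reported as missing/corrupt for this block.
       Set<Long> offsets = chunkList.stream()
           .map(ContainerProtos.ChunkMerkleTree::getOffset)
           .collect(Collectors.toSet());
       // The peer's ChunkInfo entries remain the source of truth for the repair loop.
       return chunksListFromPeer.stream()
           .filter(chunk -> offsets.contains(chunk.getOffset()))
           .collect(Collectors.toList());
     }
   }
   ```
   
   A temporary `chunkInfo` built only from the `chunkList` entries wouldn't carry that checksum data, so the iteration over `chunksListFromPeer` can't really be avoided.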



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

