kerneltime commented on code in PR #7474:
URL: https://github.com/apache/ozone/pull/7474#discussion_r1965914659


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1473,24 +1476,210 @@ public void deleteContainer(Container container, 
boolean force)
     deleteInternal(container, force);
   }
 
+  // Update Java Doc steps
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo = 
checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+
+    if (checksumInfo.isPresent()) {
+      oldDataChecksum = 
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not 
present.
+      createContainerMerkleTree(container);
+      checksumInfo = checksumManager.read(containerData);
+      if (checksumInfo.isPresent()) {
+        oldDataChecksum = 
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+      }
+    }
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet 
generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      // Check block token usage. How it is used in DN
+      ContainerDiffReport diffReport = checksumManager.diff(containerData, 
peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+          .acquireClient(createSingleNodePipeline(peer));
+
+      try {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : 
diffReport.getMissingBlocks()) {
+          try {
+            handleMissingBlock(kvContainer, containerData, tokenHelper, 
xceiverClient, missingBlock);
+          } catch (IOException e) {
+            LOG.error("Error while reconciling missing block for block {} in 
container {}", missingBlock.getBlockID(),
+                containerData.getContainerID(), e);
+          }
+        }
+
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getMissingChunks().entrySet()) {
+          try {
+            reconcileChunk(kvContainer, containerData, tokenHelper, 
xceiverClient, entry.getKey(), entry.getValue());
+          } catch (IOException e) {
+            LOG.error("Error while reconciling missing chunk for block {} in 
container {}", entry.getKey(),
+                    containerData.getContainerID(), e);
+          }
+        }
+
+        // Handle corrupt chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getCorruptChunks().entrySet()) {
+          try {
+            reconcileChunk(kvContainer, containerData, tokenHelper, 
xceiverClient, entry.getKey(), entry.getValue());
+          } catch (IOException e) {
+            LOG.error("Error while reconciling corrupt chunk for block {} in 
container {}", entry.getKey(),
+                    containerData.getContainerID(), e);
+          }
+        }
+        updateContainerChecksum(containerData);
+      } finally {
+        dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+      }
+    }
+
+    // Update checksum based on RocksDB metadata
+    long dataChecksum = updateContainerChecksum(containerData);
+    // Trigger manual on demand scanner
+    OnDemandContainerDataScanner.scanContainer(container);
+    if (dataChecksum == oldDataChecksum) {
+      metrics.incContainerReconciledWithoutChanges();
+      LOG.info("Container {} reconciled without changes, Current checksum {}", 
containerData.getContainerID(),
+              checksumToString(dataChecksum));
+    } else {
+      metrics.incContainerReconciledWithChanges();
+      LOG.warn("Container {} reconciled, Checksum updated from {} to {}", 
containerData.getContainerID(),
+              checksumToString(oldDataChecksum), 
checksumToString(dataChecksum));
+    }
+    ContainerLogger.logReconciled(container.getContainerData(), 
oldDataChecksum);
     sendICR(container);
   }
 
+  private long updateContainerChecksum(KeyValueContainerData containerData) 
throws IOException {
+    ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    checksumManager.writeContainerDataTree(containerData, merkleTree);
+    long dataChecksum = merkleTree.toProto().getDataChecksum();
+    containerData.setDataChecksum(dataChecksum);
+    return dataChecksum;
+  }
+
+  private void handleMissingBlock(KeyValueContainer container, ContainerData 
containerData, TokenHelper tokenHelper,
+                                  XceiverClientSpi xceiverClient, 
ContainerProtos.BlockMerkleTree missingBlock)
+          throws IOException {
+    BlockID blockID = new BlockID(containerData.getContainerID(), 
missingBlock.getBlockID());
+    Token<OzoneBlockTokenIdentifier> blockToken = 
tokenHelper.getBlockToken(blockID, 0L);

Review Comment:
   I would create a wrapper method whose documentation explains why a length of zero (`0L`) is passed to `getBlockToken` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to