errose28 commented on code in PR #7474:
URL: https://github.com/apache/ozone/pull/7474#discussion_r2029008941
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ optionalChecksumInfo = createContainerMerkleTree(container);
+ if (!optionalChecksumInfo.isPresent()) {
+ throw new StorageContainerException("Failed to reconcile container " +
containerData.getContainerID()
+ + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+ }
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ Pipeline pipeline = createSingleNodePipeline(peer);
+
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient,
missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata, The read chunk validates
the checksum of the data
+ // we read. So we can update the checksum only based on the RocksDB
metadata.
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled with peer {}. No change in checksum.
Current checksum {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(dataChecksum), duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled with peer {}. Checksum updated from
{} to {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ /**
+ * Updates the container merkle tree based on the RocksDb's block metadata
and returns the updated checksum info.
+ * @param containerData - Container data for which the container merkle tree
needs to be updated.
+ */
+ private ContainerProtos.ContainerChecksumInfo
updateContainerChecksum(KeyValueContainerData containerData)
+ throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
Review Comment:
It is possible that we find a block with no chunks. I added a method to add
an empty block in #7490.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]