aswinshakil commented on code in PR #7474:
URL: https://github.com/apache/ozone/pull/7474#discussion_r1929376150
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1427,21 +1436,136 @@ public void deleteContainer(Container container, boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
       Set<DatanodeDetails> peers) throws IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Checksum not yet generated for peer: {}", peer);
+        return;
+      }
+
+      long scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT,
+          StorageUnit.BYTES);
+      ContainerDiffReport diffReport = checksumManager.diff(containerData, peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+          .acquireClient(createSingleNodePipeline(peer));
+
+      try {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
+          handleMissingBlock(kvContainer, containerData, tokenHelper, scmBlockSize, xceiverClient, missingBlock);
+        }
+
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, scmBlockSize, xceiverClient, entry);
+        }
+
+        // Handle corrupt chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, scmBlockSize, xceiverClient, entry);
+        }
+        updateContainerChecksum(containerData);
+      } finally {
+        dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+      }
+    }
+
+    long dataChecksum = updateContainerChecksum(containerData);
+    LOG.info("Checksum data for container {} is updated to {}", containerData.getContainerID(), dataChecksum);
+    containerData.setDataChecksum(dataChecksum);
     sendICR(container);
   }

+  private long updateContainerChecksum(KeyValueContainerData containerData) throws IOException {
+    ContainerMerkleTree merkleTree = new ContainerMerkleTree();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    checksumManager.writeContainerDataTree(containerData, merkleTree);
+    return merkleTree.toProto().getDataChecksum();
+  }
+
+  private void handleMissingBlock(KeyValueContainer container, ContainerData containerData, TokenHelper tokenHelper,
+                                  long scmBlockSize, XceiverClientSpi xceiverClient,
+                                  ContainerProtos.BlockMerkleTree missingBlock) throws IOException {
+    BlockID blockID = new BlockID(containerData.getContainerID(), missingBlock.getBlockID());
+    Token<OzoneBlockTokenIdentifier> blockToken = tokenHelper.getBlockToken(blockID, scmBlockSize);
+    // TODO: Cache the blockResponse to reuse it again.
+    ContainerProtos.GetBlockResponseProto blockResponse = ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+        blockToken, new HashMap<>());
+    // TODO: Add BcsId in BlockMerkleTree to avoid this call
+    ContainerProtos.GetCommittedBlockLengthResponseProto blockLengthResponse =
+        ContainerProtocolCalls.getCommittedBlockLength(xceiverClient, blockID, blockToken);
+    List<ContainerProtos.ChunkInfo> chunksList = blockResponse.getBlockData().getChunksList();
+
+    for (ContainerProtos.ChunkInfo chunkInfoProto : chunksList) {
+      ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto, blockID, blockToken);
+      ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
+      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+      chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+      writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, container);
+    }
+
+    putBlockForClosedContainer(chunksList, container, BlockData.getFromProtoBuf(blockResponse.getBlockData()),
+        blockLengthResponse.getBlockLength());
+  }
+
+  private ByteString readChunkData(XceiverClientSpi xceiverClient, ContainerProtos.ChunkInfo chunkInfoProto,
+                                   BlockID blockID, Token<OzoneBlockTokenIdentifier> blockToken) throws IOException {
+    ContainerProtos.ReadChunkResponseProto response =
+        ContainerProtocolCalls.readChunk(xceiverClient, chunkInfoProto, blockID.getDatanodeBlockIDProtobuf(),
+            null, blockToken);
+
+    if (response.hasData()) {
+      return response.getData();
+    } else if (response.hasDataBuffers()) {
+      return BufferUtils.concatByteStrings(response.getDataBuffers().getBuffersList());
+    } else {
+      throw new IOException("Error reading chunk data: No data returned.");
+    }
+  }
+
+  private void reconcileChunk(KeyValueContainer container, ContainerData containerData, TokenHelper tokenHelper,
+                              long scmBlockSize, XceiverClientSpi xceiverClient,
+                              Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> mapEntry) throws IOException {
+    long blockId = mapEntry.getKey();
+    List<ContainerProtos.ChunkMerkleTree> chunkList = mapEntry.getValue();
+    Set<Long> offsets = chunkList.stream().map(ContainerProtos.ChunkMerkleTree::getOffset)
+        .collect(Collectors.toSet());
+    BlockID blockID = new BlockID(containerData.getContainerID(), blockId);
+    Token<OzoneBlockTokenIdentifier> blockToken = tokenHelper.getBlockToken(blockID, scmBlockSize);
+    ContainerProtos.GetBlockResponseProto blockResponse = ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+        blockToken, new HashMap<>());
+    // TODO: Add BcsId in BlockMerkleTree to avoid this call
+    ContainerProtos.GetCommittedBlockLengthResponseProto blockLengthResponse =
+        ContainerProtocolCalls.getCommittedBlockLength(xceiverClient, blockID, blockToken);
+    List<ContainerProtos.ChunkInfo> chunksList = blockResponse.getBlockData().getChunksList();
+
+    for (ContainerProtos.ChunkInfo chunkInfoProto : chunksList) {
+      if (offsets.contains(chunkInfoProto.getOffset())) {

Review Comment:
   I'm not sure I get your point. The `chunksList` contains all of the chunks in the block, but we only need to overwrite the corrupt or missing chunks, so we filter the list by `offsets`.
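   To make that concrete, here is a minimal, self-contained sketch of the filtering described above. It uses plain Java stand-ins for the Ozone types (the `Chunk` record, `OffsetFilterSketch` class, and the sample offsets are illustrative, not Ozone APIs): only chunks whose offset appears in the flagged `offsets` set would be re-fetched and overwritten.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class OffsetFilterSketch {

  /** Stand-in for a chunk entry from the peer's GetBlock response: offset and length only. */
  record Chunk(long offset, long length) { }

  public static void main(String[] args) {
    // All chunks of the block as returned by the peer, healthy and flagged alike.
    List<Chunk> chunksList = List.of(
        new Chunk(0L, 4096L),
        new Chunk(4096L, 4096L),
        new Chunk(8192L, 4096L));

    // Offsets the container diff flagged as missing or corrupt on this replica.
    Set<Long> offsets = Set.of(4096L);

    // Keep only the flagged chunks; everything else in the block stays untouched.
    List<Chunk> chunksToRewrite = chunksList.stream()
        .filter(chunk -> offsets.contains(chunk.offset()))
        .collect(Collectors.toList());

    chunksToRewrite.forEach(chunk ->
        System.out.println("Would re-fetch and overwrite chunk at offset " + chunk.offset()));
  }
}
```

   The point of filtering on the offsets from the merkle-tree diff is that the healthy chunks of the block are left untouched, so the rewritten data is limited to what the peer comparison actually flagged.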