errose28 commented on code in PR #7474:
URL: https://github.com/apache/ozone/pull/7474#discussion_r2027385708
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java:
##########
@@ -146,33 +150,31 @@ public void markBlocksAsDeleted(KeyValueContainerData data, Collection<Long> del
     }
   }
-  public ContainerDiffReport diff(KeyValueContainerData thisContainer,
+  /**
+   * Compares the checksum info of the container with the peer's checksum info and returns a report of the differences.
+   * @param thisChecksumInfo The checksum info of the container on this datanode.
+   * @param peerChecksumInfo The checksum info of the container on the peer datanode.
+   */
+  public ContainerDiffReport diff(ContainerProtos.ContainerChecksumInfo thisChecksumInfo,
                                   ContainerProtos.ContainerChecksumInfo peerChecksumInfo) throws StorageContainerException {
     ContainerDiffReport report = new ContainerDiffReport();
     try {
       captureLatencyNs(metrics.getMerkleTreeDiffLatencyNS(), () -> {
-        Preconditions.assertNotNull(thisContainer, "Container data is null");
+        Preconditions.assertNotNull(thisChecksumInfo, "Our checksum info is null");
         Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is null");
-        Optional<ContainerProtos.ContainerChecksumInfo> thisContainerChecksumInfo = read(thisContainer);
-        if (!thisContainerChecksumInfo.isPresent()) {
-          throw new StorageContainerException("The container #" + thisContainer.getContainerID() +
-              " doesn't have container checksum", ContainerProtos.Result.IO_EXCEPTION);
-        }
-
-        if (thisContainer.getContainerID() != peerChecksumInfo.getContainerID()) {
+        if (thisChecksumInfo.getContainerID() != peerChecksumInfo.getContainerID()) {
           throw new StorageContainerException("Container Id does not match for container "
-              + thisContainer.getContainerID(), ContainerProtos.Result.CONTAINER_ID_MISMATCH);
+              + thisChecksumInfo.getContainerID(), ContainerProtos.Result.CONTAINER_ID_MISMATCH);
Review Comment:
nit. This was present in the original too but now I'm noticing it: We should
put both our ID and the peer ID in the exception message.
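For example, something along these lines (just a sketch, exact wording up to you):
```java
// Sketch: include both our ID and the peer's ID in the mismatch message.
throw new StorageContainerException("Container ID does not match. Our container ID: "
    + thisChecksumInfo.getContainerID() + ", peer's container ID: "
    + peerChecksumInfo.getContainerID(), ContainerProtos.Result.CONTAINER_ID_MISMATCH);
```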
##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java:
##########
@@ -354,4 +363,27 @@ public static void writeContainerDataTreeProto(ContainerData data, ContainerProt
+ data.getContainerID(), ex);
}
}
+
+ /**
+ * Creates block metadata for the given container with the specified number of blocks and chunks per block.
+ */
+ public static void createBlockMetaData(KeyValueContainerData data, int numOfBlocksPerContainer,
+ int numOfChunksPerBlock) throws IOException {
+ try (DBHandle metadata = BlockUtils.getDB(data, new OzoneConfiguration())) {
+ for (int j = 0; j < numOfBlocksPerContainer; j++) {
+ BlockID blockID = new BlockID(data.getContainerID(), j);
+ String blockKey = data.getBlockKey(blockID.getLocalID());
+ BlockData kd = new BlockData(blockID);
+ List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
+ for (int k = 0; k < numOfChunksPerBlock; k++) {
+ long dalaLen = 10L;
Review Comment:
nit. typo
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
       Set<DatanodeDetails> peers) throws IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+    long dataChecksum = 0;
+    ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+    if (optionalChecksumInfo.isPresent()) {
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not present.
+      optionalChecksumInfo = createContainerMerkleTree(container);
+      if (!optionalChecksumInfo.isPresent()) {
+        throw new StorageContainerException("Failed to reconcile container " + containerData.getContainerID()
+            + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+      }
+      checksumInfo = optionalChecksumInfo.get();
+      oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
+    }
+
+    for (DatanodeDetails peer : peers) {
+      Instant start = Instant.now();
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      ContainerDiffReport diffReport = checksumManager.diff(checksumInfo, peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      Pipeline pipeline = createSingleNodePipeline(peer);
+
+      // Handle missing blocks
+      for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
+        try {
+          handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient, missingBlock);
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
+              containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle missing chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
+              containerData.getContainerID(), e);
+        }
+      }
+
+      // Handle corrupt chunks
+      for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
+        try {
+          reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline, dnClient, entry.getKey(),
+              entry.getValue());
+        } catch (IOException e) {
+          LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
+              containerData.getContainerID(), e);
+        }
+      }
+      // Update checksum based on RocksDB metadata, The read chunk validates the checksum of the data
+      // we read. So we can update the checksum only based on the RocksDB metadata.
+      ContainerProtos.ContainerChecksumInfo updatedChecksumInfo = updateContainerChecksum(containerData);
+      dataChecksum = updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+      long duration = Duration.between(start, Instant.now()).toMillis();
+      if (dataChecksum == oldDataChecksum) {
+        metrics.incContainerReconciledWithoutChanges();
+        LOG.info("Container {} reconciled with peer {}. No change in checksum. Current checksum {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), checksumToString(dataChecksum), duration);
+      } else {
+        metrics.incContainerReconciledWithChanges();
+        LOG.warn("Container {} reconciled with peer {}. Checksum updated from {} to {}. Time taken {} ms",
+            containerData.getContainerID(), peer.toString(), checksumToString(oldDataChecksum),
+            checksumToString(dataChecksum), duration);
+      }
+      ContainerLogger.logReconciled(container.getContainerData(), oldDataChecksum, peer);
+    }
+
+    // Trigger manual on demand scanner
+    OnDemandContainerDataScanner.scanContainer(container);
     sendICR(container);
   }
+  /**
+   * Updates the container merkle tree based on the RocksDb's block metadata and returns the updated checksum info.
+   * @param containerData - Container data for which the container merkle tree needs to be updated.
+   */
+  private ContainerProtos.ContainerChecksumInfo updateContainerChecksum(KeyValueContainerData containerData)
+      throws IOException {
+    ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+        BlockIterator<BlockData> blockIterator = dbHandle.getStore().getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+        .writeContainerDataTree(containerData, merkleTree);
+    containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
Review Comment:
This won't be necessary after #8204. Whichever PR goes in first, the other
should resolve this on top of it.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ optionalChecksumInfo = createContainerMerkleTree(container);
+ if (!optionalChecksumInfo.isPresent()) {
+ throw new StorageContainerException("Failed to reconcile container " +
containerData.getContainerID()
+ + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+ }
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ Pipeline pipeline = createSingleNodePipeline(peer);
+
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient,
missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata, The read chunk validates
the checksum of the data
+ // we read. So we can update the checksum only based on the RocksDB
metadata.
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled with peer {}. No change in checksum.
Current checksum {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(dataChecksum), duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled with peer {}. Checksum updated from
{} to {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ /**
+ * Updates the container merkle tree based on the RocksDb's block metadata
and returns the updated checksum info.
+ * @param containerData - Container data for which the container merkle tree
needs to be updated.
+ */
+ private ContainerProtos.ContainerChecksumInfo
updateContainerChecksum(KeyValueContainerData containerData)
+ throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+ .writeContainerDataTree(containerData, merkleTree);
+
containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+ return checksumInfo;
+ }
+
+ /**
+ * Handle missing block. It reads the missing block data from the peer
datanode and writes it to the local container.
+ * If the block write fails, the block commit sequence id is not updated.
+ */
+ private void handleMissingBlock(KeyValueContainer container, TokenHelper
tokenHelper,
+ Pipeline pipeline,
DNContainerOperationClient dnClient,
+ ContainerProtos.BlockMerkleTree missingBlock)
+ throws IOException {
+ ContainerData containerData = container.getContainerData();
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ // The length of the block is not known, so instead of passing the default
block length we pass 0. As the length
+ // is not used to validate the token for getBlock call.
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ if (getBlockManager().blockExists(container, blockID)) {
+ LOG.warn("Block {} already exists in container {}. This block {} is
expected to not exist. The container " +
+ "merkle tree for container {} is stale. Skipping reconciliation for
block.", blockID,
+ containerData.getContainerID(), blockID,
containerData.getContainerID());
+ return;
+ }
+
+ List<ContainerProtos.ChunkInfo> successfulChunksList = new ArrayList<>();
+ // Update BcsId only if all chunks are successfully written.
+ boolean overwriteBcsId = true;
+
+ BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+ .setBlockID(blockID)
+ .setPipeline(pipeline)
+ .setToken(blockToken)
+ .build();
+ // Under construction is set here, during BlockInputStream#initialize() it
is used to update the block length.
+ blkInfo.setUnderConstruction(true);
+ try (BlockInputStream blockInputStream = (BlockInputStream)
blockInputStreamFactory.create(
+ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+ blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+ null, conf.getObject(OzoneClientConfig.class))) {
+ // Initialize the BlockInputStream. Initializes the blockData and
ChunkInputStream for each chunk
+ blockInputStream.initialize();
+
+ // BlockInputStream#initialize() should be called before this, as it
gets the BlockData for the block.
+ // and sets the block length.
+ ContainerProtos.BlockData peerBlockData =
blockInputStream.getStreamBlockData();
+ // The maxBcsId is the peer's bcsId as there is no block for this
blockID in the local container.
+ long maxBcsId = peerBlockData.getBlockID().getBlockCommitSequenceId();
+ List<ContainerProtos.ChunkInfo> peerChunksList =
peerBlockData.getChunksList();
+ int chunkSize = (int) conf.getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY,
OZONE_SCM_CHUNK_SIZE_DEFAULT,
+ StorageUnit.BYTES);
+ ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);
Review Comment:
We should allocate this in `reconcileContainer` and pass it to the methods
that need it. We only ever read one chunk at a time and in the common case they
will all be the same length.
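Rough sketch of what I mean (the extra parameter on the helper is just illustrative):
```java
// In reconcileContainer: allocate the read buffer once per container and reuse it for every peer.
int chunkSize = (int) conf.getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, OZONE_SCM_CHUNK_SIZE_DEFAULT,
    StorageUnit.BYTES);
ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);
// ... then pass it down instead of allocating inside each helper, e.g.:
handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient, missingBlock, chunkByteBuffer);
```
The helpers can still grow the buffer for the rare oversized chunk, like they do today.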
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ optionalChecksumInfo = createContainerMerkleTree(container);
+ if (!optionalChecksumInfo.isPresent()) {
+ throw new StorageContainerException("Failed to reconcile container " +
containerData.getContainerID()
+ + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+ }
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ Pipeline pipeline = createSingleNodePipeline(peer);
+
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient,
missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata, The read chunk validates
the checksum of the data
+ // we read. So we can update the checksum only based on the RocksDB
metadata.
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled with peer {}. No change in checksum.
Current checksum {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(dataChecksum), duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled with peer {}. Checksum updated from
{} to {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ /**
+ * Updates the container merkle tree based on the RocksDb's block metadata
and returns the updated checksum info.
+ * @param containerData - Container data for which the container merkle tree
needs to be updated.
+ */
+ private ContainerProtos.ContainerChecksumInfo
updateContainerChecksum(KeyValueContainerData containerData)
+ throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+ .writeContainerDataTree(containerData, merkleTree);
+
containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+ return checksumInfo;
+ }
+
+ /**
+ * Handle missing block. It reads the missing block data from the peer
datanode and writes it to the local container.
+ * If the block write fails, the block commit sequence id is not updated.
+ */
+ private void handleMissingBlock(KeyValueContainer container, TokenHelper
tokenHelper,
+ Pipeline pipeline,
DNContainerOperationClient dnClient,
+ ContainerProtos.BlockMerkleTree missingBlock)
+ throws IOException {
+ ContainerData containerData = container.getContainerData();
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ // The length of the block is not known, so instead of passing the default
block length we pass 0. As the length
+ // is not used to validate the token for getBlock call.
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ if (getBlockManager().blockExists(container, blockID)) {
+ LOG.warn("Block {} already exists in container {}. This block {} is
expected to not exist. The container " +
+ "merkle tree for container {} is stale. Skipping reconciliation for
block.", blockID,
+ containerData.getContainerID(), blockID,
containerData.getContainerID());
+ return;
+ }
+
+ List<ContainerProtos.ChunkInfo> successfulChunksList = new ArrayList<>();
+ // Update BcsId only if all chunks are successfully written.
+ boolean overwriteBcsId = true;
+
+ BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+ .setBlockID(blockID)
+ .setPipeline(pipeline)
+ .setToken(blockToken)
+ .build();
+ // Under construction is set here, during BlockInputStream#initialize() it
is used to update the block length.
+ blkInfo.setUnderConstruction(true);
+ try (BlockInputStream blockInputStream = (BlockInputStream)
blockInputStreamFactory.create(
+ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+ blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+ null, conf.getObject(OzoneClientConfig.class))) {
+ // Initialize the BlockInputStream. Initializes the blockData and
ChunkInputStream for each chunk
+ blockInputStream.initialize();
+
+ // BlockInputStream#initialize() should be called before this, as it
gets the BlockData for the block.
+ // and sets the block length.
+ ContainerProtos.BlockData peerBlockData =
blockInputStream.getStreamBlockData();
+ // The maxBcsId is the peer's bcsId as there is no block for this
blockID in the local container.
+ long maxBcsId = peerBlockData.getBlockID().getBlockCommitSequenceId();
+ List<ContainerProtos.ChunkInfo> peerChunksList =
peerBlockData.getChunksList();
+ int chunkSize = (int) conf.getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY,
OZONE_SCM_CHUNK_SIZE_DEFAULT,
+ StorageUnit.BYTES);
+ ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);
+
+ // Don't update bcsId if chunk read fails
+ for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+ try {
+ // Seek to the offset of the chunk. Seek updates the chunkIndex in
the BlockInputStream.
+ blockInputStream.seek(chunkInfoProto.getOffset());
+
+ // Read the chunk data from the BlockInputStream and write it to the
container.
+ int chunkLength = (int) chunkInfoProto.getLen();
+ if (chunkByteBuffer.capacity() < chunkLength) {
+ chunkByteBuffer = ByteBuffer.allocate(chunkLength);
+ }
+
+ chunkByteBuffer.clear();
+ chunkByteBuffer.limit(chunkLength);
+ int bytesRead = blockInputStream.read(chunkByteBuffer);
+ if (bytesRead != chunkLength) {
+ throw new IOException("Error while reading chunk data from block
input stream. Expected length: " +
+ chunkLength + ", Actual length: " + bytesRead);
+ }
+
+ chunkByteBuffer.flip();
+ ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkByteBuffer);
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+ chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+ writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer,
container);
+ successfulChunksList.add(chunkInfoProto);
+ } catch (IOException ex) {
+ overwriteBcsId = false;
+ LOG.error("Error while reconciling missing block {} for offset {} in
container {}",
+ blockID, chunkInfoProto.getOffset(),
containerData.getContainerID(), ex);
+ }
+ }
+
+ BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+ putBlockData.setChunks(successfulChunksList);
+ putBlockForClosedContainer(container, putBlockData, maxBcsId,
overwriteBcsId);
+ }
+ }
+
+ /**
+ * This method reconciles chunks per block. It reads the missing/corrupt
chunk data from the peer
+ * datanode and writes it to the local container. If the chunk write fails,
the block commit sequence
+ * id is not updated.
+ */
+ private void reconcileChunksPerBlock(KeyValueContainer container,
TokenHelper tokenHelper, Pipeline pipeline,
+ DNContainerOperationClient dnClient,
long blockId,
+ List<ContainerProtos.ChunkMerkleTree>
chunkList) throws IOException {
+
+ ContainerData containerData = container.getContainerData();
+ BlockID blockID = new BlockID(containerData.getContainerID(), blockId);
+ // The length of the block is not known, so instead of passing the default
block length we pass 0. As the length
+ // is not used to validate the token for getBlock call.
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ BlockData localBlockData = getBlockManager().getBlock(container, blockID);
+
+ SortedMap<Long, ContainerProtos.ChunkInfo> localChunksMap =
localBlockData.getChunks().stream()
+ .collect(Collectors.toMap(ContainerProtos.ChunkInfo::getOffset,
+ Function.identity(), (chunk1, chunk2) -> chunk1, TreeMap::new));
+ boolean overwriteBcsId = true;
+
+ BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+ .setBlockID(blockID)
+ .setPipeline(pipeline)
+ .setToken(blockToken)
+ .build();
+ // Under construction is set here, during BlockInputStream#initialize() it
is used to update the block length.
+ blkInfo.setUnderConstruction(true);
+ try (BlockInputStream blockInputStream = (BlockInputStream)
blockInputStreamFactory.create(
+ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+ blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+ null, conf.getObject(OzoneClientConfig.class))) {
+ // Initialize the BlockInputStream. Initializes the blockData and
ChunkInputStream for each chunk
+ blockInputStream.initialize();
+
+ // BlockInputStream#initialize() should be called before this, as it
gets the BlockData for the block
+ // and sets the block length.
+ ContainerProtos.BlockData peerBlockData =
blockInputStream.getStreamBlockData();
+ // Check the local bcsId with the one from the bcsId from the peer
datanode.
+ long maxBcsId =
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(),
+ localBlockData.getBlockCommitSequenceId());
+
+ int chunkSize = (int) conf.getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY,
OZONE_SCM_CHUNK_SIZE_DEFAULT,
+ StorageUnit.BYTES);
+ ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);
+
+ for (ContainerProtos.ChunkMerkleTree chunkMerkleTree : chunkList) {
+ long chunkOffset = chunkMerkleTree.getOffset();
+ try {
+ // Seek to the offset of the chunk. Seek updates the chunkIndex in
the BlockInputStream.
+ blockInputStream.seek(chunkOffset);
+ ChunkInputStream currentChunkStream =
blockInputStream.getChunkStreams().get(
+ blockInputStream.getChunkIndex());
+ ContainerProtos.ChunkInfo chunkInfoProto =
currentChunkStream.getChunkInfo();
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+ chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+
+ // Verify the chunk offset and length.
+ verifyChunksLength(chunkInfoProto, localChunksMap.get(chunkOffset));
+
+ // Read the chunk data from the BlockInputStream and write it to the
container.
+ int chunkLength = (int) chunkInfoProto.getLen();
+ if (chunkByteBuffer.capacity() < chunkLength) {
+ chunkByteBuffer = ByteBuffer.allocate(chunkLength);
+ }
+
+ chunkByteBuffer.clear();
+ chunkByteBuffer.limit(chunkLength);
+ int bytesRead = blockInputStream.read(chunkByteBuffer);
+ if (bytesRead != chunkLength) {
+ throw new IOException("Error while reading chunk data from block
input stream. Expected length: " +
+ chunkLength + ", Actual length: " + bytesRead);
+ }
+
+ chunkByteBuffer.flip();
+ ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkByteBuffer);
+ writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer,
container);
+ localChunksMap.put(chunkInfo.getOffset(), chunkInfoProto);
+ } catch (IOException ex) {
+ overwriteBcsId = false;
+ LOG.error("Error while reconciling chunk {} for block {} in
container {}",
+ chunkOffset, blockID, containerData.getContainerID(), ex);
Review Comment:
Same comment as the missing block path: once one chunk write fails we cannot
continue to write other chunks. In this case we should try to do putBlock with
whatever we currently have.
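Roughly (sketch only, reusing the existing names and flags):
```java
} catch (IOException ex) {
  overwriteBcsId = false;
  LOG.error("Error while reconciling chunk {} for block {} in container {}",
      chunkOffset, blockID, containerData.getContainerID(), ex);
  // Sketch: later chunks can't be written safely once one write has failed, so stop here
  // and fall through to putBlock with the chunks that did succeed.
  break;
}
```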
##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java:
##########
@@ -122,6 +131,11 @@ public void testReconcileContainerCommandReports(ContainerLayoutVersion layout)
     init(layout, icrSender);
     for (int id = 1; id <= NUM_CONTAINERS; id++) {
+      KeyValueContainerData data = new KeyValueContainerData(id, layout, GB,
+          PipelineID.randomId().toString(), randomDatanodeDetails().getUuidString());
+      data.setMetadataPath(tempDir.toString());
+      data.setDbFile(dbFile.toFile());
+      createBlockMetaData(data, 5, 3);
Review Comment:
Init already created the containers, so we don't need to create them again here. The test still passes if we move line 138 up to line 105 and leave this method unchanged.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java:
##########
@@ -65,7 +65,7 @@ public DNContainerOperationClient(ConfigurationSource conf,
}
@Nonnull
- private static XceiverClientManager createClientManager(
+ public static XceiverClientManager createClientManager(
Review Comment:
This doesn't need to be public. It's still only called internally.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ optionalChecksumInfo = createContainerMerkleTree(container);
+ if (!optionalChecksumInfo.isPresent()) {
+ throw new StorageContainerException("Failed to reconcile container " +
containerData.getContainerID()
+ + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+ }
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ Pipeline pipeline = createSingleNodePipeline(peer);
+
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient,
missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata, The read chunk validates
the checksum of the data
+ // we read. So we can update the checksum only based on the RocksDB
metadata.
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled with peer {}. No change in checksum.
Current checksum {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(dataChecksum), duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled with peer {}. Checksum updated from
{} to {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ /**
+ * Updates the container merkle tree based on the RocksDb's block metadata
and returns the updated checksum info.
+ * @param containerData - Container data for which the container merkle tree
needs to be updated.
+ */
+ private ContainerProtos.ContainerChecksumInfo
updateContainerChecksum(KeyValueContainerData containerData)
+ throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+ .writeContainerDataTree(containerData, merkleTree);
+
containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+ return checksumInfo;
+ }
+
+ /**
+ * Handle missing block. It reads the missing block data from the peer
datanode and writes it to the local container.
+ * If the block write fails, the block commit sequence id is not updated.
+ */
+ private void handleMissingBlock(KeyValueContainer container, TokenHelper
tokenHelper,
+ Pipeline pipeline,
DNContainerOperationClient dnClient,
+ ContainerProtos.BlockMerkleTree missingBlock)
+ throws IOException {
+ ContainerData containerData = container.getContainerData();
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ // The length of the block is not known, so instead of passing the default
block length we pass 0. As the length
+ // is not used to validate the token for getBlock call.
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ if (getBlockManager().blockExists(container, blockID)) {
+ LOG.warn("Block {} already exists in container {}. This block {} is
expected to not exist. The container " +
+ "merkle tree for container {} is stale. Skipping reconciliation for
block.", blockID,
+ containerData.getContainerID(), blockID,
containerData.getContainerID());
+ return;
+ }
+
+ List<ContainerProtos.ChunkInfo> successfulChunksList = new ArrayList<>();
+ // Update BcsId only if all chunks are successfully written.
+ boolean overwriteBcsId = true;
+
+ BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+ .setBlockID(blockID)
+ .setPipeline(pipeline)
+ .setToken(blockToken)
+ .build();
+ // Under construction is set here, during BlockInputStream#initialize() it
is used to update the block length.
+ blkInfo.setUnderConstruction(true);
+ try (BlockInputStream blockInputStream = (BlockInputStream)
blockInputStreamFactory.create(
+ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+ blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+ null, conf.getObject(OzoneClientConfig.class))) {
+ // Initialize the BlockInputStream. Initializes the blockData and
ChunkInputStream for each chunk
+ blockInputStream.initialize();
+
+ // BlockInputStream#initialize() should be called before this, as it
gets the BlockData for the block.
+ // and sets the block length.
+ ContainerProtos.BlockData peerBlockData =
blockInputStream.getStreamBlockData();
+ // The maxBcsId is the peer's bcsId as there is no block for this
blockID in the local container.
+ long maxBcsId = peerBlockData.getBlockID().getBlockCommitSequenceId();
+ List<ContainerProtos.ChunkInfo> peerChunksList =
peerBlockData.getChunksList();
+ int chunkSize = (int) conf.getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY,
OZONE_SCM_CHUNK_SIZE_DEFAULT,
+ StorageUnit.BYTES);
+ ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);
+
+ // Don't update bcsId if chunk read fails
+ for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+ try {
+ // Seek to the offset of the chunk. Seek updates the chunkIndex in
the BlockInputStream.
+ blockInputStream.seek(chunkInfoProto.getOffset());
+
+ // Read the chunk data from the BlockInputStream and write it to the
container.
+ int chunkLength = (int) chunkInfoProto.getLen();
+ if (chunkByteBuffer.capacity() < chunkLength) {
+ chunkByteBuffer = ByteBuffer.allocate(chunkLength);
+ }
+
+ chunkByteBuffer.clear();
+ chunkByteBuffer.limit(chunkLength);
+ int bytesRead = blockInputStream.read(chunkByteBuffer);
+ if (bytesRead != chunkLength) {
+ throw new IOException("Error while reading chunk data from block
input stream. Expected length: " +
Review Comment:
We should add container ID, block ID, and chunk offset to this message.
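For example (sketch):
```java
throw new IOException("Error while reading chunk data from block input stream for container "
    + containerData.getContainerID() + ", block " + blockID + ", chunk offset "
    + chunkInfoProto.getOffset() + ". Expected length: " + chunkLength
    + ", Actual length: " + bytesRead);
```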
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -600,28 +627,19 @@ ContainerCommandResponseProto handleCloseContainer(
return getSuccessResponse(request);
}
- private void createContainerMerkleTree(Container container) {
+ public Optional<ContainerProtos.ContainerChecksumInfo> createContainerMerkleTree(Container container) {
Review Comment:
Right now this just needs to be public for the tests, right? And when we
have scanner integration it can be private again? If so, let's add both
`@VisibleForTesting` and a `TODO` comment referencing HDDS-10374.
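Something like this (sketch, assuming the usual Guava `@VisibleForTesting` annotation):
```java
// TODO: HDDS-10374. Make this private again once the scanner integration builds the tree.
@VisibleForTesting
public Optional<ContainerProtos.ContainerChecksumInfo> createContainerMerkleTree(Container container) {
```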
##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java:
##########
@@ -354,4 +363,27 @@ public static void writeContainerDataTreeProto(ContainerData data, ContainerProt
+ data.getContainerID(), ex);
}
}
+
+ /**
+ * Creates block metadata for the given container with the specified number of blocks and chunks per block.
+ */
+ public static void createBlockMetaData(KeyValueContainerData data, int numOfBlocksPerContainer,
Review Comment:
We can probably move this somewhere more generic, like `ContainerTestUtils`.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ optionalChecksumInfo = createContainerMerkleTree(container);
+ if (!optionalChecksumInfo.isPresent()) {
+ throw new StorageContainerException("Failed to reconcile container " +
containerData.getContainerID()
+ + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+ }
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ Pipeline pipeline = createSingleNodePipeline(peer);
+
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient,
missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata, The read chunk validates
the checksum of the data
+ // we read. So we can update the checksum only based on the RocksDB
metadata.
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled with peer {}. No change in checksum.
Current checksum {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(dataChecksum), duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled with peer {}. Checksum updated from
{} to {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ /**
+ * Updates the container merkle tree based on the RocksDb's block metadata
and returns the updated checksum info.
+ * @param containerData - Container data for which the container merkle tree
needs to be updated.
+ */
+ private ContainerProtos.ContainerChecksumInfo
updateContainerChecksum(KeyValueContainerData containerData)
+ throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+ .writeContainerDataTree(containerData, merkleTree);
+
containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+ return checksumInfo;
+ }
+
+ /**
+ * Handle missing block. It reads the missing block data from the peer
datanode and writes it to the local container.
+ * If the block write fails, the block commit sequence id is not updated.
+ */
+ private void handleMissingBlock(KeyValueContainer container, TokenHelper
tokenHelper,
Review Comment:
The `TokenHelper` comes from the client which is already passed in to this
method and the chunk reconcile method. We can remove this parameter and have
the methods just get it from the client.
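Sketch of what that could look like (signature only, body otherwise unchanged):
```java
private void handleMissingBlock(KeyValueContainer container, Pipeline pipeline,
    DNContainerOperationClient dnClient, ContainerProtos.BlockMerkleTree missingBlock)
    throws IOException {
  TokenHelper tokenHelper = dnClient.getTokenHelper();
  // ... rest of the method unchanged
}
```
Same idea for `reconcileChunksPerBlock`.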
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -600,28 +627,19 @@ ContainerCommandResponseProto handleCloseContainer(
return getSuccessResponse(request);
}
- private void createContainerMerkleTree(Container container) {
+ public Optional<ContainerProtos.ContainerChecksumInfo> createContainerMerkleTree(Container container) {
if (ContainerChecksumTreeManager.checksumFileExist(container)) {
- return;
+ return Optional.empty();
Review Comment:
My intent with the previous comment was to return empty optional only when
the tree creation fails, so we don't need to do try/catch/ignore on all the
container close paths. Looking at this again though, I think we can simplify
this because reconcileContainer is propagating the exception and checking
existence already so it doesn't need the wrapper.
```diff
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index a324134ccb..92c59dffdc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -627,19 +627,18 @@ ContainerCommandResponseProto handleCloseContainer(
     return getSuccessResponse(request);
   }
-  public Optional<ContainerProtos.ContainerChecksumInfo> createContainerMerkleTree(Container container) {
+  public void createContainerMerkleTree(Container container) {
     if (ContainerChecksumTreeManager.checksumFileExist(container)) {
-      return Optional.empty();
+      return;
     }
     try {
       KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
-      return Optional.of(updateContainerChecksum(containerData));
+      updateContainerChecksum(containerData);
     } catch (IOException ex) {
       LOG.error("Cannot create container checksum for container {} , Exception: ",
           container.getContainerData().getContainerID(), ex);
     }
-    return Optional.empty();
   }
   /**
@@ -1513,12 +1512,7 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container<?>
       oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
     } else {
       // Try creating the checksum info from RocksDB metadata if it is not present.
-      optionalChecksumInfo = createContainerMerkleTree(container);
-      if (!optionalChecksumInfo.isPresent()) {
-        throw new StorageContainerException("Failed to reconcile container " + containerData.getContainerID()
-            + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
-      }
-      checksumInfo = optionalChecksumInfo.get();
+      checksumInfo = updateContainerChecksum(containerData);
       oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum();
     }
```
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container, boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ optionalChecksumInfo = createContainerMerkleTree(container);
+ if (!optionalChecksumInfo.isPresent()) {
+ throw new StorageContainerException("Failed to reconcile container " +
containerData.getContainerID()
+ + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+ }
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ Pipeline pipeline = createSingleNodePipeline(peer);
+
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient,
missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata, The read chunk validates
the checksum of the data
+ // we read. So we can update the checksum only based on the RocksDB
metadata.
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled with peer {}. No change in checksum.
Current checksum {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(dataChecksum), duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled with peer {}. Checksum updated from
{} to {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ /**
+ * Updates the container merkle tree based on the RocksDb's block metadata
and returns the updated checksum info.
+ * @param containerData - Container data for which the container merkle tree
needs to be updated.
+ */
+ private ContainerProtos.ContainerChecksumInfo
updateContainerChecksum(KeyValueContainerData containerData)
+ throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+ .writeContainerDataTree(containerData, merkleTree);
+
containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+ return checksumInfo;
+ }
+
+ /**
+ * Handle missing block. It reads the missing block data from the peer
datanode and writes it to the local container.
+ * If the block write fails, the block commit sequence id is not updated.
Review Comment:
```suggestion
   * If the block write fails, the block commit sequence ids of both the container and the block are not updated.
```
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java:
##########
@@ -727,7 +727,7 @@ String getChunkName() {
return chunkInfo.getChunkName();
}
- protected long getLength() {
+ public long getLength() {
Review Comment:
Is this modifier change needed? I don't see it being used in the new code.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ optionalChecksumInfo = createContainerMerkleTree(container);
+ if (!optionalChecksumInfo.isPresent()) {
+ throw new StorageContainerException("Failed to reconcile container " +
containerData.getContainerID()
+ + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+ }
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ Pipeline pipeline = createSingleNodePipeline(peer);
+
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient,
missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata, The read chunk validates
the checksum of the data
+ // we read. So we can update the checksum only based on the RocksDB
metadata.
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled with peer {}. No change in checksum.
Current checksum {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(dataChecksum), duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled with peer {}. Checksum updated from
{} to {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ /**
+ * Updates the container merkle tree based on the RocksDb's block metadata
and returns the updated checksum info.
+ * @param containerData - Container data for which the container merkle tree
needs to be updated.
+ */
+ private ContainerProtos.ContainerChecksumInfo
updateContainerChecksum(KeyValueContainerData containerData)
+ throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+ .writeContainerDataTree(containerData, merkleTree);
+
containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+ return checksumInfo;
+ }
+
+ /**
+ * Handle missing block. It reads the missing block data from the peer
datanode and writes it to the local container.
+ * If the block write fails, the block commit sequence id is not updated.
+ */
+ private void handleMissingBlock(KeyValueContainer container, TokenHelper
tokenHelper,
+ Pipeline pipeline,
DNContainerOperationClient dnClient,
+ ContainerProtos.BlockMerkleTree missingBlock)
+ throws IOException {
+ ContainerData containerData = container.getContainerData();
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ // The length of the block is not known, so instead of passing the default
block length we pass 0. As the length
+ // is not used to validate the token for getBlock call.
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ if (getBlockManager().blockExists(container, blockID)) {
+ LOG.warn("Block {} already exists in container {}. This block {} is
expected to not exist. The container " +
+ "merkle tree for container {} is stale. Skipping reconciliation for
block.", blockID,
+ containerData.getContainerID(), blockID,
containerData.getContainerID());
Review Comment:
Removing duplicate info from the log message. Block IDs can be long, so logging the same one twice makes the message harder to read.
```suggestion
LOG.warn("Block {} already exists in container {}. The block should
not exist and our container merkle tree is stale. Skipping reconciliation for
this block.", blockID);
```
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ optionalChecksumInfo = createContainerMerkleTree(container);
+ if (!optionalChecksumInfo.isPresent()) {
+ throw new StorageContainerException("Failed to reconcile container " +
containerData.getContainerID()
+ + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+ }
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ Pipeline pipeline = createSingleNodePipeline(peer);
+
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient,
missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata, The read chunk validates
the checksum of the data
+ // we read. So we can update the checksum only based on the RocksDB
metadata.
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled with peer {}. No change in checksum.
Current checksum {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(dataChecksum), duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled with peer {}. Checksum updated from
{} to {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ /**
+ * Updates the container merkle tree based on the RocksDb's block metadata
and returns the updated checksum info.
+ * @param containerData - Container data for which the container merkle tree
needs to be updated.
+ */
+ private ContainerProtos.ContainerChecksumInfo
updateContainerChecksum(KeyValueContainerData containerData)
+ throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+ .writeContainerDataTree(containerData, merkleTree);
+
containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+ return checksumInfo;
+ }
+
+ /**
+ * Handle missing block. It reads the missing block data from the peer
datanode and writes it to the local container.
+ * If the block write fails, the block commit sequence id is not updated.
+ */
+ private void handleMissingBlock(KeyValueContainer container, TokenHelper
tokenHelper,
+ Pipeline pipeline,
DNContainerOperationClient dnClient,
+ ContainerProtos.BlockMerkleTree missingBlock)
+ throws IOException {
+ ContainerData containerData = container.getContainerData();
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ // The length of the block is not known, so instead of passing the default
block length we pass 0. As the length
+ // is not used to validate the token for getBlock call.
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ if (getBlockManager().blockExists(container, blockID)) {
+ LOG.warn("Block {} already exists in container {}. This block {} is
expected to not exist. The container " +
+ "merkle tree for container {} is stale. Skipping reconciliation for
block.", blockID,
+ containerData.getContainerID(), blockID,
containerData.getContainerID());
+ return;
+ }
+
+ List<ContainerProtos.ChunkInfo> successfulChunksList = new ArrayList<>();
+ // Update BcsId only if all chunks are successfully written.
+ boolean overwriteBcsId = true;
+
+ BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+ .setBlockID(blockID)
+ .setPipeline(pipeline)
+ .setToken(blockToken)
+ .build();
+ // Under construction is set here, during BlockInputStream#initialize() it
is used to update the block length.
+ blkInfo.setUnderConstruction(true);
+ try (BlockInputStream blockInputStream = (BlockInputStream)
blockInputStreamFactory.create(
+ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+ blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+ null, conf.getObject(OzoneClientConfig.class))) {
+ // Initialize the BlockInputStream. Initializes the blockData and
ChunkInputStream for each chunk
+ blockInputStream.initialize();
+
+ // BlockInputStream#initialize() should be called before this, as it
gets the BlockData for the block.
+ // and sets the block length.
+ ContainerProtos.BlockData peerBlockData =
blockInputStream.getStreamBlockData();
+ // The maxBcsId is the peer's bcsId as there is no block for this
blockID in the local container.
+ long maxBcsId = peerBlockData.getBlockID().getBlockCommitSequenceId();
+ List<ContainerProtos.ChunkInfo> peerChunksList =
peerBlockData.getChunksList();
+ int chunkSize = (int) conf.getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY,
OZONE_SCM_CHUNK_SIZE_DEFAULT,
+ StorageUnit.BYTES);
+ ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);
+
+ // Don't update bcsId if chunk read fails
+ for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+ try {
+ // Seek to the offset of the chunk. Seek updates the chunkIndex in
the BlockInputStream.
+ blockInputStream.seek(chunkInfoProto.getOffset());
+
+ // Read the chunk data from the BlockInputStream and write it to the
container.
+ int chunkLength = (int) chunkInfoProto.getLen();
+ if (chunkByteBuffer.capacity() < chunkLength) {
+ chunkByteBuffer = ByteBuffer.allocate(chunkLength);
+ }
+
+ chunkByteBuffer.clear();
+ chunkByteBuffer.limit(chunkLength);
+ int bytesRead = blockInputStream.read(chunkByteBuffer);
+ if (bytesRead != chunkLength) {
+ throw new IOException("Error while reading chunk data from block
input stream. Expected length: " +
+ chunkLength + ", Actual length: " + bytesRead);
+ }
+
+ chunkByteBuffer.flip();
+ ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkByteBuffer);
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+ chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+ writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer,
container);
+ successfulChunksList.add(chunkInfoProto);
+ } catch (IOException ex) {
+ overwriteBcsId = false;
+ LOG.error("Error while reconciling missing block {} for offset {} in
container {}",
+ blockID, chunkInfoProto.getOffset(),
containerData.getContainerID(), ex);
+ }
+ }
+
+ BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+ putBlockData.setChunks(successfulChunksList);
+ putBlockForClosedContainer(container, putBlockData, maxBcsId,
overwriteBcsId);
Review Comment:
Can't we just write the peer's block data directly as our block data?
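A rough sketch of what that simplification could look like, reusing names from the diff above (peerBlockData, maxBcsId, putBlockForClosedContainer). This is only safe if every chunk was copied successfully, since the peer's chunk list is taken as-is:
```java
// Sketch only: take the peer's block metadata verbatim instead of rebuilding
// the chunk list from successfulChunksList. Assumes all chunk writes succeeded,
// otherwise the stored metadata would reference chunks we never wrote locally.
BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
putBlockForClosedContainer(container, putBlockData, maxBcsId, true);
```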
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1501,313 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ optionalChecksumInfo = createContainerMerkleTree(container);
+ if (!optionalChecksumInfo.isPresent()) {
+ throw new StorageContainerException("Failed to reconcile container " +
containerData.getContainerID()
+ + " as checksum info is not available", CONTAINER_CHECKSUM_ERROR);
+ }
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ Pipeline pipeline = createSingleNodePipeline(peer);
+
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, tokenHelper, pipeline, dnClient,
missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, tokenHelper, pipeline,
dnClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata, The read chunk validates
the checksum of the data
+ // we read. So we can update the checksum only based on the RocksDB
metadata.
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled with peer {}. No change in checksum.
Current checksum {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(dataChecksum), duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled with peer {}. Checksum updated from
{} to {}. Time taken {} ms",
+ containerData.getContainerID(), peer.toString(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ /**
+ * Updates the container merkle tree based on the RocksDb's block metadata
and returns the updated checksum info.
+ * @param containerData - Container data for which the container merkle tree
needs to be updated.
+ */
+ private ContainerProtos.ContainerChecksumInfo
updateContainerChecksum(KeyValueContainerData containerData)
+ throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+ .writeContainerDataTree(containerData, merkleTree);
+
containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+ return checksumInfo;
+ }
+
+ /**
+ * Handle missing block. It reads the missing block data from the peer
datanode and writes it to the local container.
+ * If the block write fails, the block commit sequence id is not updated.
+ */
+ private void handleMissingBlock(KeyValueContainer container, TokenHelper
tokenHelper,
+ Pipeline pipeline,
DNContainerOperationClient dnClient,
+ ContainerProtos.BlockMerkleTree missingBlock)
+ throws IOException {
+ ContainerData containerData = container.getContainerData();
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ // The length of the block is not known, so instead of passing the default
block length we pass 0. As the length
+ // is not used to validate the token for getBlock call.
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ if (getBlockManager().blockExists(container, blockID)) {
+ LOG.warn("Block {} already exists in container {}. This block {} is
expected to not exist. The container " +
+ "merkle tree for container {} is stale. Skipping reconciliation for
block.", blockID,
+ containerData.getContainerID(), blockID,
containerData.getContainerID());
+ return;
+ }
+
+ List<ContainerProtos.ChunkInfo> successfulChunksList = new ArrayList<>();
+ // Update BcsId only if all chunks are successfully written.
+ boolean overwriteBcsId = true;
+
+ BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
+ .setBlockID(blockID)
+ .setPipeline(pipeline)
+ .setToken(blockToken)
+ .build();
+ // Under construction is set here, during BlockInputStream#initialize() it
is used to update the block length.
+ blkInfo.setUnderConstruction(true);
+ try (BlockInputStream blockInputStream = (BlockInputStream)
blockInputStreamFactory.create(
+ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+ blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(),
+ null, conf.getObject(OzoneClientConfig.class))) {
+ // Initialize the BlockInputStream. Initializes the blockData and
ChunkInputStream for each chunk
+ blockInputStream.initialize();
+
+ // BlockInputStream#initialize() should be called before this, as it
gets the BlockData for the block.
+ // and sets the block length.
+ ContainerProtos.BlockData peerBlockData =
blockInputStream.getStreamBlockData();
+ // The maxBcsId is the peer's bcsId as there is no block for this
blockID in the local container.
+ long maxBcsId = peerBlockData.getBlockID().getBlockCommitSequenceId();
+ List<ContainerProtos.ChunkInfo> peerChunksList =
peerBlockData.getChunksList();
+ int chunkSize = (int) conf.getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY,
OZONE_SCM_CHUNK_SIZE_DEFAULT,
+ StorageUnit.BYTES);
+ ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);
+
+ // Don't update bcsId if chunk read fails
+ for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+ try {
+ // Seek to the offset of the chunk. Seek updates the chunkIndex in
the BlockInputStream.
+ blockInputStream.seek(chunkInfoProto.getOffset());
+
+ // Read the chunk data from the BlockInputStream and write it to the
container.
+ int chunkLength = (int) chunkInfoProto.getLen();
+ if (chunkByteBuffer.capacity() < chunkLength) {
+ chunkByteBuffer = ByteBuffer.allocate(chunkLength);
+ }
+
+ chunkByteBuffer.clear();
+ chunkByteBuffer.limit(chunkLength);
+ int bytesRead = blockInputStream.read(chunkByteBuffer);
+ if (bytesRead != chunkLength) {
+ throw new IOException("Error while reading chunk data from block
input stream. Expected length: " +
+ chunkLength + ", Actual length: " + bytesRead);
+ }
+
+ chunkByteBuffer.flip();
+ ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkByteBuffer);
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+ chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+ writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer,
container);
+ successfulChunksList.add(chunkInfoProto);
+ } catch (IOException ex) {
+ overwriteBcsId = false;
+ LOG.error("Error while reconciling missing block {} for offset {} in
container {}",
+ blockID, chunkInfoProto.getOffset(),
containerData.getContainerID(), ex);
Review Comment:
We can't have holes in the block, so we should abort reconciling the missing
block as soon as a chunk write fails.
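For illustration, the per-chunk catch in handleMissingBlock could simply rethrow so the whole block is abandoned on the first failed chunk write; a sketch against the loop shown in the diff:
```java
} catch (IOException ex) {
  // Sketch: a partially written block would leave a hole, so give up on this
  // block immediately and let reconcileContainer's per-block handler log it.
  throw new IOException("Aborting reconciliation of block " + blockID +
      " in container " + containerData.getContainerID() +
      " after failing to write chunk at offset " + chunkInfoProto.getOffset(), ex);
}
```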
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.