errose28 commented on code in PR #8908:
URL: https://github.com/apache/ozone/pull/8908#discussion_r2258445451
##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java:
##########
@@ -228,6 +229,66 @@ public void testContainerReconciliation(int
numBlocksToDelete, int numChunksToCo
assertEquals(healthyDataChecksum, repairedDataChecksum);
}
+ @Test
+ public void testContainerReconciliationWithPeerFailure() throws Exception {
+ LOG.info("Testing container reconciliation with peer failure for container
{}", CONTAINER_ID);
+ // Introduce corruption in the first datanode
+ MockDatanode corruptedNode = datanodes.get(0);
+ MockDatanode healthyNode1 = datanodes.get(1);
+ MockDatanode healthyNode2 = datanodes.get(2);
+ corruptedNode.introduceCorruption(CONTAINER_ID, 1, 1, false);
+
+ // Use synchronous on-demand scans to re-build the merkle trees after
corruption.
+ datanodes.forEach(d -> d.scanContainer(CONTAINER_ID));
+
+ // Without reconciliation, checksums should be different.
+ assertUniqueChecksumCount(CONTAINER_ID, datanodes, 2);
+ waitForExpectedScanCount(1);
+
+ // Create a failing peer - we'll make the second datanode fail during
getContainerChecksumInfo
+ DatanodeDetails failingPeerDetails = healthyNode1.getDnDetails();
+ Map<DatanodeDetails, MockDatanode> dnMap = datanodes.stream()
+ .collect(Collectors.toMap(MockDatanode::getDnDetails,
Function.identity()));
+
+ containerProtocolMock.when(() ->
ContainerProtocolCalls.getContainerChecksumInfo(any(), anyLong(), any()))
+ .thenAnswer(inv -> {
+ XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
+ long containerID = inv.getArgument(1);
+ Pipeline pipeline = xceiverClientSpi.getPipeline();
+ assertEquals(1, pipeline.size());
+ DatanodeDetails dn = pipeline.getFirstNode();
+
+ // Throw exception for the specific failing peer
+ if (dn.equals(failingPeerDetails)) {
+ throw new IOException("Simulated peer failure for testing");
+ }
+
+ return dnMap.get(dn).getChecksumInfo(containerID);
+ });
+
+ // Now reconcile the corrupted node with its peers (including the failing
one)
+ List<DatanodeDetails> peers = Arrays.asList(failingPeerDetails,
healthyNode2.getDnDetails());
+ corruptedNode.reconcileContainer(dnClient, peers, CONTAINER_ID);
+
+ // Wait for scan to complete - but this time we only expect the corrupted
node to have a scan
+ // triggered by reconciliation, so we wait specifically for that one
+ try {
+ GenericTestUtils.waitFor(() -> corruptedNode.getOnDemandScanCount() ==
2, 100, 5_000);
+ } catch (TimeoutException ex) {
+ LOG.warn("Timed out waiting for on-demand scan after reconciliation.
Current count: {}",
+ corruptedNode.getOnDemandScanCount());
+ }
+
+ // The corrupted node should still be repaired because it was able to
reconcile with the healthy peer
+ // even though one peer failed
+ long corruptedChecksum =
corruptedNode.checkAndGetDataChecksum(CONTAINER_ID);
+ long healthyChecksum = healthyNode2.checkAndGetDataChecksum(CONTAINER_ID);
+ assertEquals(healthyChecksum, corruptedChecksum);
Review Comment:
All nodes are expected to match because the unreachable node was not
corrupt. This version also checks that the data checksum was restored to the
same healthy value it started at.
```suggestion
long repairedDataChecksum = assertUniqueChecksumCount(CONTAINER_ID,
datanodes, 1);
assertEquals(healthyDataChecksum, repairedDataChecksum);
```
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1594,11 +1594,19 @@ public void
reconcileContainer(DNContainerOperationClient dnClient, Container<?>
containerID, peer,
checksumToString(ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo)));
// Data checksum updated after each peer reconciles.
long start = Instant.now().toEpochMilli();
- ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
- containerID, peer);
- if (peerChecksumInfo == null) {
- LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
- containerID, peer);
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo;
+
+ try {
+ // Data checksum updated after each peer reconciles.
+ peerChecksumInfo = dnClient.getContainerChecksumInfo(containerID,
peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not
yet generated a checksum",
+ containerID, peer);
+ continue;
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to connect to peer {} for container {}
reconciliation. Skipping to next peer.",
+ peer, containerID, e);
Review Comment:
We can rearrange this so that we don't copy our merkle tree into a tree
writer that never gets used when the peer is skipped.
```diff
@@ -1585,17 +1585,13 @@ public void
reconcileContainer(DNContainerOperationClient dnClient, Container<?>
long numMissingBlocksRepaired = 0;
long numCorruptChunksRepaired = 0;
long numMissingChunksRepaired = 0;
- // This will be updated as we do repairs with this peer, then used to
write the updated tree for the diff with the
- // next peer.
- ContainerMerkleTreeWriter updatedTreeWriter =
- new
ContainerMerkleTreeWriter(latestChecksumInfo.getContainerMerkleTree());
LOG.info("Beginning reconciliation for container {} with peer {}.
Current data checksum is {}",
containerID, peer,
checksumToString(ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo)));
// Data checksum updated after each peer reconciles.
long start = Instant.now().toEpochMilli();
+
ContainerProtos.ContainerChecksumInfo peerChecksumInfo;
-
try {
// Data checksum updated after each peer reconciles.
peerChecksumInfo = dnClient.getContainerChecksumInfo(containerID,
peer);
@@ -1610,6 +1606,11 @@ public void
reconcileContainer(DNContainerOperationClient dnClient, Container<?>
continue;
}
+ // This will be updated as we do repairs with this peer, then used to
write the updated tree for the diff with the
+ // next peer.
+ ContainerMerkleTreeWriter updatedTreeWriter =
+ new
ContainerMerkleTreeWriter(latestChecksumInfo.getContainerMerkleTree());
+
ContainerDiffReport diffReport =
checksumManager.diff(latestChecksumInfo, peerChecksumInfo);
Pipeline pipeline = createSingleNodePipeline(peer);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]