This is an automated email from the ASF dual-hosted git repository.

zanderxu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

The following commit(s) were added to refs/heads/trunk by this push:
     new fb0519253d66 HDFS-17488. DN can fail IBRs with NPE when a volume is removed (#6759)
fb0519253d66 is described below

commit fb0519253d66ec218abc3c2b6bcbf03e9270d07f
Author: Felix Nguyen <23214709+kokonguyen...@users.noreply.github.com>
AuthorDate: Sat May 11 15:37:43 2024 +0800

    HDFS-17488. DN can fail IBRs with NPE when a volume is removed (#6759)
---
 .../hadoop-common/src/site/markdown/Metrics.md      |  1 +
 .../hdfs/server/datanode/BPOfferService.java        |  7 ++++
 .../hadoop/hdfs/server/datanode/DataNode.java       |  3 +-
 .../server/datanode/DataNodeFaultInjector.java      |  6 +++
 .../hdfs/server/datanode/DirectoryScanner.java      |  2 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java      | 10 +++--
 .../server/datanode/metrics/DataNodeMetrics.java    |  5 +++
 .../hdfs/server/datanode/TestBPOfferService.java    |  8 ++--
 .../hdfs/server/datanode/TestDirectoryScanner.java  | 48 ++++++++++++++++++++++
 9 files changed, 82 insertions(+), 8 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index aaead837102e..a89d254d937c 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -532,6 +532,7 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
 | `NumProcessedCommands` | Num of processed commands of all BPServiceActors |
 | `ProcessedCommandsOpNumOps` | Total number of processed commands operations |
 | `ProcessedCommandsOpAvgTime` | Average time of processed commands operations in milliseconds |
+| `NullStorageBlockReports` | Number of blocks in IBRs that failed due to null storage |
 
 FsVolume
 --------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 1a2c024c904c..11489e919c49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.protocol.*;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
 import org.apache.hadoop.util.Lists;
 import org.apache.hadoop.util.Sets;
 
@@ -324,6 +325,12 @@ class BPOfferService {
     final ReceivedDeletedBlockInfo info = new ReceivedDeletedBlockInfo(
         block.getLocalBlock(), status, delHint);
     final DatanodeStorage storage = dn.getFSDataset().getStorage(storageUuid);
+    if (storage == null) {
+      LOG.warn("Trying to add RDBI for null storage UUID {}. Trace: {}", storageUuid,
+          Joiner.on("\n").join(Thread.currentThread().getStackTrace()));
+      getDataNode().getMetrics().incrNullStorageBlockReports();
+      return;
+    }
 
     for (BPServiceActor actor : bpServices) {
       actor.getIbrManager().notifyNamenodeBlock(info, storage,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 956f5bbe519d..87e8eee681d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -4057,7 +4057,8 @@ public class DataNode extends ReconfigurableBase
     }
   }
 
-  private void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) {
+  @VisibleForTesting
+  public void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) {
     if (unhealthyVolumes.isEmpty()) {
       LOG.debug("handleVolumeFailures done with empty " +
           "unhealthyVolumes");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 372271b4fb28..9e046cc3600d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -172,4 +172,10 @@ public class DataNodeFaultInjector {
    * Just delay getMetaDataInputStream a while.
    */
   public void delayGetMetaDataInputStream() {}
+
+  /**
+   * Used in {@link DirectoryScanner#reconcile()} to wait until a storage is removed,
+   * leaving a stale copy of {@link DirectoryScanner#diffs}.
+   */
+  public void waitUntilStorageRemoved() {}
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 30a2d2e58431..a99f3d78e2ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -466,7 +466,7 @@ public class DirectoryScanner implements Runnable {
   public void reconcile() throws IOException {
     LOG.debug("reconcile start DirectoryScanning");
     scan();
-
+    DataNodeFaultInjector.get().waitUntilStorageRemoved();
     // HDFS-14476: run checkAndUpdate with batch to avoid holding the lock too
     // long
     int loopCount = 0;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 5be095118fc3..0ca222c083c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2745,8 +2745,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       curDirScannerNotifyCount = 0;
       lastDirScannerNotifyTime = startTimeMs;
     }
-    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid,
-        vol.getStorageID())) {
+    String storageUuid = vol.getStorageID();
+    try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid, storageUuid)) {
+      if (!storageMap.containsKey(storageUuid)) {
+        // Storage was already removed
+        return;
+      }
       memBlockInfo = volumeMap.get(bpid, blockId);
       if (memBlockInfo != null &&
           memBlockInfo.getState() != ReplicaState.FINALIZED) {
@@ -2833,7 +2837,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           maxDirScannerNotifyCount++;
           datanode.notifyNamenodeReceivedBlock(
               new ExtendedBlock(bpid, diskBlockInfo), null,
-              vol.getStorageID(), vol.isTransientStorage());
+              storageUuid, vol.isTransientStorage());
         }
         if (vol.isTransientStorage()) {
           long lockedBytesReserved =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 2e902f694a12..832a8029f777 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -185,6 +185,8 @@ public class DataNodeMetrics {
   private MutableCounterLong numProcessedCommands;
   @Metric("Rate of processed commands of all BPServiceActors")
   private MutableRate processedCommandsOp;
+  @Metric("Number of blocks in IBRs that failed due to null storage")
+  private MutableCounterLong nullStorageBlockReports;
 
   // FsDatasetImpl local file process metrics.
   @Metric private MutableRate createRbwOp;
@@ -812,4 +814,7 @@ public class DataNodeMetrics {
     replaceBlockOpToOtherHost.incr();
   }
 
+  public void incrNullStorageBlockReports() {
+    nullStorageBlockReports.incr();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 65855427d725..fd1b5609b1f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -136,6 +136,7 @@ public class TestBPOfferService {
   private FsDatasetSpi<?> mockFSDataset;
   private DataSetLockManager dataSetLockManager = new DataSetLockManager();
   private boolean isSlownode;
+  private String mockStorageID;
 
   @Before
   public void setupMocks() throws Exception {
@@ -157,6 +158,7 @@ public class TestBPOfferService {
     // Set up a simulated dataset with our fake BP
     mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
     mockFSDataset.addBlockPool(FAKE_BPID, conf);
+    mockStorageID = ((SimulatedFSDataset) mockFSDataset).getStorages().get(0).getStorageUuid();
 
     // Wire the dataset to the DN.
     Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset();
@@ -289,7 +291,7 @@ public class TestBPOfferService {
       waitForBlockReport(mockNN2);
 
       // When we receive a block, it should report it to both NNs
-      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
+      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, mockStorageID, false);
 
       ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1);
       assertEquals(1, ret.length);
@@ -1099,7 +1101,7 @@ public class TestBPOfferService {
      waitForBlockReport(mockNN2);
 
       // When we receive a block, it should report it to both NNs
-      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
+      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, mockStorageID, false);
 
       ReceivedDeletedBlockInfo[] ret =
           waitForBlockReceived(FAKE_BLOCK, mockNN1);
@@ -1140,7 +1142,7 @@ public class TestBPOfferService {
       Mockito.verify(mockNN3).registerDatanode(Mockito.any());
 
       // When we receive a block, it should report it to both NNs
-      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
+      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, mockStorageID, false);
 
       // veridfy new NN recieved block report
       ret = waitForBlockReceived(FAKE_BLOCK, mockNN3);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 96b32639632f..3392410d1fe0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -37,9 +37,11 @@ import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -1420,4 +1422,50 @@ public class TestDirectoryScanner {
       DFSTestUtil.createFile(fs,
           filePath, 1, (short) 1, 0);
     }
   }
+
+  @Test(timeout = 30000)
+  public void testNullStorage() throws Exception {
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+
+    Configuration conf = getConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
+      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
+      createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, false);
+      // Make sure checkAndUpdate will run
+      truncateBlockFile();
+
+      // Mock a volume corruption after DirectoryScanner.scan() but before checkAndUpdate()
+      FsVolumeImpl volumeToRemove = fds.getVolumeList().get(0);
+      DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void waitUntilStorageRemoved() {
+          Set<FsVolumeSpi> volumesToRemove = new HashSet<>();
+          volumesToRemove.add(volumeToRemove);
+          cluster.getDataNodes().get(0).handleVolumeFailures(volumesToRemove);
+        }
+      };
+      DataNodeFaultInjector.set(injector);
+
+      GenericTestUtils.LogCapturer logCapturer =
+          GenericTestUtils.LogCapturer.captureLogs(DataNode.LOG);
+      scanner = new DirectoryScanner(fds, conf);
+      scanner.setRetainDiffs(true);
+      scanner.reconcile();
+      assertFalse(logCapturer.getOutput()
+          .contains("Trying to add RDBI for null storage UUID " + volumeToRemove.getStorageID()));
+    } finally {
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
+      cluster.shutdown();
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
 }
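
The guard added to BPOfferService#notifyNamenodeBlock is the core of the fix: look the
DatanodeStorage up once and, if the volume has been removed in the meantime, count the
event and skip the notification rather than dereference a null storage. Below is a
minimal, self-contained Java sketch of that pattern; the class and member names are
simplified stand-ins for illustration, not the actual Hadoop types.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Simplified stand-in for the DataNode side of incremental block reports (IBRs). */
public class IbrNullStorageGuardSketch {

  /** Storage UUID -> storage descriptor; an entry disappears when its volume is removed. */
  private final Map<String, String> storageMap = new ConcurrentHashMap<>();

  /** Plays the role of the new NullStorageBlockReports metric. */
  private final AtomicLong nullStorageBlockReports = new AtomicLong();

  void addStorage(String storageUuid) {
    storageMap.put(storageUuid, "DatanodeStorage(" + storageUuid + ")");
  }

  void removeStorage(String storageUuid) {
    storageMap.remove(storageUuid);
  }

  /**
   * Before the fix, the looked-up storage was used unconditionally, so a volume removed
   * between the directory scan and this call caused an NPE that failed the IBR. With the
   * guard, the stale notification is dropped and counted instead.
   */
  void notifyNamenodeBlock(long blockId, String storageUuid) {
    String storage = storageMap.get(storageUuid);
    if (storage == null) {
      // Volume was removed concurrently: warn, bump the counter, and skip this block.
      System.out.println("Trying to add RDBI for null storage UUID " + storageUuid);
      nullStorageBlockReports.incrementAndGet();
      return;
    }
    System.out.println("Queued IBR for block " + blockId + " on " + storage);
  }

  public static void main(String[] args) {
    IbrNullStorageGuardSketch dn = new IbrNullStorageGuardSketch();
    dn.addStorage("DS-1");
    dn.notifyNamenodeBlock(1001L, "DS-1");  // queued normally
    dn.removeStorage("DS-1");               // simulates a volume being removed
    dn.notifyNamenodeBlock(1002L, "DS-1");  // dropped and counted, no NPE
    System.out.println("nullStorageBlockReports = " + dn.nullStorageBlockReports.get());
  }
}

Per the patch, dropping the stale notification is preferable to letting the whole
incremental block report fail with an NPE, and the NullStorageBlockReports counter keeps
the dropped notifications observable.
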
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org