This is an automated email from the ASF dual-hosted git repository. hexiaoqiao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 4bd873b816d HDFS-17044. Set size of non-exist block to NO_ACK when process FBR or IBR to avoid useless report from DataNode. (#5735). Contributed by Haiyang Hu. 4bd873b816d is described below commit 4bd873b816dbd889f410428d6e618586d4ff1780 Author: huhaiyang <huhaiyang...@126.com> AuthorDate: Wed Jun 28 12:03:15 2023 +0800 HDFS-17044. Set size of non-exist block to NO_ACK when process FBR or IBR to avoid useless report from DataNode. (#5735). Contributed by Haiyang Hu. Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org> --- .../hdfs/server/blockmanagement/BlockManager.java | 7 +- .../server/datanode/metrics/DataNodeMetrics.java | 4 ++ .../server/blockmanagement/TestBlockManager.java | 80 ++++++++++++++++++++++ 3 files changed, 89 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 16b79539fdd..fab3619cb2c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -3295,8 +3295,11 @@ public class BlockManager implements BlockStatsMXBean { BlockInfo storedBlock = getStoredBlock(block); if(storedBlock == null) { // If blocksMap does not contain reported block id, - // the replica should be removed from the data-node. - toInvalidate.add(new Block(block)); + // The replica should be removed from Datanode, and set NumBytes to BlockCommand.No_ACK to + // avoid useless report to NameNode from Datanode when complete to process it. + Block invalidateBlock = new Block(block); + invalidateBlock.setNumBytes(BlockCommand.NO_ACK); + toInvalidate.add(invalidateBlock); return null; } BlockUCState ucState = storedBlock.getBlockUCState(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index c3aa3c3a454..da19f287119 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -346,6 +346,10 @@ public class DataNodeMetrics { blocksRemoved.incr(delta); } + public long getBlocksRemoved() { + return blocksRemoved.value(); + } + public void incrBytesWritten(int delta) { bytesWritten.incr(delta); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index a054511423d..3e00491e993 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -117,6 +117,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION; +import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -2121,4 +2122,83 @@ public class TestBlockManager { assertEquals(2, locs[0].getHosts().length); } } + + /** + * Test processing toInvalidate in block reported, if the block not exists need + * to set the numBytes of the block to NO_ACK, + * the DataNode processing will not report incremental blocks. + */ + @Test(timeout = 360000) + public void testBlockReportSetNoAckBlockToInvalidate() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + try (MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build()) { + cluster.waitActive(); + BlockManager blockManager = cluster.getNamesystem().getBlockManager(); + DistributedFileSystem fs = cluster.getFileSystem(); + // Write file. + Path file = new Path("/test"); + DFSTestUtil.createFile(fs, file, 10240L, (short)1, 0L); + DFSTestUtil.waitReplication(fs, file, (short) 1); + LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, file).get(0); + DatanodeInfo[] loc = lb.getLocations(); + assertEquals(1, loc.length); + List<DataNode> datanodes = cluster.getDataNodes(); + assertEquals(1, datanodes.size()); + DataNode datanode = datanodes.get(0); + assertEquals(datanode.getDatanodeUuid(), loc[0].getDatanodeUuid()); + + MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name()); + // Check the IncrementalBlockReportsNumOps of DataNode, it will be 0. + assertEquals(1, getLongCounter("IncrementalBlockReportsNumOps", rb)); + + // Delete file and remove block. + fs.delete(file, false); + + // Wait for the processing of the marked deleted block to complete. + BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(blockManager); + assertNull(blockManager.getStoredBlock(lb.getBlock().getLocalBlock())); + + // Expire heartbeat on the NameNode,and datanode to be marked dead. + datanode.setHeartbeatsDisabledForTests(true); + cluster.setDataNodeDead(datanode.getDatanodeId()); + assertFalse(blockManager.containsInvalidateBlock(loc[0], lb.getBlock().getLocalBlock())); + + // Wait for re-registration and heartbeat. + datanode.setHeartbeatsDisabledForTests(false); + final DatanodeDescriptor dn1Desc = cluster.getNamesystem(0) + .getBlockManager().getDatanodeManager() + .getDatanode(datanode.getDatanodeId()); + GenericTestUtils.waitFor( + () -> dn1Desc.isAlive() && dn1Desc.isHeartbeatedSinceRegistration(), + 100, 5000); + + // Trigger BlockReports and block is not exists, + // it will add invalidateBlocks and set block numBytes be NO_ACK. + cluster.triggerBlockReports(); + GenericTestUtils.waitFor( + () -> blockManager.containsInvalidateBlock(loc[0], lb.getBlock().getLocalBlock()), + 100, 1000); + + // Trigger schedule blocks for deletion at datanode. + int workCount = blockManager.computeInvalidateWork(1); + assertEquals(1, workCount); + assertFalse(blockManager.containsInvalidateBlock(loc[0], lb.getBlock().getLocalBlock())); + + // Wait for the blocksRemoved value in DataNode to be 1. + GenericTestUtils.waitFor( + () -> datanode.getMetrics().getBlocksRemoved() == 1, + 100, 5000); + + // Trigger immediate deletion report at datanode. + cluster.triggerDeletionReports(); + + // Delete block numBytes be NO_ACK and will not deletion block report, + // so check the IncrementalBlockReportsNumOps of DataNode still 1. + assertEquals(1, getLongCounter("IncrementalBlockReportsNumOps", rb)); + } + } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org