haiyang1987 commented on code in PR #6176:
URL: https://github.com/apache/hadoop/pull/6176#discussion_r1384798128
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java:
##########
@@ -3040,6 +3058,99 @@ void rescanPostponedMisreplicatedBlocks() {
(Time.monotonicNow() - startTime), endSize, (startSize - endSize));
}
}
+
+ /**
+  * Updates the excess redundancy block timeout. The argument is expressed in seconds
+  * and is stored in milliseconds. A value less than or equal to 0 selects the default
+  * {@code DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC} instead.
+  *
+  * @param timeOut the excess redundancy block timeout, in seconds.
+  */
+ public void setExcessRedundancyTimeout(long timeOut) {
+   // Pick the effective value in seconds first, then convert to milliseconds once.
+   final long timeoutSec =
+       timeOut > 0 ? timeOut : DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC;
+   this.excessRedundancyTimeout = timeoutSec * 1000L;
+ }
+
+ /**
+  * Sets the limit on the number of blocks examined when checking for excess
+  * redundancy timeouts. If the provided limit is less than or equal to 0, the default
+  * {@code DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT} is used.
+  *
+  * @param limit the limit number of blocks used to check for excess redundancy timeout.
+  */
+ public void setExcessRedundancyTimeoutCheckLimit(long limit) {
+   // Validate the incoming argument, not the current field value: otherwise a
+   // non-positive limit could be stored, or a valid limit could be replaced by the
+   // default, depending on whatever the field happened to hold before this call.
+   if (limit <= 0) {
+     this.excessRedundancyTimeoutCheckLimit =
+         DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT;
+   } else {
+     this.excessRedundancyTimeoutCheckLimit = limit;
+   }
+ }
+
+ /**
+ * Process timed-out blocks in the excess redundancy map.
+ */
+ void processTimedOutExcessBlocks() {
+ if (excessRedundancyMap.size() == 0) {
+ return;
+ }
+ namesystem.writeLock();
+ long now = Time.monotonicNow();
+ int processed = 0;
+ try {
+ Iterator<Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>>> iter =
+ excessRedundancyMap.getExcessRedundancyMap().entrySet().iterator();
+ while (iter.hasNext() && processed < excessRedundancyTimeoutCheckLimit) {
+ Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>> entry =
iter.next();
+ String datanodeUuid = entry.getKey();
+ LightWeightHashSet<ExcessBlockInfo> blocks = entry.getValue();
+ List<ExcessRedundancyMap.ExcessBlockInfo> sortedBlocks = new
ArrayList<>(blocks);
+ // Sort blocks by timestamp in descending order.
+ Collections.sort(sortedBlocks);
+
+ for (ExcessBlockInfo excessBlockInfo : sortedBlocks) {
+ if (processed >= excessRedundancyTimeoutCheckLimit) {
+ break;
+ }
+
+ processed++;
+ // If the datanode doesn't have any excess block that has exceeded
the timeout,
+ // can exit this loop.
+ if (now <= excessBlockInfo.getTimeStamp() + excessRedundancyTimeout)
{
+ break;
+ }
+
+ BlockInfo blockInfo = excessBlockInfo.getBlockInfo();
+ BlockInfo bi = blocksMap.getStoredBlock(blockInfo);
+ if (bi == null || bi.isDeleted()) {
+ continue;
+ }
+
+ Iterator<DatanodeStorageInfo> iterator = blockInfo.getStorageInfos();
+ while (iterator.hasNext()) {
+ DatanodeStorageInfo datanodeStorageInfo = iterator.next();
+ DatanodeDescriptor datanodeDescriptor =
datanodeStorageInfo.getDatanodeDescriptor();
+ if (datanodeDescriptor.getDatanodeUuid().equals(datanodeUuid)) {
+ if (datanodeStorageInfo.getState().equals(State.NORMAL)) {
Review Comment:
Got it, will fix.
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java:
##########
@@ -2201,4 +2203,130 @@ public void testBlockReportSetNoAckBlockToInvalidate()
throws Exception {
assertEquals(1, getLongCounter("IncrementalBlockReportsNumOps", rb));
}
}
+
+ /**
+ * Test NameNode should process time out excess redundancy blocks.
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test(timeout = 360000)
+ public void testProcessTimedOutExcessBlocks() throws IOException,
+ InterruptedException, TimeoutException {
+ Configuration config = new HdfsConfiguration();
+ // Bump up replication interval.
+ config.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
10000);
+ // Set the excess redundancy block timeout.
+ long timeOut = 60L;
+
config.setLong(DFSConfigKeys.DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY,
timeOut);
+
+ DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+
+ final Semaphore semaphore = new Semaphore(0);
+ try (MiniDFSCluster cluster = new
MiniDFSCluster.Builder(config).numDataNodes(3).build()) {
+ DistributedFileSystem fs = cluster.getFileSystem();
+ BlockManager blockManager =
cluster.getNameNode().getNamesystem().getBlockManager();
+ cluster.waitActive();
+
+ final DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+ @Override
+ public void delayDeleteReplica() {
+ // Lets wait for the remove replica process.
+ try {
+ semaphore.acquire(1);
+ } catch (InterruptedException e) {
+ // ignore.
+ }
+ }
+ };
+ DataNodeFaultInjector.set(injector);
+
+ // Create file.
+ Path path = new Path("/testfile");
+ DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0);
+ DFSTestUtil.waitReplication(fs, path, (short) 3);
+ LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+ ExtendedBlock extendedBlock = lb.getBlock();
+ DatanodeInfo[] loc = lb.getLocations();
+ assertEquals(3, loc.length);
+
+ // Set replication as 2, to choose excess.
+ fs.setReplication(path, (short) 2);
+
+ // Check excessRedundancyMap and invalidateBlocks size as 1.
+ assertEquals(1, blockManager.getExcessBlocksCount());
+ assertEquals(1, blockManager.getPendingDeletionBlocksCount());
+ DataNode excessDn = Arrays.stream(loc).
+ filter(datanodeInfo -> blockManager.getExcessSize4Testing(
+ datanodeInfo.getDatanodeUuid()) > 0)
+ .map(datanodeInfo -> cluster.getDataNode(datanodeInfo.getIpcPort()))
+ .findFirst()
+ .orElse(null);
+
+ // Schedule blocks for deletion at excessDn.
+ assertEquals(1, blockManager.computeInvalidateWork(1));
+ // Check excessRedundancyMap size as 1.
+ assertEquals(1, blockManager.getExcessBlocksCount());
+ // Check invalidateBlocks size as 0.
+ assertEquals(0, blockManager.getPendingDeletionBlocksCount());
+ assertNotNull(excessDn);
+
+ // Name node will ask datanode to delete replicas in heartbeat response.
Review Comment:
Got it, will fix.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]