Hexiaoqiao commented on code in PR #6176:
URL: https://github.com/apache/hadoop/pull/6176#discussion_r1384638620
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java:
##########
@@ -3040,6 +3058,99 @@ void rescanPostponedMisreplicatedBlocks() {
(Time.monotonicNow() - startTime), endSize, (startSize - endSize));
}
}
+
+ /**
+ * Sets the timeout (in seconds) for excess redundancy blocks, if the provided timeout is
+ * less than or equal to 0, the default value is used (converted to milliseconds).
+ * @param timeOut The time (in seconds) to set as the excess redundancy block timeout.
+ */
+ public void setExcessRedundancyTimeout(long timeOut) {
+ if (timeOut <= 0) {
+ this.excessRedundancyTimeout = DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC * 1000L;
+ } else {
+ this.excessRedundancyTimeout = timeOut * 1000L;
+ }
+ }
+
+ /**
+ * Sets the limit number of blocks for checking excess redundancy timeout.
+ * If the provided limit is less than or equal to 0, the default limit is used.
+ *
+ * @param limit The limit number of blocks used to check for excess redundancy timeout.
+ */
+ public void setExcessRedundancyTimeoutCheckLimit(long limit) {
+ if (excessRedundancyTimeoutCheckLimit <= 0) {
+ this.excessRedundancyTimeoutCheckLimit =
+ DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT;
+ } else {
+ this.excessRedundancyTimeoutCheckLimit = limit;
+ }
+ }
+
+ /**
+ * Process timed-out blocks in the excess redundancy map.
+ */
+ void processTimedOutExcessBlocks() {
+ if (excessRedundancyMap.size() == 0) {
+ return;
+ }
+ namesystem.writeLock();
+ long now = Time.monotonicNow();
+ int processed = 0;
+ try {
+ Iterator<Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>>> iter =
+ excessRedundancyMap.getExcessRedundancyMap().entrySet().iterator();
+ while (iter.hasNext() && processed < excessRedundancyTimeoutCheckLimit) {
+ Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>> entry = iter.next();
+ String datanodeUuid = entry.getKey();
+ LightWeightHashSet<ExcessBlockInfo> blocks = entry.getValue();
+ List<ExcessRedundancyMap.ExcessBlockInfo> sortedBlocks = new ArrayList<>(blocks);
+ // Sort blocks by timestamp in descending order.
+ Collections.sort(sortedBlocks);
+
+ for (ExcessBlockInfo excessBlockInfo : sortedBlocks) {
+ if (processed >= excessRedundancyTimeoutCheckLimit) {
+ break;
+ }
+
+ processed++;
+ // If the datanode doesn't have any excess block that has exceeded the timeout,
+ // can exit this loop.
+ if (now <= excessBlockInfo.getTimeStamp() + excessRedundancyTimeout) {
+ break;
+ }
+
+ BlockInfo blockInfo = excessBlockInfo.getBlockInfo();
+ BlockInfo bi = blocksMap.getStoredBlock(blockInfo);
+ if (bi == null || bi.isDeleted()) {
+ continue;
+ }
+
+ Iterator<DatanodeStorageInfo> iterator = blockInfo.getStorageInfos();
+ while (iterator.hasNext()) {
+ DatanodeStorageInfo datanodeStorageInfo = iterator.next();
+ DatanodeDescriptor datanodeDescriptor = datanodeStorageInfo.getDatanodeDescriptor();
+ if (datanodeDescriptor.getDatanodeUuid().equals(datanodeUuid)) {
+ if (datanodeStorageInfo.getState().equals(State.NORMAL)) {
Review Comment:
How about combining these two conditions into one, as `if (a && b) { do something; }`?
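For reference, something along these lines (a sketch based only on the quoted hunk; the inner block's body is cut off in the diff above, so it is elided here):

```java
if (datanodeDescriptor.getDatanodeUuid().equals(datanodeUuid)
    && datanodeStorageInfo.getState().equals(State.NORMAL)) {
  // ... handle the timed-out excess replica (body not shown in the quoted hunk).
}
```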
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java:
##########
@@ -3040,6 +3058,99 @@ void rescanPostponedMisreplicatedBlocks() {
(Time.monotonicNow() - startTime), endSize, (startSize - endSize));
}
}
+
+ /**
+ * Sets the timeout (in seconds) for excess redundancy blocks, if the provided timeout is
+ * less than or equal to 0, the default value is used (converted to milliseconds).
+ * @param timeOut The time (in seconds) to set as the excess redundancy block timeout.
+ */
+ public void setExcessRedundancyTimeout(long timeOut) {
+ if (timeOut <= 0) {
+ this.excessRedundancyTimeout = DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC * 1000L;
+ } else {
+ this.excessRedundancyTimeout = timeOut * 1000L;
+ }
+ }
+
+ /**
+ * Sets the limit number of blocks for checking excess redundancy timeout.
+ * If the provided limit is less than or equal to 0, the default limit is used.
+ *
+ * @param limit The limit number of blocks used to check for excess redundancy timeout.
+ */
+ public void setExcessRedundancyTimeoutCheckLimit(long limit) {
+ if (excessRedundancyTimeoutCheckLimit <= 0) {
+ this.excessRedundancyTimeoutCheckLimit =
+ DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT;
+ } else {
+ this.excessRedundancyTimeoutCheckLimit = limit;
+ }
+ }
+
+ /**
+ * Process timed-out blocks in the excess redundancy map.
+ */
+ void processTimedOutExcessBlocks() {
+ if (excessRedundancyMap.size() == 0) {
+ return;
+ }
+ namesystem.writeLock();
+ long now = Time.monotonicNow();
+ int processed = 0;
+ try {
+ Iterator<Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>>> iter =
+ excessRedundancyMap.getExcessRedundancyMap().entrySet().iterator();
+ while (iter.hasNext() && processed < excessRedundancyTimeoutCheckLimit) {
+ Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>> entry = iter.next();
+ String datanodeUuid = entry.getKey();
+ LightWeightHashSet<ExcessBlockInfo> blocks = entry.getValue();
+ List<ExcessRedundancyMap.ExcessBlockInfo> sortedBlocks = new ArrayList<>(blocks);
Review Comment:
The `ExcessRedundancyMap` qualifier is redundant here.
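For example (a sketch; the surrounding lines already use `ExcessBlockInfo` unqualified, so the simple name should resolve):

```java
List<ExcessBlockInfo> sortedBlocks = new ArrayList<>(blocks);
```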
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java:
##########
@@ -95,12 +97,12 @@ synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) {
* @return true if the block is removed.
*/
synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
- final LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
+ final LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid());
if (set == null) {
return false;
}
- final boolean removed = set.remove(blk);
+ final boolean removed = set.remove(new ExcessBlockInfo(blk));
Review Comment:
Same as the last comment.
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java:
##########
@@ -111,4 +113,56 @@ synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
}
return removed;
}
+
+ synchronized Map<String, LightWeightHashSet<ExcessBlockInfo>> getExcessRedundancyMap() {
+ return map;
+ }
+
+ /**
+ * An object that contains information about a block that is being excess redundancy.
+ * It records the timestamp when added excess redundancy map of this block.
+ */
+ static class ExcessBlockInfo implements Comparable<ExcessBlockInfo> {
+ private long timeStamp;
+ private BlockInfo blockInfo;
+
+ ExcessBlockInfo(BlockInfo blockInfo) {
+ this.timeStamp = monotonicNow();
+ this.blockInfo = blockInfo;
+ }
+
+ public BlockInfo getBlockInfo() {
+ return blockInfo;
+ }
+
+ long getTimeStamp() {
+ return timeStamp;
+ }
+
+ void setTimeStamp() {
+ timeStamp = monotonicNow();
+ }
+
+ @Override
+ public int hashCode() {
+ return blockInfo.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof ExcessBlockInfo)) {
+ return false;
+ }
+ ExcessBlockInfo other = (ExcessBlockInfo) obj;
+ return (this.blockInfo.equals(other.blockInfo));
Review Comment:
Is it enough to compare `blockInfo` only? If so, we don't need to create a new instance for `contains` or `remove`, which would avoid the extra heap footprint. Right?
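To illustrate the question (a small sketch, not part of the PR): since `equals()`/`hashCode()` delegate to the wrapped `BlockInfo`, any two wrappers around the same block are interchangeable as set elements, which is why comparing `blockInfo` alone decides membership:

```java
// Assuming blk is a BlockInfo already tracked in the map:
ExcessBlockInfo a = new ExcessBlockInfo(blk);
ExcessBlockInfo b = new ExcessBlockInfo(blk);
// Both wrappers hash and compare equal, even though their timestamps differ.
assert a.equals(b) && a.hashCode() == b.hashCode();
```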
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java:
##########
@@ -2201,4 +2203,130 @@ public void testBlockReportSetNoAckBlockToInvalidate() throws Exception {
assertEquals(1, getLongCounter("IncrementalBlockReportsNumOps", rb));
}
}
+
+ /**
+ * Test NameNode should process time out excess redundancy blocks.
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test(timeout = 360000)
+ public void testProcessTimedOutExcessBlocks() throws IOException,
+ InterruptedException, TimeoutException {
+ Configuration config = new HdfsConfiguration();
+ // Bump up replication interval.
+ config.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10000);
+ // Set the excess redundancy block timeout.
+ long timeOut = 60L;
+ config.setLong(DFSConfigKeys.DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY, timeOut);
+
+ DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+
+ final Semaphore semaphore = new Semaphore(0);
+ try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).numDataNodes(3).build()) {
+ DistributedFileSystem fs = cluster.getFileSystem();
+ BlockManager blockManager = cluster.getNameNode().getNamesystem().getBlockManager();
+ cluster.waitActive();
+
+ final DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+ @Override
+ public void delayDeleteReplica() {
+ // Lets wait for the remove replica process.
+ try {
+ semaphore.acquire(1);
+ } catch (InterruptedException e) {
+ // ignore.
+ }
+ }
+ };
+ DataNodeFaultInjector.set(injector);
+
+ // Create file.
+ Path path = new Path("/testfile");
+ DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0);
+ DFSTestUtil.waitReplication(fs, path, (short) 3);
+ LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+ ExtendedBlock extendedBlock = lb.getBlock();
+ DatanodeInfo[] loc = lb.getLocations();
+ assertEquals(3, loc.length);
+
+ // Set replication as 2, to choose excess.
+ fs.setReplication(path, (short) 2);
+
+ // Check excessRedundancyMap and invalidateBlocks size as 1.
+ assertEquals(1, blockManager.getExcessBlocksCount());
+ assertEquals(1, blockManager.getPendingDeletionBlocksCount());
+ DataNode excessDn = Arrays.stream(loc).
+ filter(datanodeInfo -> blockManager.getExcessSize4Testing(
+ datanodeInfo.getDatanodeUuid()) > 0)
+ .map(datanodeInfo -> cluster.getDataNode(datanodeInfo.getIpcPort()))
+ .findFirst()
+ .orElse(null);
+
+ // Schedule blocks for deletion at excessDn.
+ assertEquals(1, blockManager.computeInvalidateWork(1));
+ // Check excessRedundancyMap size as 1.
+ assertEquals(1, blockManager.getExcessBlocksCount());
+ // Check invalidateBlocks size as 0.
+ assertEquals(0, blockManager.getPendingDeletionBlocksCount());
+ assertNotNull(excessDn);
+
+ // Name node will ask datanode to delete replicas in heartbeat response.
Review Comment:
I prefer `NameNode` to `Name node`.
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java:
##########
@@ -64,8 +66,8 @@ synchronized void clear() {
* datanode and the given block?
*/
synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
- final LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
- return set != null && set.contains(blk);
+ final LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid());
+ return set != null && set.contains(new ExcessBlockInfo(blk));
Review Comment:
I am concerned that this will add heap footprint if we `new` an instance this frequently. Is it necessary here?
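One possible way to avoid the temporary wrapper (a hypothetical sketch, not the PR's code; it changes the per-datanode structure to keep the timestamp per `BlockInfo` directly):

```java
// Hypothetical alternative layout: DataNode UUID -> (BlockInfo -> time added to the map).
private final Map<String, HashMap<BlockInfo, Long>> map = new HashMap<>();

synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
  final HashMap<BlockInfo, Long> blocks = map.get(dn.getDatanodeUuid());
  // Lookup uses the existing BlockInfo, so no wrapper object is allocated.
  return blocks != null && blocks.containsKey(blk);
}

synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
  final HashMap<BlockInfo, Long> blocks = map.get(dn.getDatanodeUuid());
  return blocks != null && blocks.remove(blk) != null;
}
```

This trades the `LightWeightHashSet` memory savings for a standard `HashMap`, so whether it is a net win would need to be measured.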
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]