This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 900430b HDFS-14861. Reset LowRedundancyBlocks Iterator periodically.
Contributed by Stephen O'Donnell.
900430b is described below
commit 900430b9907b590ed2d73a0d68f079c7f4d754b1
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Tue Feb 25 13:27:53 2020 -0800
HDFS-14861. Reset LowRedundancyBlocks Iterator periodically. Contributed by
Stephen O'Donnell.
Signed-off-by: Wei-Chiu Chuang <[email protected]>
---
.../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 +++
.../hdfs/server/blockmanagement/BlockManager.java | 35 ++++++++++++++++++++--
.../blockmanagement/LowRedundancyBlocks.java | 24 ++++++++++++++-
.../src/main/resources/hdfs-default.xml | 18 +++++++++++
.../TestLowRedundancyBlockQueues.java | 27 +++++++++++++++++
5 files changed, 105 insertions(+), 3 deletions(-)
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index bb8039c..51900a4 100755
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -244,6 +244,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys
{
public static final String DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY;
public static final int DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT = 3;
+ public static final String DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS =
+ "dfs.namenode.redundancy.queue.restart.iterations";
+ public static final int
+ DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT = 2400;
public static final String DFS_NAMENODE_REPLICATION_MIN_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
public static final int DFS_NAMENODE_REPLICATION_MIN_DEFAULT = 1;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 626048f..e2b22d3 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -300,6 +300,16 @@ public class BlockManager implements BlockStatsMXBean {
*/
private final long redundancyRecheckIntervalMs;
+ /**
+ * Tracks how many calls have been made to chooseLowRedundancyBlocks since
+ * the queue position was last reset to the queue head. Once that call count
+ * reaches the threshold, the next call will reset the iterators. A threshold
+ * of zero means the queue position will only be reset once the end of the
+ * queue has been reached.
+ */
+ private int replQueueResetToHeadThreshold;
+ private int replQueueCallsSinceReset = 0;
+
/** How often to check and the limit for the storageinfo efficiency. */
private final long storageInfoDefragmentInterval;
private final long storageInfoDefragmentTimeout;
@@ -572,6 +582,18 @@ public class BlockManager implements BlockStatsMXBean {
}
this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
+ replQueueResetToHeadThreshold = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS,
+
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT);
+ if (replQueueResetToHeadThreshold < 0) {
+ LOG.warn("{} is set to {} and it must be >= 0. Resetting to default {}",
+ DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS,
+ replQueueResetToHeadThreshold, DFSConfigKeys.
+ DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT);
+ replQueueResetToHeadThreshold = DFSConfigKeys.
+ DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT;
+ }
+
long heartbeatIntervalSecs = conf.getTimeDuration(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
@@ -1912,9 +1934,18 @@ public class BlockManager implements BlockStatsMXBean {
List<List<BlockInfo>> blocksToReconstruct = null;
namesystem.writeLock();
try {
- // Choose the blocks to be reconstructed
+ boolean reset = false;
+ if (replQueueResetToHeadThreshold > 0) {
+ if (replQueueCallsSinceReset >= replQueueResetToHeadThreshold) {
+ reset = true;
+ replQueueCallsSinceReset = 0;
+ } else {
+ replQueueCallsSinceReset++;
+ }
+ }
+ // Choose the blocks to be reconstructed
blocksToReconstruct = neededReconstruction
- .chooseLowRedundancyBlocks(blocksToProcess);
+ .chooseLowRedundancyBlocks(blocksToProcess, reset);
} finally {
namesystem.writeUnlock();
}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
index 40ea980..8cf9dd4 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
@@ -488,6 +488,28 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
*/
synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
int blocksToProcess) {
+ return chooseLowRedundancyBlocks(blocksToProcess, false);
+ }
+
+ /**
+ * Get a list of block lists without sufficient redundancy. The index of
+ * block lists represents its replication priority. Iterates each block list
+ * in priority order beginning with the highest priority list. Iterators use
+ * a bookmark to resume where the previous iteration stopped. Returns when
+ * the block count is met or iteration reaches the end of the lowest priority
+ * list, in which case bookmarks for each block list are reset to the heads
+ * of their respective lists.
+ *
+ * @param blocksToProcess - number of blocks to fetch from low redundancy
+ * blocks.
+ * @param resetIterators - if true, after gathering the list of blocks reset
+ * the position of all queue iterators to the head of the queue so
+ * subsequent calls will begin at the head of the queue
+ * @return Return a list of block lists to be replicated. The block list
+ * index represents its redundancy priority.
+ */
+ synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
+ int blocksToProcess, boolean resetIterators) {
final List<List<BlockInfo>> blocksToReconstruct = new ArrayList<>(LEVEL);
int count = 0;
@@ -509,7 +531,7 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
}
}
- if (priority == LEVEL) {
+ if (priority == LEVEL || resetIterators) {
// Reset all bookmarks because there were no recently added blocks.
for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
q.resetBookmark();
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index ad556c6..0b7d13a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1137,6 +1137,24 @@
</property>
<property>
+ <name>dfs.namenode.redundancy.queue.restart.iterations</name>
+ <value>2400</value>
+ <description>When picking blocks from the low redundancy queues, reset the
+ bookmarked iterator after the set number of iterations to ensure any blocks
+ which were not processed on the first pass are retried before the iterators
+ would naturally reach their end point. This ensures blocks are retried
+ more frequently when there are many pending blocks or blocks are
+ continuously added to the queues preventing the iterator reaching its
+ natural endpoint.
+ The default setting of 2400 combined with the default of
+ dfs.namenode.redundancy.interval.seconds means the iterators will be reset
+ approximately every 2 hours.
+ Setting this parameter to zero disables the feature and the iterators will
+ be reset only when the end of all queues has been reached.
+ </description>
+</property>
+
+<property>
<name>dfs.namenode.accesstime.precision</name>
<value>3600000</value>
<description>The access time for HDFS file is precise upto this value.
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
index 785f3be..000c636 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -92,6 +93,32 @@ public class TestLowRedundancyBlockQueues {
queues.getHighestPriorityECBlockCount());
}
+ @Test
+ public void testQueuePositionCanBeReset() throws Throwable {
+ LowRedundancyBlocks queues = new LowRedundancyBlocks();
+ for (int i=0; i< 4; i++) {
+ BlockInfo block = genBlockInfo(i);
+ queues.add(block, 2, 0, 0, 3);
+ }
+ List<List<BlockInfo>> blocks;
+ // Get one block from the queue - should be block ID 0 returned
+ blocks = queues.chooseLowRedundancyBlocks(1, false);
+ assertEquals(1, blocks.get(2).size());
+ assertEquals(0, blocks.get(2).get(0).getBlockId());
+
+ // Get the next block - should be block ID 1 returned
+ blocks = queues.chooseLowRedundancyBlocks(1, false);
+ assertEquals(1, blocks.get(2).get(0).getBlockId());
+
+ // Get the next block, but also reset this time - should be ID 2 returned
+ blocks = queues.chooseLowRedundancyBlocks(1, true);
+ assertEquals(2, blocks.get(2).get(0).getBlockId());
+
+ // Get one more block and due to resetting the queue it will be block id 0
+ blocks = queues.chooseLowRedundancyBlocks(1, false);
+ assertEquals(0, blocks.get(2).get(0).getBlockId());
+ }
+
/**
* Test that adding blocks with different replication counts puts them
* into different queues.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]