Author: szetszwo
Date: Wed Jan 2 12:17:07 2013
New Revision: 1427762
URL: http://svn.apache.org/viewvc?rev=1427762&view=rev
Log:
HDFS-4337. Backport HDFS-4240: For nodegroup-aware block placement, when a node
is excluded, the nodes in the same nodegroup should also be excluded.
Contributed by Meng Gong
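
The patch adds a protected hook, addToExcludedNodes(), to BlockPlacementPolicyDefault and overrides it in BlockPlacementPolicyWithNodeGroup, so that once a datanode is chosen or excluded, every datanode in its node group is excluded as well. As a rough illustration of the intended effect, using plain Java with hypothetical names rather than the HDFS types changed below:

    import java.util.*;

    // Illustrative sketch only (hypothetical names, not the HDFS classes below).
    // Network locations follow the node-group form used in the tests,
    // e.g. "/d1/r1/n1", where the full location identifies the node group.
    public class NodeGroupExclusionIdea {
      public static void main(String[] args) {
        // node group location -> datanodes in that group
        Map<String, List<String>> nodesByGroup = new HashMap<String, List<String>>();
        nodesByGroup.put("/d1/r1/n1", Arrays.asList("1.1.1.1", "2.2.2.2"));
        nodesByGroup.put("/d1/r1/n2", Arrays.asList("3.3.3.3"));
        nodesByGroup.put("/d1/r2/n3", Arrays.asList("4.4.4.4", "5.5.5.5"));

        // Choosing 1.1.1.1 excludes its whole node group, not just the node itself.
        Set<String> excluded = new HashSet<String>();
        excluded.addAll(nodesByGroup.get("/d1/r1/n1"));
        System.out.println(excluded.contains("2.2.2.2")); // true: same node group
        System.out.println(excluded.contains("3.3.3.3")); // false: still a candidate
      }
    }

Without the node-group expansion, a later random pick could land on 2.2.2.2 and place two replicas in the same failure domain.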
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicyWithNodeGroup.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1427762&r1=1427761&r2=1427762&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Wed Jan 2 12:17:07 2013
@@ -369,6 +369,10 @@ Release 1.2.0 - unreleased
MAPREDUCE-4806. Some private methods in JobTracker.RecoveryManager are not
used anymore after MAPREDUCE-3837. (Karthik Kambatla via tomwhite)
+ HDFS-4337. Backport HDFS-4240: For nodegroup-aware block placement, when a
+ node is excluded, the nodes in the same nodegroup should also be excluded.
+ (Meng Gong via szetszwo)
+
Release 1.1.2 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java?rev=1427762&r1=1427761&r2=1427762&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java Wed Jan 2 12:17:07 2013
@@ -130,8 +130,9 @@ public class BlockPlacementPolicyDefault
List<DatanodeDescriptor> results =
new ArrayList<DatanodeDescriptor>(chosenNodes);
- for (Node node:chosenNodes) {
- excludedNodes.put(node, node);
+ for (DatanodeDescriptor node:chosenNodes) {
+ // add localMachine and related nodes to excludedNodes
+ addToExcludedNodes(node, excludedNodes);
adjustExcludedNodes(excludedNodes, node);
}
@@ -241,6 +242,8 @@ public class BlockPlacementPolicyDefault
if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
results, avoidStaleNodes)) {
results.add(localMachine);
+ // add localMachine and related nodes to excludedNodes
+ addToExcludedNodes(localMachine, excludedNodes);
return localMachine;
}
}
@@ -249,6 +252,17 @@ public class BlockPlacementPolicyDefault
return chooseLocalRack(localMachine, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
}
+ /**
+ * Add <i>localMachine</i> and related nodes to <i>excludedNodes</i>
+ * for the next replica choice. A subclass can add more nodes within
+ * the same failure domain as localMachine.
+ * @return number of new excluded nodes
+ */
+ protected int addToExcludedNodes(DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes) {
+ Node node = excludedNodes.put(localMachine, localMachine);
+ return node == null?1:0;
+ }
/* choose one node from the rack that <i>localMachine</i> is on.
* if no such node is available, choose one node from the rack where
@@ -347,6 +361,8 @@ public class BlockPlacementPolicyDefault
if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results,
avoidStaleNodes)) {
results.add(chosenNode);
+ // add chosenNode and related nodes to excludedNodes
+ addToExcludedNodes(chosenNode, excludedNodes);
adjustExcludedNodes(excludedNodes, chosenNode);
return chosenNode;
}
@@ -381,6 +397,9 @@ public class BlockPlacementPolicyDefault
avoidStaleNodes)) {
numOfReplicas--;
results.add(chosenNode);
+ // add chosenNode and related nodes to excludedNodes
+ int newExcludedNodes = addToExcludedNodes(chosenNode, excludedNodes);
+ numOfAvailableNodes -= newExcludedNodes;
adjustExcludedNodes(excludedNodes, chosenNode);
}
}
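
The hook also reports how many nodes it newly excluded, and the last hunk above subtracts that count from numOfAvailableNodes so that chooseRandom() stops once no candidates remain. A minimal sketch of that bookkeeping, assuming simplified stand-in types rather than the real chooseRandom() signature:

    import java.util.*;

    // Simplified stand-ins: the real code works on DatanodeDescriptor and a
    // HashMap<Node, Node>, and applies further checks through isGoodTarget().
    class ExclusionBookkeepingSketch {
      // Base behaviour: exclude only the node itself; return how many nodes
      // were newly excluded (0 if it was already excluded).
      protected int addToExcludedNodes(String node, Set<String> excludedNodes) {
        return excludedNodes.add(node) ? 1 : 0;
      }

      // Pick up to numOfReplicas targets from candidates that are not excluded.
      List<String> chooseRandom(int numOfReplicas, List<String> candidates,
          Set<String> excludedNodes) {
        List<String> results = new ArrayList<String>();
        int numOfAvailableNodes = candidates.size() - excludedNodes.size();
        Iterator<String> it = candidates.iterator();
        while (numOfReplicas > 0 && numOfAvailableNodes > 0 && it.hasNext()) {
          String chosen = it.next();
          if (excludedNodes.contains(chosen)) {
            continue; // already counted as unavailable
          }
          results.add(chosen);
          numOfReplicas--;
          // Excluding the chosen node (and, in a node-group-aware subclass, its
          // whole failure domain) shrinks the pool of nodes still available.
          numOfAvailableNodes -= addToExcludedNodes(chosen, excludedNodes);
        }
        return results;
      }

      public static void main(String[] args) {
        ExclusionBookkeepingSketch policy = new ExclusionBookkeepingSketch();
        List<String> candidates = Arrays.asList("1.1.1.1", "2.2.2.2", "3.3.3.3");
        Set<String> excluded = new HashSet<String>(Collections.singleton("2.2.2.2"));
        System.out.println(policy.chooseRandom(2, candidates, excluded)); // [1.1.1.1, 3.3.3.3]
      }
    }

Returning the count from the hook keeps the base class unaware of how many extra nodes a subclass excludes per choice.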
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java?rev=1427762&r1=1427761&r2=1427762&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java Wed Jan 2 12:17:07 2013
@@ -242,6 +242,27 @@ public class BlockPlacementPolicyWithNod
}
/**
+ * Find the other nodes in the same node group as <i>localMachine</i> and add
+ * them to <i>excludedNodes</i>, since replicas should not be duplicated on
+ * nodes within the same node group.
+ * @return number of new excluded nodes
+ */
+ protected int addToExcludedNodes(DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes) {
+ int countOfExcludedNodes = 0;
+ String nodeGroupScope = localMachine.getNetworkLocation();
+ List<Node> leafNodes = clusterMap.getLeaves(nodeGroupScope);
+ for (Node leafNode : leafNodes) {
+ Node node = excludedNodes.put(leafNode, leafNode);
+ if (node == null) {
+ // not an existing node in excludedNodes
+ countOfExcludedNodes++;
+ }
+ }
+ return countOfExcludedNodes;
+ }
+
+ /**
* Pick up replica node set for deleting replica as over-replicated.
* First set contains replica nodes on rack with more than one
* replica while second set contains remaining replica nodes.
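
In the override above, the node-group scope is just the chosen datanode's network location, and clusterMap.getLeaves() enumerates every datanode under that scope. The following is an illustrative usage sketch, not part of the patch, and it assumes NetworkTopologyWithNodeGroup can be instantiated directly with its no-arg constructor:

    import java.util.List;

    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor;
    import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
    import org.apache.hadoop.net.Node;

    public class NodeGroupLeavesExample {
      public static void main(String[] args) {
        NetworkTopologyWithNodeGroup cluster = new NetworkTopologyWithNodeGroup();
        DatanodeDescriptor d1 = DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1");
        DatanodeDescriptor d2 = DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1");
        DatanodeDescriptor d3 = DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n2");
        cluster.add(d1);
        cluster.add(d2);
        cluster.add(d3);

        // The scope passed to getLeaves() is the datanode's network location,
        // which under node-group topology identifies its node group.
        String nodeGroupScope = d1.getNetworkLocation();
        List<Node> peers = cluster.getLeaves(nodeGroupScope);
        System.out.println(peers.size()); // expected 2: d1 and d2 share /d1/r1/n1
      }
    }

That same getLeaves() call is what lets the override count its node-group peers into excludedNodes.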
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicyWithNodeGroup.java?rev=1427762&r1=1427761&r2=1427762&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicyWithNodeGroup.java Wed Jan 2 12:17:07 2013
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -34,6 +35,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
@@ -42,6 +44,8 @@ import org.junit.Test;
public class TestReplicationPolicyWithNodeGroup {
private static final int BLOCK_SIZE = 1024;
private static final int NUM_OF_DATANODES = 8;
+ private static final int NUM_OF_DATANODES_BOUNDARY = 6;
+ private static final int NUM_OF_DATANODES_MORE_TARGETS = 12;
private static final Configuration CONF = new Configuration();
private static final NetworkTopology cluster;
private static final NameNode namenode;
@@ -58,6 +62,32 @@ public class TestReplicationPolicyWithNo
DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3/n5"),
DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3/n6")
};
+
+ private final static DatanodeDescriptor dataNodesInBoundaryCase[] =
+ new DatanodeDescriptor[] {
+ DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
+ DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
+ DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n1"),
+ DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r1/n2"),
+ DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
+ DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n3")
+ };
+
+ private final static DatanodeDescriptor dataNodesInMoreTargetsCase[] =
+ new DatanodeDescriptor[] {
+ DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/r1/n1"),
+ DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/r1/n1"),
+ DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/r1/n2"),
+ DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/r1/n2"),
+ DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/r1/n3"),
+ DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/r1/n3"),
+ DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/r2/n4"),
+ DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/r2/n4"),
+ DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/r2/n5"),
+ DFSTestUtil.getDatanodeDescriptor("10.10.10.10", "/r2/n5"),
+ DFSTestUtil.getDatanodeDescriptor("11.11.11.11", "/r2/n6"),
+ DFSTestUtil.getDatanodeDescriptor("12.12.12.12", "/r2/n6"),
+ };
private final static DatanodeDescriptor NODE =
new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9",
"/d2/r4/n7"));
@@ -71,6 +101,12 @@ public class TestReplicationPolicyWithNo
"org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyWithNodeGroup");
CONF.set("net.topology.impl",
"org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
+
+ File baseDir = new File(System.getProperty(
+ "test.build.data", "build/test/data"), "dfs/");
+ CONF.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+ new File(baseDir, "name").getPath());
+
NameNode.format(CONF);
namenode = new NameNode(CONF);
} catch (IOException e) {
@@ -94,7 +130,27 @@ public class TestReplicationPolicyWithNo
2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
}
}
-
+
+ /**
+ * Scan the targets list: all targets should be on different NodeGroups.
+ * Return false if two targets are found on the same NodeGroup.
+ */
+ private static boolean checkTargetsOnDifferentNodeGroup(
+ DatanodeDescriptor[] targets) {
+ if(targets.length == 0)
+ return true;
+ Set<String> targetSet = new HashSet<String>();
+ for(DatanodeDescriptor node:targets) {
+ String nodeGroup = NetworkTopology.getLastHalf(node.getNetworkLocation());
+ if(targetSet.contains(nodeGroup)) {
+ return false;
+ } else {
+ targetSet.add(nodeGroup);
+ }
+ }
+ return true;
+ }
+
/**
* In this testcase, client is dataNodes[0]. So the 1st replica should be
* placed on dataNodes[0], the 2nd replica should be placed on
@@ -481,5 +537,121 @@ public class TestReplicationPolicyWithNo
null, null, (short)1, first, second);
assertEquals(chosenNode, dataNodes[5]);
}
+
+ /**
+ * Test the replica placement policy on a boundary-case topology.
+ * Rack 2 has only one node group, so it cannot hold two replicas.
+ * The 1st replica will be placed on the writer.
+ * The 2nd replica should be placed on a different rack.
+ * The 3rd replica should be placed on the same rack as the writer, but in a
+ * different node group.
+ */
+ @Test
+ public void testChooseTargetsOnBoundaryTopology() throws Exception {
+ for(int i=0; i<NUM_OF_DATANODES; i++) {
+ cluster.remove(dataNodes[i]);
+ }
+ for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+ cluster.add(dataNodesInBoundaryCase[i]);
+ }
+ for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+ dataNodes[0].updateHeartbeat(
+ 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+ (FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0);
+
+ dataNodesInBoundaryCase[i].updateHeartbeat(
+ 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+ 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
+ }
+
+ DatanodeDescriptor[] targets;
+ targets = replicator.chooseTarget(filename, 0, dataNodesInBoundaryCase[0],
+ new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+ assertEquals(targets.length, 0);
+
+ targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
+ new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+ assertEquals(targets.length, 1);
+
+ targets = replicator.chooseTarget(filename, 2, dataNodesInBoundaryCase[0],
+ new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+ assertEquals(targets.length, 2);
+ assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+ targets = replicator.chooseTarget(filename, 3, dataNodesInBoundaryCase[0],
+ new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+ assertEquals(targets.length, 3);
+ assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+ }
+
+ /**
+ * Test the re-replication policy in a boundary case.
+ * Rack 2 has only one node group, and the node in that node group is chosen.
+ * Rack 1 has two node groups, and a node from one of them is chosen.
+ * The replica policy should choose a node from rack 1, but not from the node
+ * group of an already chosen node.
+ */
+ @Test
+ public void testRereplicateOnBoundaryTopology() throws Exception {
+ for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+ dataNodesInBoundaryCase[i].updateHeartbeat(
+ 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+ 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
+ }
+ List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+ chosenNodes.add(dataNodesInBoundaryCase[0]);
+ chosenNodes.add(dataNodesInBoundaryCase[5]);
+ DatanodeDescriptor[] targets;
+ targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
+ chosenNodes, BLOCK_SIZE);
+ assertFalse(cluster.isOnSameNodeGroup(targets[0],
+ dataNodesInBoundaryCase[0]));
+ assertFalse(cluster.isOnSameNodeGroup(targets[0],
+ dataNodesInBoundaryCase[5]));
+ assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+ }
+
+ /**
+ * Test the replica placement policy when more targets are requested than
+ * there are node groups.
+ * The 12-node cluster has only 6 node groups, but in some cases, such as
+ * placing a submitted job file, more targets (here 10) are requested. The
+ * test verifies that only 6 targets can be returned.
+ */
+ @Test
+ public void testChooseMoreTargetsThanNodeGroups() throws Exception {
+ // Cleanup nodes in previous tests
+ for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+ DatanodeDescriptor node = dataNodesInBoundaryCase[i];
+ if (cluster.contains(node)) {
+ cluster.remove(node);
+ }
+ }
+
+ for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
+ cluster.add(dataNodesInMoreTargetsCase[i]);
+ }
+
+ for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
+ dataNodesInMoreTargetsCase[i].updateHeartbeat(
+ 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+ 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
+ }
+
+ DatanodeDescriptor[] targets;
+ // Test normal case -- 3 replicas
+ targets = replicator.chooseTarget(filename, 3, dataNodesInMoreTargetsCase[0],
+ new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+ assertEquals(targets.length, 3);
+ assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+
+ // Test special case -- replica number over node groups.
+ targets = replicator.chooseTarget(filename, 10, dataNodesInMoreTargetsCase[0],
+ new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+ assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+ // Verify it only can find 6 targets for placing replicas.
+ assertEquals(targets.length, 6);
+ }
+
}