This is an automated email from the ASF dual-hosted git repository. weichiu pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new c892a87 HDFS-14882. Consider DataNode load when #getBlockLocation. Contributed by Xiaoqiao He. c892a87 is described below commit c892a879ddce3abfd51c8609c81148bf6e4f9daa Author: He Xiaoqiao <hexiaoq...@apache.org> AuthorDate: Fri Nov 15 12:15:33 2019 -0800 HDFS-14882. Consider DataNode load when #getBlockLocation. Contributed by Xiaoqiao He. Signed-off-by: Wei-Chiu Chuang <weic...@apache.org> Reviewed-by: Inigo Goiri <inigo...@apache.org> Reviewed-by: Istvan Fajth <pi...@cloudera.com> --- .../org/apache/hadoop/net/NetworkTopology.java | 68 ++++++++++++++---- .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 ++ .../server/blockmanagement/DatanodeManager.java | 24 ++++++- .../src/main/resources/hdfs-default.xml | 13 +++- .../blockmanagement/TestDatanodeManager.java | 82 ++++++++++++++++++++++ .../org/apache/hadoop/net/TestNetworkTopology.java | 34 ++++++--- 6 files changed, 199 insertions(+), 26 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java index 724cec3..66799f5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.util.*; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; /** The class represents a cluster of computer with a tree hierarchical * network topology. @@ -874,11 +875,33 @@ public class NetworkTopology { * This method is called if the reader is a datanode, * so nonDataNodeReader flag is set to false. 
*/ - sortByDistance(reader, nodes, activeLen, false); + sortByDistance(reader, nodes, activeLen, list -> Collections.shuffle(list)); } /** - * Sort nodes array by network distance to <i>reader</i>. + * Sort nodes array by network distance to <i>reader</i> with secondary sort. + * <p> + * In a three-level topology, a node can be either local, on the same rack, + * or on a different rack from the reader. Sorting the nodes based on network + * distance from the reader reduces network traffic and improves + * performance. + * <p> + * Nodes at the same network distance are then reordered by + * <i>secondarySort</i>, e.g. shuffled or sorted by load. + * + * @param reader Node where data will be read + * @param nodes Available replicas with the requested data + * @param activeLen Number of active nodes at the front of the array + * @param secondarySort caller-supplied strategy that reorders, in place, + * each group of nodes at the same network distance. + */ + public <T extends Node> void sortByDistance(Node reader, T[] nodes, + int activeLen, Consumer<List<T>> secondarySort){ + sortByDistance(reader, nodes, activeLen, secondarySort, false); + } + + /** + * Sort nodes array by network distance to <i>reader</i>. * <p> using network location. This is used when the reader * is not a datanode. Sorting the nodes based on network distance * from the reader reduces network traffic and improves @@ -895,7 +918,27 @@ public class NetworkTopology { * This method is called if the reader is not a datanode, * so nonDataNodeReader flag is set to true. */ - sortByDistance(reader, nodes, activeLen, true); + sortByDistanceUsingNetworkLocation(reader, nodes, activeLen, + list -> Collections.shuffle(list)); + } + + /** + * Sort nodes array by network distance to <i>reader</i> with secondary sort. + * <p> using network location. This is used when the reader + * is not a datanode.
Sorting the nodes based on network distance + * from the reader reduces network traffic and improves + * performance. + * <p> + * + * @param reader Node where data will be read + * @param nodes Available replicas with the requested data + * @param activeLen Number of active nodes at the front of the array + * @param secondarySort caller-supplied strategy that reorders, in place, + * each group of nodes at the same network distance. + */ + public <T extends Node> void sortByDistanceUsingNetworkLocation(Node reader, + T[] nodes, int activeLen, Consumer<List<T>> secondarySort) { + sortByDistance(reader, nodes, activeLen, secondarySort, true); } /** @@ -909,7 +952,8 @@ public class NetworkTopology { * @param activeLen Number of active nodes at the front of the array * @param nonDataNodeReader True if the reader is not a datanode */ - private void sortByDistance(Node reader, Node[] nodes, int activeLen, + private <T extends Node> void sortByDistance(Node reader, T[] nodes, + int activeLen, Consumer<List<T>> secondarySort, boolean nonDataNodeReader) { /** Sort weights for the nodes array */ int[] weights = new int[activeLen]; @@ -921,23 +965,23 @@ public class NetworkTopology { } } // Add weight/node pairs to a TreeMap to sort - TreeMap<Integer, List<Node>> tree = new TreeMap<Integer, List<Node>>(); + TreeMap<Integer, List<T>> tree = new TreeMap<>(); for (int i=0; i<activeLen; i++) { int weight = weights[i]; - Node node = nodes[i]; - List<Node> list = tree.get(weight); + T node = nodes[i]; + List<T> list = tree.get(weight); if (list == null) { list = Lists.newArrayListWithExpectedSize(1); tree.put(weight, list); } list.add(node); } - + // Sort nodes which have the same weight using secondarySort.
int idx = 0; - for (List<Node> list: tree.values()) { + for (List<T> list: tree.values()) { if (list != null) { - Collections.shuffle(list, r); - for (Node n: list) { + secondarySort.accept(list); + for (T n: list) { nodes[idx] = n; idx++; } @@ -946,4 +990,4 @@ public class NetworkTopology { Preconditions.checkState(idx == activeLen, "Sorted the wrong number of nodes!"); } -} +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index a2df317..1c3a71f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -230,6 +230,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY; public static final boolean DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_DEFAULT = true; + public static final String DFS_NAMENODE_READ_CONSIDERLOAD_KEY = + "dfs.namenode.read.considerLoad"; + public static final boolean DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT = + false; public static final String DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR = "dfs.namenode.redundancy.considerLoad.factor"; public static final double diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 049c949..8adb03d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -30,7 +30,6 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import 
org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -66,6 +65,7 @@ import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * Manage datanodes, include decommission and other activities. @@ -134,6 +134,9 @@ public class DatanodeManager { /** Whether or not to avoid using stale DataNodes for reading */ private final boolean avoidStaleDataNodesForRead; + /** Whether or not to consider load for reading. */ + private final boolean readConsiderLoad; + /** * Whether or not to avoid using stale DataNodes for writing. * Note that, even if this is configured, the policy may be @@ -314,6 +317,9 @@ public class DatanodeManager { this.avoidStaleDataNodesForRead = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT); + this.readConsiderLoad = conf.getBoolean( + DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY, + DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT); this.avoidStaleDataNodesForWrite = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT); @@ -530,9 +536,10 @@ public class DatanodeManager { int activeLen = lastActiveIndex + 1; if(nonDatanodeReader) { networktopology.sortByDistanceUsingNetworkLocation(client, - lb.getLocations(), activeLen); + lb.getLocations(), activeLen, createSecondaryNodeSorter()); } else { - networktopology.sortByDistance(client, lb.getLocations(), activeLen); + networktopology.sortByDistance(client, lb.getLocations(), activeLen, +
createSecondaryNodeSorter()); } // move PROVIDED storage to the end to prefer local replicas. lb.moveProvidedToEnd(activeLen); @@ -540,6 +547,17 @@ public class DatanodeManager { lb.updateCachedStorageInfo(); } + private Consumer<List<DatanodeInfo>> createSecondaryNodeSorter() { + Consumer<List<DatanodeInfo>> secondarySort = + list -> Collections.shuffle(list); + if (readConsiderLoad) { + Comparator<DatanodeInfo> comp = + Comparator.comparingInt(DatanodeInfo::getXceiverCount); + secondarySort = list -> Collections.sort(list, comp); + } + return secondarySort; + } + /** @return the datanode descriptor for the host. */ public DatanodeDescriptor getDatanodeByHost(final String host) { return host2DatanodeMap.getDatanodeByHost(host); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 98b91a6..7effbd0 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -307,7 +307,9 @@ <property> <name>dfs.namenode.redundancy.considerLoad</name> <value>true</value> - <description>Decide if chooseTarget considers the target's load or not + <description> + Decide whether chooseTarget considers the target's load when writing. + Enabled by default. </description> </property> @@ -320,6 +322,15 @@ </property> <property> + <name>dfs.namenode.read.considerLoad</name> + <value>false</value> + <description> + Decide whether block locations at the same network distance are sorted by DataNode load (xceiver count) rather than shuffled when reading. + Disabled by default.
+ </description> +</property> + +<property> <name>dfs.datanode.httpserver.filter.handlers</name> <value>org.apache.hadoop.hdfs.server.datanode.web.RestCsrfPreventionFilterHandler</value> <description>Comma separated list of Netty servlet-style filter handlers to inject into the Datanode WebHDFS I/O path diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java index 210e434..e8e6b94 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java @@ -518,6 +518,88 @@ public class TestDatanodeManager { assertEquals(locs[4].getIpAddr(), sortedLocs2[0].getIpAddr()); } + @Test + public void testGetBlockLocationConsiderLoad() + throws IOException, URISyntaxException { + Configuration conf = new Configuration(); + conf.setBoolean( + DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY, true); + conf.setBoolean( + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true); + FSNamesystem fsn = Mockito.mock(FSNamesystem.class); + Mockito.when(fsn.hasWriteLock()).thenReturn(true); + URL shellScript = getClass().getResource( + "/" + Shell.appendScriptExtension("topology-script")); + Path resourcePath = Paths.get(shellScript.toURI()); + FileUtil.setExecutable(resourcePath.toFile(), true); + conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, + resourcePath.toString()); + DatanodeManager dm = mockDatanodeManager(fsn, conf); + + int totalDNs = 5; + // Register 5 datanodes and 2 nodes per rack with different load. + DatanodeInfo[] locs = new DatanodeInfo[totalDNs]; + String[] storageIDs = new String[totalDNs]; + for (int i = 0; i < totalDNs; i++) { + // Register new datanode. 
+ String uuid = "UUID-" + i; + String ip = "IP-" + i / 2 + "-" + i; + DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class); + Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid); + Mockito.when(dr.getIpAddr()).thenReturn(ip); + dm.registerDatanode(dr); + + // Get location and storage information. + locs[i] = dm.getDatanode(uuid); + storageIDs[i] = "storageID-" + i; + + // Set load for datanodes. + locs[i].setXceiverCount(i); + } + + // Set node 0 decommissioned. + locs[0].setDecommissioned(); + + // Create LocatedBlock with above locations. + ExtendedBlock b = new ExtendedBlock("somePoolID", 1234); + LocatedBlock block = new LocatedBlock(b, locs); + List<LocatedBlock> blocks = new ArrayList<>(); + blocks.add(block); + + // Test client located at locs[3] in cluster. + final String targetIpInCluster = locs[3].getIpAddr(); + dm.sortLocatedBlocks(targetIpInCluster, blocks); + DatanodeInfo[] sortedLocs = block.getLocations(); + assertEquals(totalDNs, sortedLocs.length); + // Ensure the local node is first. + assertEquals(targetIpInCluster, sortedLocs[0].getIpAddr()); + // Among nodes at the same distance, ensure the less-loaded one comes first. + assertEquals(locs[3].getIpAddr(), sortedLocs[0].getIpAddr()); + assertEquals(locs[2].getIpAddr(), sortedLocs[1].getIpAddr()); + assertEquals(locs[1].getIpAddr(), sortedLocs[2].getIpAddr()); + assertEquals(locs[4].getIpAddr(), sortedLocs[3].getIpAddr()); + // Ensure the decommissioned DN was moved to the end. + assertThat(sortedLocs[4].getAdminState(), + is(DatanodeInfo.AdminStates.DECOMMISSIONED)); + assertEquals(locs[0].getIpAddr(), sortedLocs[4].getIpAddr()); + + // Test a client outside the cluster but on the same rack as locs[3].
+ final String targetIpNotInCluster = locs[3].getIpAddr() + "-client"; + dm.sortLocatedBlocks(targetIpNotInCluster, blocks); + DatanodeInfo[] sortedLocs2 = block.getLocations(); + assertEquals(totalDNs, sortedLocs2.length); + // Ensure local-rack nodes come first and, among nodes at the same + // distance, the less-loaded node comes first. + assertEquals(locs[2].getIpAddr(), sortedLocs2[0].getIpAddr()); + assertEquals(locs[3].getIpAddr(), sortedLocs2[1].getIpAddr()); + assertEquals(locs[1].getIpAddr(), sortedLocs2[2].getIpAddr()); + assertEquals(locs[4].getIpAddr(), sortedLocs2[3].getIpAddr()); + // Ensure the decommissioned DN was moved to the end. + assertThat(sortedLocs[4].getAdminState(), + is(DatanodeInfo.AdminStates.DECOMMISSIONED)); + assertEquals(locs[0].getIpAddr(), sortedLocs2[4].getIpAddr()); + } + /** * Test whether removing a host from the includes list without adding it to * the excludes list will exclude it from data node reports. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java index 9466a75..f16bfb7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java @@ -241,10 +241,16 @@ public class TestNetworkTopology { cluster.setRandomSeed(0xDEADBEEF); cluster.sortByDistance(dataNodes[8], dtestNodes, dtestNodes.length - 2); assertTrue(dtestNodes[0] == dataNodes[8]); - assertTrue(dtestNodes[1] == dataNodes[11]); - assertTrue(dtestNodes[2] == dataNodes[12]); - assertTrue(dtestNodes[3] == dataNodes[9]); - assertTrue(dtestNodes[4] == dataNodes[10]); + assertTrue(dtestNodes[1] != dtestNodes[2]); + assertTrue(dtestNodes[1] == dataNodes[11] + || dtestNodes[1] == dataNodes[12]); + assertTrue(dtestNodes[2] == dataNodes[11] + || dtestNodes[2] == dataNodes[12]); + assertTrue(dtestNodes[3]
!= dtestNodes[4]); + assertTrue(dtestNodes[3] == dataNodes[9] + || dtestNodes[3] == dataNodes[10]); + assertTrue(dtestNodes[4] == dataNodes[9] + || dtestNodes[4] == dataNodes[10]); // array contains local node testNodes[0] = dataNodes[1]; @@ -331,10 +337,14 @@ public class TestNetworkTopology { testNodes[2] = dataNodes[8]; Node rackClient = new NodeBase("/d3/r1/25.25.25"); cluster.setRandomSeed(0xDEADBEEF); - cluster.sortByDistance(rackClient, testNodes, testNodes.length); + cluster.sortByDistanceUsingNetworkLocation(rackClient, testNodes, + testNodes.length); assertTrue(testNodes[0] == dataNodes[8]); - assertTrue(testNodes[1] == dataNodes[5]); - assertTrue(testNodes[2] == dataNodes[0]); + assertTrue(testNodes[1] != testNodes[2]); + assertTrue(testNodes[1] == dataNodes[0] + || testNodes[1] == dataNodes[5]); + assertTrue(testNodes[2] == dataNodes[0] + || testNodes[2] == dataNodes[5]); //Reader is not a datanode , but is in one of the datanode's data center. testNodes[0] = dataNodes[8]; @@ -342,10 +352,14 @@ public class TestNetworkTopology { testNodes[2] = dataNodes[0]; Node dcClient = new NodeBase("/d1/r2/25.25.25"); cluster.setRandomSeed(0xDEADBEEF); - cluster.sortByDistance(dcClient, testNodes, testNodes.length); + cluster.sortByDistanceUsingNetworkLocation(dcClient, testNodes, + testNodes.length); assertTrue(testNodes[0] == dataNodes[0]); - assertTrue(testNodes[1] == dataNodes[5]); - assertTrue(testNodes[2] == dataNodes[8]); + assertTrue(testNodes[1] != testNodes[2]); + assertTrue(testNodes[1] == dataNodes[5] + || testNodes[1] == dataNodes[8]); + assertTrue(testNodes[2] == dataNodes[5] + || testNodes[2] == dataNodes[8]); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org