HDFS-12809. [READ] Fix the randomized selection of locations in {{ProvidedBlocksBuilder}}.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b0948868 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b0948868 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b0948868 Branch: refs/heads/HDFS-9806 Commit: b0948868b805048dcd52349e805c2f209f6598a1 Parents: 68e046c Author: Virajith Jalaparti <viraj...@apache.org> Authored: Mon Nov 27 17:04:20 2017 -0800 Committer: Virajith Jalaparti <viraj...@apache.org> Committed: Fri Dec 15 10:18:28 2017 -0800 ---------------------------------------------------------------------- .../blockmanagement/ProvidedStorageMap.java | 112 +++++++------------ .../TestNameNodeProvidedImplementation.java | 26 ++++- 2 files changed, 61 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0948868/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java index 6fec977..c85eb2c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java @@ -19,11 +19,12 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.NavigableMap; +import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentSkipListMap; @@ -229,11 +230,8 @@ public class ProvidedStorageMap { sids.add(currInfo.getStorageID()); types.add(storageType); if (StorageType.PROVIDED.equals(storageType)) { - DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids); - locs.add( - new DatanodeInfoWithStorage( - dn, currInfo.getStorageID(), currInfo.getStorageType())); - excludedUUids.add(dn.getDatanodeUuid()); + // Provided location will be added to the list of locations after + // examining all local locations. isProvidedBlock = true; } else { locs.add(new DatanodeInfoWithStorage( @@ -245,11 +243,17 @@ public class ProvidedStorageMap { int numLocations = locs.size(); if (isProvidedBlock) { + // add the first datanode here + DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids); + locs.add( + new DatanodeInfoWithStorage(dn, storageId, StorageType.PROVIDED)); + excludedUUids.add(dn.getDatanodeUuid()); + numLocations++; // add more replicas until we reach the defaultReplication for (int count = numLocations + 1; count <= defaultReplication && count <= providedDescriptor .activeProvidedDatanodes(); count++) { - DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids); + dn = chooseProvidedDatanode(excludedUUids); locs.add(new DatanodeInfoWithStorage( dn, storageId, StorageType.PROVIDED)); sids.add(storageId); @@ -284,6 +288,9 @@ public class ProvidedStorageMap { private final NavigableMap<String, DatanodeDescriptor> dns = new ConcurrentSkipListMap<>(); + // maintain a separate list of the datanodes with provided storage + // to efficiently choose Datanodes when required. + private final List<DatanodeDescriptor> dnR = new ArrayList<>(); public final static String NETWORK_LOCATION = "/REMOTE"; public final static String NAME = "PROVIDED"; @@ -300,8 +307,8 @@ public class ProvidedStorageMap { DatanodeStorageInfo getProvidedStorage( DatanodeDescriptor dn, DatanodeStorage s) { - LOG.info("XXXXX adding Datanode " + dn.getDatanodeUuid()); dns.put(dn.getDatanodeUuid(), dn); + dnR.add(dn); // TODO: maintain separate RPC ident per dn return storageMap.get(s.getStorageID()); } @@ -315,84 +322,42 @@ public class ProvidedStorageMap { } DatanodeDescriptor choose(DatanodeDescriptor client) { - // exact match for now - DatanodeDescriptor dn = client != null ? - dns.get(client.getDatanodeUuid()) : null; - if (null == dn) { - dn = chooseRandom(); - } - return dn; + return choose(client, Collections.<String>emptySet()); } DatanodeDescriptor choose(DatanodeDescriptor client, Set<String> excludedUUids) { // exact match for now - DatanodeDescriptor dn = client != null ? - dns.get(client.getDatanodeUuid()) : null; - - if (null == dn || excludedUUids.contains(client.getDatanodeUuid())) { - dn = null; - Set<String> exploredUUids = new HashSet<String>(); - - while(exploredUUids.size() < dns.size()) { - Map.Entry<String, DatanodeDescriptor> d = - dns.ceilingEntry(UUID.randomUUID().toString()); - if (null == d) { - d = dns.firstEntry(); - } - String uuid = d.getValue().getDatanodeUuid(); - //this node has already been explored, and was not selected earlier - if (exploredUUids.contains(uuid)) { - continue; - } - exploredUUids.add(uuid); - //this node has been excluded - if (excludedUUids.contains(uuid)) { - continue; - } - return dns.get(uuid); - } - } - - return dn; - } - - DatanodeDescriptor chooseRandom(DatanodeStorageInfo[] excludedStorages) { - // TODO: Currently this is not uniformly random; - // skewed toward sparse sections of the ids - Set<DatanodeDescriptor> excludedNodes = - new HashSet<DatanodeDescriptor>(); - if (excludedStorages != null) { - for (int i= 0; i < excludedStorages.length; i++) { - LOG.info("Excluded: " + excludedStorages[i].getDatanodeDescriptor()); - excludedNodes.add(excludedStorages[i].getDatanodeDescriptor()); + if (client != null && !excludedUUids.contains(client.getDatanodeUuid())) { + DatanodeDescriptor dn = dns.get(client.getDatanodeUuid()); + if (dn != null) { + return dn; } } - Set<DatanodeDescriptor> exploredNodes = new HashSet<DatanodeDescriptor>(); - while(exploredNodes.size() < dns.size()) { - Map.Entry<String, DatanodeDescriptor> d = - dns.ceilingEntry(UUID.randomUUID().toString()); - if (null == d) { - d = dns.firstEntry(); - } - DatanodeDescriptor node = d.getValue(); - //this node has already been explored, and was not selected earlier - if (exploredNodes.contains(node)) { - continue; + Random r = new Random(); + for (int i = dnR.size() - 1; i >= 0; --i) { + int pos = r.nextInt(i + 1); + DatanodeDescriptor node = dnR.get(pos); + String uuid = node.getDatanodeUuid(); + if (!excludedUUids.contains(uuid)) { + return node; } - exploredNodes.add(node); - //this node has been excluded - if (excludedNodes.contains(node)) { - continue; - } - return node; + Collections.swap(dnR, i, pos); } return null; } - DatanodeDescriptor chooseRandom() { - return chooseRandom(null); + DatanodeDescriptor chooseRandom(DatanodeStorageInfo... excludedStorages) { + Set<String> excludedNodes = new HashSet<>(); + if (excludedStorages != null) { + for (int i = 0; i < excludedStorages.length; i++) { + DatanodeDescriptor dn = excludedStorages[i].getDatanodeDescriptor(); + String uuid = dn.getDatanodeUuid(); + excludedNodes.add(uuid); + } + } + return choose(null, excludedNodes); } @Override @@ -414,6 +379,7 @@ public class ProvidedStorageMap { DatanodeDescriptor storedDN = dns.get(dnToRemove.getDatanodeUuid()); if (storedDN != null) { dns.remove(dnToRemove.getDatanodeUuid()); + dnR.remove(dnToRemove); } } return dns.size(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0948868/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java index 9c82967..09e8f97 100644 --- a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java +++ b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java @@ -27,8 +27,11 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; +import java.util.HashSet; import java.util.Iterator; import java.util.Random; +import java.util.Set; + import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -480,16 +483,31 @@ public class TestNameNodeProvidedImplementation { // given the start and length in the above call, // only one LocatedBlock in LocatedBlocks assertEquals(expectedBlocks, locatedBlocks.getLocatedBlocks().size()); - LocatedBlock locatedBlock = locatedBlocks.getLocatedBlocks().get(0); - assertEquals(expectedLocations, locatedBlock.getLocations().length); - return locatedBlock.getLocations(); + DatanodeInfo[] locations = + locatedBlocks.getLocatedBlocks().get(0).getLocations(); + assertEquals(expectedLocations, locations.length); + checkUniqueness(locations); + return locations; + } + + /** + * verify that the given locations are all unique. + * @param locations + */ + private void checkUniqueness(DatanodeInfo[] locations) { + Set<String> set = new HashSet<>(); + for (DatanodeInfo info: locations) { + assertFalse("All locations should be unique", + set.contains(info.getDatanodeUuid())); + set.add(info.getDatanodeUuid()); + } } /** * Tests setting replication of provided files. * @throws Exception */ - @Test(timeout=30000) + @Test(timeout=50000) public void testSetReplicationForProvidedFiles() throws Exception { createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, FixedBlockResolver.class); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org