HDFS-6961. Archival Storage: BlockPlacementPolicy#chooseTarget should check each valid storage type in each choosing round.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e08701ec Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e08701ec Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e08701ec Branch: refs/heads/trunk Commit: e08701ec71f7c10d8f15122d90c35f9f22e40837 Parents: 45d5b13 Author: Jing Zhao <ji...@apache.org> Authored: Thu Sep 4 14:19:32 2014 -0700 Committer: Jing Zhao <ji...@apache.org> Committed: Thu Sep 4 14:19:32 2014 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hdfs/BlockStoragePolicy.java | 4 +- .../server/blockmanagement/BlockManager.java | 2 +- .../BlockPlacementPolicyDefault.java | 190 +++++++++++-------- .../BlockPlacementPolicyWithNodeGroup.java | 73 +++---- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 27 ++- .../hadoop/hdfs/TestBlockStoragePolicy.java | 71 +++++-- 6 files changed, 240 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e08701ec/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java index b119359..a72d4f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java @@ -23,6 +23,7 @@ import java.util.EnumSet; import java.util.LinkedList; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -99,7 +100,8 @@ public class BlockStoragePolicy { /** The fallback storage type for replication. */ private final StorageType[] replicationFallbacks; - BlockStoragePolicy(byte id, String name, StorageType[] storageTypes, + @VisibleForTesting + public BlockStoragePolicy(byte id, String name, StorageType[] storageTypes, StorageType[] creationFallbacks, StorageType[] replicationFallbacks) { this.id = id; this.name = name; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e08701ec/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 389409b..af83653 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -453,7 +453,7 @@ public class BlockManager { } @VisibleForTesting - BlockPlacementPolicy getBlockPlacementPolicy() { + public BlockPlacementPolicy getBlockPlacementPolicy() { return blockplacement; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e08701ec/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index a711a09..593ea90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -19,13 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import static org.apache.hadoop.util.Time.now; -import java.util.ArrayList; -import java.util.Collection; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; +import java.util.*; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -142,8 +136,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { Set<Node> favoriteAndExcludedNodes = excludedNodes == null ? new HashSet<Node>() : new HashSet<Node>(excludedNodes); - final List<StorageType> storageTypes = storagePolicy.chooseStorageTypes( - (short)numOfReplicas); + final List<StorageType> requiredStorageTypes = storagePolicy + .chooseStorageTypes((short)numOfReplicas); + final EnumMap<StorageType, Integer> storageTypes = + getRequiredStorageTypes(requiredStorageTypes); // Choose favored nodes List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>(); @@ -156,13 +152,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { final DatanodeStorageInfo target = chooseLocalStorage(favoredNode, favoriteAndExcludedNodes, blocksize, getMaxNodesPerRack(results.size(), numOfReplicas)[1], - results, avoidStaleNodes, storageTypes.get(0), false); + results, avoidStaleNodes, storageTypes, false); if (target == null) { LOG.warn("Could not find a target for file " + src + " with favored node " + favoredNode); continue; } - storageTypes.remove(0); favoriteAndExcludedNodes.add(target.getDatanodeDescriptor()); } @@ -241,6 +236,21 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { return new int[] {numOfReplicas, maxNodesPerRack}; } + private EnumMap<StorageType, Integer> getRequiredStorageTypes( + List<StorageType> types) { + EnumMap<StorageType, Integer> map = new EnumMap<StorageType, + Integer>(StorageType.class); + for (StorageType type : types) { + if (!map.containsKey(type)) { + map.put(type, 1); + } else { + int num = map.get(type); + map.put(type, num + 1); + } + } + return map; + } + /** * choose <i>numOfReplicas</i> from all data nodes * @param numOfReplicas additional number of replicas wanted @@ -272,17 +282,18 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } // Keep a copy of original excludedNodes - final Set<Node> oldExcludedNodes = avoidStaleNodes ? - new HashSet<Node>(excludedNodes) : null; + final Set<Node> oldExcludedNodes = new HashSet<Node>(excludedNodes); // choose storage types; use fallbacks for unavailable storages - final List<StorageType> storageTypes = storagePolicy.chooseStorageTypes( - (short)totalReplicasExpected, DatanodeStorageInfo.toStorageTypes(results), - unavailableStorages, newBlock); + final List<StorageType> requiredStorageTypes = storagePolicy + .chooseStorageTypes((short) totalReplicasExpected, + DatanodeStorageInfo.toStorageTypes(results), + unavailableStorages, newBlock); + final EnumMap<StorageType, Integer> storageTypes = + getRequiredStorageTypes(requiredStorageTypes); - StorageType curStorageType = null; try { - if ((numOfReplicas = storageTypes.size()) == 0) { + if ((numOfReplicas = requiredStorageTypes.size()) == 0) { throw new NotEnoughReplicasException( "All required storage types are unavailable: " + " unavailableStorages=" + unavailableStorages @@ -290,9 +301,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } if (numOfResults == 0) { - curStorageType = storageTypes.remove(0); writer = chooseLocalStorage(writer, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, curStorageType, true) + maxNodesPerRack, results, avoidStaleNodes, storageTypes, true) .getDatanodeDescriptor(); if (--numOfReplicas == 0) { return writer; @@ -300,33 +310,30 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor(); if (numOfResults <= 1) { - curStorageType = storageTypes.remove(0); chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack, - results, avoidStaleNodes, curStorageType); + results, avoidStaleNodes, storageTypes); if (--numOfReplicas == 0) { return writer; } } if (numOfResults <= 2) { final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor(); - curStorageType = storageTypes.remove(0); if (clusterMap.isOnSameRack(dn0, dn1)) { chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack, - results, avoidStaleNodes, curStorageType); + results, avoidStaleNodes, storageTypes); } else if (newBlock){ chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack, - results, avoidStaleNodes, curStorageType); + results, avoidStaleNodes, storageTypes); } else { chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack, - results, avoidStaleNodes, curStorageType); + results, avoidStaleNodes, storageTypes); } if (--numOfReplicas == 0) { return writer; } } - curStorageType = storageTypes.remove(0); chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, curStorageType); + maxNodesPerRack, results, avoidStaleNodes, storageTypes); } catch (NotEnoughReplicasException e) { final String message = "Failed to place enough replicas, still in need of " + (totalReplicasExpected - results.size()) + " to reach " @@ -355,10 +362,22 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { newBlock); } - if (storageTypes.size() > 0) { - // Retry chooseTarget with fallback storage types - unavailableStorages.add(curStorageType); - return chooseTarget(numOfReplicas, writer, excludedNodes, blocksize, + boolean retry = false; + // simply add all the remaining types into unavailableStorages and give + // another try. No best effort is guaranteed here. + for (StorageType type : storageTypes.keySet()) { + if (!unavailableStorages.contains(type)) { + unavailableStorages.add(type); + retry = true; + } + } + if (retry) { + for (DatanodeStorageInfo resultStorage : results) { + addToExcludedNodes(resultStorage.getDatanodeDescriptor(), + oldExcludedNodes); + } + numOfReplicas = totalReplicasExpected - results.size(); + return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize, maxNodesPerRack, results, false, storagePolicy, unavailableStorages, newBlock); } @@ -373,28 +392,35 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { * @return the chosen storage */ protected DatanodeStorageInfo chooseLocalStorage(Node localMachine, - Set<Node> excludedNodes, - long blocksize, - int maxNodesPerRack, - List<DatanodeStorageInfo> results, - boolean avoidStaleNodes, - StorageType storageType, - boolean fallbackToLocalRack) + Set<Node> excludedNodes, long blocksize, int maxNodesPerRack, + List<DatanodeStorageInfo> results, boolean avoidStaleNodes, + EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack) throws NotEnoughReplicasException { // if no local machine, randomly choose one node if (localMachine == null) { return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageType); + maxNodesPerRack, results, avoidStaleNodes, storageTypes); } if (preferLocalNode && localMachine instanceof DatanodeDescriptor) { DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine; // otherwise try local machine first if (excludedNodes.add(localMachine)) { // was not in the excluded list - for(DatanodeStorageInfo localStorage : DFSUtil.shuffle( - localDatanode.getStorageInfos())) { - if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize, - maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) { - return localStorage; + for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes + .entrySet().iterator(); iter.hasNext(); ) { + Map.Entry<StorageType, Integer> entry = iter.next(); + for (DatanodeStorageInfo localStorage : DFSUtil.shuffle( + localDatanode.getStorageInfos())) { + StorageType type = entry.getKey(); + if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize, + maxNodesPerRack, false, results, avoidStaleNodes, type) >= 0) { + int num = entry.getValue(); + if (num == 1) { + iter.remove(); + } else { + entry.setValue(num - 1); + } + return localStorage; + } } } } @@ -405,7 +431,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } // try a node on local rack return chooseLocalRack(localMachine, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageType); + maxNodesPerRack, results, avoidStaleNodes, storageTypes); } /** @@ -428,23 +454,23 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { * @return the chosen node */ protected DatanodeStorageInfo chooseLocalRack(Node localMachine, - Set<Node> excludedNodes, - long blocksize, - int maxNodesPerRack, - List<DatanodeStorageInfo> results, - boolean avoidStaleNodes, - StorageType storageType) + Set<Node> excludedNodes, + long blocksize, + int maxNodesPerRack, + List<DatanodeStorageInfo> results, + boolean avoidStaleNodes, + EnumMap<StorageType, Integer> storageTypes) throws NotEnoughReplicasException { // no local machine, so choose a random machine if (localMachine == null) { return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageType); + maxNodesPerRack, results, avoidStaleNodes, storageTypes); } // choose one from the local rack try { return chooseRandom(localMachine.getNetworkLocation(), excludedNodes, - blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType); + blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes); } catch (NotEnoughReplicasException e1) { // find the second replica DatanodeDescriptor newLocal=null; @@ -458,16 +484,17 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { if (newLocal != null) { try { return chooseRandom(newLocal.getNetworkLocation(), excludedNodes, - blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType); + blocksize, maxNodesPerRack, results, avoidStaleNodes, + storageTypes); } catch(NotEnoughReplicasException e2) { //otherwise randomly choose one from the network return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageType); + maxNodesPerRack, results, avoidStaleNodes, storageTypes); } } else { //otherwise randomly choose one from the network return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageType); + maxNodesPerRack, results, avoidStaleNodes, storageTypes); } } } @@ -486,18 +513,18 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { int maxReplicasPerRack, List<DatanodeStorageInfo> results, boolean avoidStaleNodes, - StorageType storageType) + EnumMap<StorageType, Integer> storageTypes) throws NotEnoughReplicasException { int oldNumOfReplicas = results.size(); // randomly choose one node from remote racks try { chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(), excludedNodes, blocksize, maxReplicasPerRack, results, - avoidStaleNodes, storageType); + avoidStaleNodes, storageTypes); } catch (NotEnoughReplicasException e) { chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas), localMachine.getNetworkLocation(), excludedNodes, blocksize, - maxReplicasPerRack, results, avoidStaleNodes, storageType); + maxReplicasPerRack, results, avoidStaleNodes, storageTypes); } } @@ -511,10 +538,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { int maxNodesPerRack, List<DatanodeStorageInfo> results, boolean avoidStaleNodes, - StorageType storageType) + EnumMap<StorageType, Integer> storageTypes) throws NotEnoughReplicasException { return chooseRandom(1, scope, excludedNodes, blocksize, maxNodesPerRack, - results, avoidStaleNodes, storageType); + results, avoidStaleNodes, storageTypes); } /** @@ -528,8 +555,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { int maxNodesPerRack, List<DatanodeStorageInfo> results, boolean avoidStaleNodes, - StorageType storageType) - throws NotEnoughReplicasException { + EnumMap<StorageType, Integer> storageTypes) + throws NotEnoughReplicasException { int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes( scope, excludedNodes); @@ -549,18 +576,31 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { final DatanodeStorageInfo[] storages = DFSUtil.shuffle( chosenNode.getStorageInfos()); - int i; - for(i = 0; i < storages.length; i++) { - final int newExcludedNodes = addIfIsGoodTarget(storages[i], - excludedNodes, blocksize, maxNodesPerRack, considerLoad, results, - avoidStaleNodes, storageType); - if (newExcludedNodes >= 0) { - numOfReplicas--; - if (firstChosen == null) { - firstChosen = storages[i]; + int i = 0; + boolean search = true; + for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes + .entrySet().iterator(); search && iter.hasNext(); ) { + Map.Entry<StorageType, Integer> entry = iter.next(); + for (i = 0; i < storages.length; i++) { + StorageType type = entry.getKey(); + final int newExcludedNodes = addIfIsGoodTarget(storages[i], + excludedNodes, blocksize, maxNodesPerRack, considerLoad, results, + avoidStaleNodes, type); + if (newExcludedNodes >= 0) { + numOfReplicas--; + if (firstChosen == null) { + firstChosen = storages[i]; + } + numOfAvailableNodes -= newExcludedNodes; + int num = entry.getValue(); + if (num == 1) { + iter.remove(); + } else { + entry.setValue(num - 1); + } + search = false; + break; } - numOfAvailableNodes -= newExcludedNodes; - break; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e08701ec/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java index b3ff6b9..60e192b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java @@ -17,12 +17,7 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSUtil; @@ -70,22 +65,33 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau protected DatanodeStorageInfo chooseLocalStorage(Node localMachine, Set<Node> excludedNodes, long blocksize, int maxNodesPerRack, List<DatanodeStorageInfo> results, boolean avoidStaleNodes, - StorageType storageType, boolean fallbackToLocalRack - ) throws NotEnoughReplicasException { + EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack) + throws NotEnoughReplicasException { // if no local machine, randomly choose one node if (localMachine == null) return chooseRandom(NodeBase.ROOT, excludedNodes, - blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType); + blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes); // otherwise try local machine first if (localMachine instanceof DatanodeDescriptor) { DatanodeDescriptor localDataNode = (DatanodeDescriptor)localMachine; if (excludedNodes.add(localMachine)) { // was not in the excluded list - for(DatanodeStorageInfo localStorage : DFSUtil.shuffle( - localDataNode.getStorageInfos())) { - if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize, - maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) { - return localStorage; + for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes + .entrySet().iterator(); iter.hasNext(); ) { + Map.Entry<StorageType, Integer> entry = iter.next(); + for (DatanodeStorageInfo localStorage : DFSUtil.shuffle( + localDataNode.getStorageInfos())) { + StorageType type = entry.getKey(); + if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize, + maxNodesPerRack, false, results, avoidStaleNodes, type) >= 0) { + int num = entry.getValue(); + if (num == 1) { + iter.remove(); + } else { + entry.setValue(num - 1); + } + return localStorage; + } } } } @@ -94,7 +100,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau // try a node on local node group DatanodeStorageInfo chosenStorage = chooseLocalNodeGroup( (NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes, - blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType); + blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes); if (chosenStorage != null) { return chosenStorage; } @@ -104,7 +110,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau } // try a node on local rack return chooseLocalRack(localMachine, excludedNodes, - blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType); + blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes); } /** @return the node of the second replica */ @@ -124,18 +130,19 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau protected DatanodeStorageInfo chooseLocalRack(Node localMachine, Set<Node> excludedNodes, long blocksize, int maxNodesPerRack, List<DatanodeStorageInfo> results, boolean avoidStaleNodes, - StorageType storageType) throws NotEnoughReplicasException { + EnumMap<StorageType, Integer> storageTypes) throws + NotEnoughReplicasException { // no local machine, so choose a random machine if (localMachine == null) { return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageType); + maxNodesPerRack, results, avoidStaleNodes, storageTypes); } // choose one from the local rack, but off-nodegroup try { final String scope = NetworkTopology.getFirstHalf(localMachine.getNetworkLocation()); return chooseRandom(scope, excludedNodes, blocksize, maxNodesPerRack, - results, avoidStaleNodes, storageType); + results, avoidStaleNodes, storageTypes); } catch (NotEnoughReplicasException e1) { // find the second replica final DatanodeDescriptor newLocal = secondNode(localMachine, results); @@ -143,16 +150,17 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau try { return chooseRandom( clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes, - blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType); + blocksize, maxNodesPerRack, results, avoidStaleNodes, + storageTypes); } catch(NotEnoughReplicasException e2) { //otherwise randomly choose one from the network return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageType); + maxNodesPerRack, results, avoidStaleNodes, storageTypes); } } else { //otherwise randomly choose one from the network return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageType); + maxNodesPerRack, results, avoidStaleNodes, storageTypes); } } } @@ -161,8 +169,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau protected void chooseRemoteRack(int numOfReplicas, DatanodeDescriptor localMachine, Set<Node> excludedNodes, long blocksize, int maxReplicasPerRack, List<DatanodeStorageInfo> results, - boolean avoidStaleNodes, StorageType storageType) - throws NotEnoughReplicasException { + boolean avoidStaleNodes, EnumMap<StorageType, Integer> storageTypes) + throws NotEnoughReplicasException { int oldNumOfReplicas = results.size(); final String rackLocation = NetworkTopology.getFirstHalf( @@ -170,12 +178,12 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau try { // randomly choose from remote racks chooseRandom(numOfReplicas, "~" + rackLocation, excludedNodes, blocksize, - maxReplicasPerRack, results, avoidStaleNodes, storageType); + maxReplicasPerRack, results, avoidStaleNodes, storageTypes); } catch (NotEnoughReplicasException e) { // fall back to the local rack chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas), rackLocation, excludedNodes, blocksize, - maxReplicasPerRack, results, avoidStaleNodes, storageType); + maxReplicasPerRack, results, avoidStaleNodes, storageTypes); } } @@ -189,11 +197,12 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau NetworkTopologyWithNodeGroup clusterMap, Node localMachine, Set<Node> excludedNodes, long blocksize, int maxNodesPerRack, List<DatanodeStorageInfo> results, boolean avoidStaleNodes, - StorageType storageType) throws NotEnoughReplicasException { + EnumMap<StorageType, Integer> storageTypes) throws + NotEnoughReplicasException { // no local machine, so choose a random machine if (localMachine == null) { return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageType); + maxNodesPerRack, results, avoidStaleNodes, storageTypes); } // choose one from the local node group @@ -201,7 +210,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau return chooseRandom( clusterMap.getNodeGroup(localMachine.getNetworkLocation()), excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, - storageType); + storageTypes); } catch (NotEnoughReplicasException e1) { final DatanodeDescriptor newLocal = secondNode(localMachine, results); if (newLocal != null) { @@ -209,16 +218,16 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau return chooseRandom( clusterMap.getNodeGroup(newLocal.getNetworkLocation()), excludedNodes, blocksize, maxNodesPerRack, results, - avoidStaleNodes, storageType); + avoidStaleNodes, storageTypes); } catch(NotEnoughReplicasException e2) { //otherwise randomly choose one from the network return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageType); + maxNodesPerRack, results, avoidStaleNodes, storageTypes); } } else { //otherwise randomly choose one from the network return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageType); + maxNodesPerRack, results, avoidStaleNodes, storageTypes); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e08701ec/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index a57dd2f..777127e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -951,9 +951,14 @@ public class DFSTestUtil { public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n) { return createDatanodeStorageInfos(n, null, null); } - + public static DatanodeStorageInfo[] createDatanodeStorageInfos( int n, String[] racks, String[] hostnames) { + return createDatanodeStorageInfos(n, racks, hostnames, null); + } + + public static DatanodeStorageInfo[] createDatanodeStorageInfos( + int n, String[] racks, String[] hostnames, StorageType[] types) { DatanodeStorageInfo[] storages = new DatanodeStorageInfo[n]; for(int i = storages.length; i > 0; ) { final String storageID = "s" + i; @@ -961,16 +966,30 @@ public class DFSTestUtil { i--; final String rack = (racks!=null && i < racks.length)? racks[i]: "defaultRack"; final String hostname = (hostnames!=null && i < hostnames.length)? hostnames[i]: "host"; - storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname); + final StorageType type = (types != null && i < types.length) ? types[i] + : StorageType.DEFAULT; + storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname, + type); } return storages; } + public static DatanodeStorageInfo createDatanodeStorageInfo( String storageID, String ip, String rack, String hostname) { - final DatanodeStorage storage = new DatanodeStorage(storageID); - final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage, hostname); + return createDatanodeStorageInfo(storageID, ip, rack, hostname, + StorageType.DEFAULT); + } + + public static DatanodeStorageInfo createDatanodeStorageInfo( + String storageID, String ip, String rack, String hostname, + StorageType type) { + final DatanodeStorage storage = new DatanodeStorage(storageID, + DatanodeStorage.State.NORMAL, type); + final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor( + ip, rack, storage, hostname); return BlockManagerTestUtil.newDatanodeStorageInfo(dn, storage); } + public static DatanodeDescriptor[] toDatanodeDescriptor( DatanodeStorageInfo[] storages) { DatanodeDescriptor[] datanodes = new DatanodeDescriptor[storages.length]; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e08701ec/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java index 9fdc8cf..da7306c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java @@ -19,23 +19,26 @@ package org.apache.hadoop.hdfs; import static org.apache.hadoop.hdfs.BlockStoragePolicy.ID_UNSPECIFIED; +import java.io.File; import java.io.FileNotFoundException; -import java.util.Arrays; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; +import org.apache.hadoop.hdfs.server.blockmanagement.*; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.Node; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.PathUtils; import org.junit.Assert; import org.junit.Test; @@ -838,9 +841,7 @@ public class TestBlockStoragePolicy { checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold checkDirectoryListing(barList, WARM, HOT); } finally { - if (cluster != null) { - cluster.shutdown(); - } + cluster.shutdown(); } } @@ -920,9 +921,7 @@ public class TestBlockStoragePolicy { checkDirectoryListing(fs.getClient().listPaths(s1foo.toString(), HdfsFileStatus.EMPTY_NAME).getPartialListing(), COLD, HOT); } finally { - if (cluster != null) { - cluster.shutdown(); - } + cluster.shutdown(); } } @@ -937,9 +936,7 @@ public class TestBlockStoragePolicy { private void checkLocatedBlocks(HdfsLocatedFileStatus status, int blockNum, int replicaNum, StorageType... types) { List<StorageType> typeList = Lists.newArrayList(); - for (StorageType type : types) { - typeList.add(type); - } + Collections.addAll(typeList, types); LocatedBlocks lbs = status.getBlockLocations(); Assert.assertEquals(blockNum, lbs.getLocatedBlocks().size()); for (LocatedBlock lb : lbs.getLocatedBlocks()) { @@ -1029,4 +1026,50 @@ public class TestBlockStoragePolicy { new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE}); } + + @Test + public void testChooseTargetWithTopology() throws Exception { + BlockStoragePolicy policy1 = new BlockStoragePolicy((byte) 9, "TEST1", + new StorageType[]{StorageType.SSD, StorageType.DISK, + StorageType.ARCHIVE}, new StorageType[]{}, new StorageType[]{}); + BlockStoragePolicy policy2 = new BlockStoragePolicy((byte) 11, "TEST2", + new StorageType[]{StorageType.DISK, StorageType.SSD, + StorageType.ARCHIVE}, new StorageType[]{}, new StorageType[]{}); + + final String[] racks = {"/d1/r1", "/d1/r2", "/d1/r2"}; + final String[] hosts = {"host1", "host2", "host3"}; + final StorageType[] types = {StorageType.DISK, StorageType.SSD, + StorageType.ARCHIVE}; + + final DatanodeStorageInfo[] storages = DFSTestUtil + .createDatanodeStorageInfos(3, racks, hosts, types); + final DatanodeDescriptor[] dataNodes = DFSTestUtil + .toDatanodeDescriptor(storages); + + FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); + File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class); + conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, + new File(baseDir, "name").getPath()); + DFSTestUtil.formatNameNode(conf); + NameNode namenode = new NameNode(conf); + + final BlockManager bm = namenode.getNamesystem().getBlockManager(); + BlockPlacementPolicy replicator = bm.getBlockPlacementPolicy(); + NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology(); + for (DatanodeDescriptor datanode : dataNodes) { + cluster.add(datanode); + } + + DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3, + dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false, + new HashSet<Node>(), 0, policy1); + System.out.println(Arrays.asList(targets)); + Assert.assertEquals(3, targets.length); + targets = replicator.chooseTarget("/foo", 3, + dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false, + new HashSet<Node>(), 0, policy2); + System.out.println(Arrays.asList(targets)); + Assert.assertEquals(3, targets.length); + } }