http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 78c1149..d2eb85b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -18,17 +18,22 @@ package org.apache.hadoop.hdfs.server.balancer;
 
 import java.io.Closeable;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.URI;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +50,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * The class provides utilities for accessing a NameNode.
  */
@@ -53,6 +60,41 @@ public class NameNodeConnector implements Closeable {
   private static final Log LOG = LogFactory.getLog(NameNodeConnector.class);
 
   private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
+  private static boolean write2IdFile = true;
+
+  /** Create {@link NameNodeConnector} for the given namenodes. */
+  public static List<NameNodeConnector> newNameNodeConnectors(
+      Collection<URI> namenodes, String name, Path idPath, Configuration conf)
+      throws IOException {
+    final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
+        namenodes.size());
+    for (URI uri : namenodes) {
+      NameNodeConnector nnc = new NameNodeConnector(name, uri, idPath,
+          null, conf);
+      nnc.getKeyManager().startBlockKeyUpdater();
+      connectors.add(nnc);
+    }
+    return connectors;
+  }
+
+  public static List<NameNodeConnector> newNameNodeConnectors(
+      Map<URI, List<Path>> namenodes, String name, Path idPath,
+      Configuration conf) throws IOException {
+    final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
+        namenodes.size());
+    for (Map.Entry<URI, List<Path>> entry : namenodes.entrySet()) {
+      NameNodeConnector nnc = new NameNodeConnector(name, entry.getKey(),
+          idPath, entry.getValue(), conf);
+      nnc.getKeyManager().startBlockKeyUpdater();
+      connectors.add(nnc);
+    }
+    return connectors;
+  }
+
+  @VisibleForTesting
+  public static void setWrite2IdFile(boolean write2IdFile) {
+    NameNodeConnector.write2IdFile = write2IdFile;
+  }
 
   private final URI nameNodeUri;
   private final String blockpoolID;
@@ -62,17 +104,21 @@ public class NameNodeConnector implements Closeable {
   private final KeyManager keyManager;
   final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
 
-  private final FileSystem fs;
+  private final DistributedFileSystem fs;
   private final Path idPath;
   private final OutputStream out;
+  private final List<Path> targetPaths;
 
   private int notChangedIterations = 0;
 
   public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
-      Configuration conf) throws IOException {
+      List<Path> targetPaths, Configuration conf)
+      throws IOException {
     this.nameNodeUri = nameNodeUri;
     this.idPath = idPath;
-
+    this.targetPaths = targetPaths == null || targetPaths.isEmpty() ? Arrays
+        .asList(new Path("/")) : targetPaths;
+
     this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
         NamenodeProtocol.class).getProxy();
     this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
@@ -85,13 +131,18 @@ public class NameNodeConnector implements Closeable {
     final FsServerDefaults defaults = fs.getServerDefaults(new Path("/"));
     this.keyManager = new KeyManager(blockpoolID, namenode,
         defaults.getEncryptDataTransfer(), conf);
-    // Exit if there is another one running.
-    out = checkAndMarkRunning();
+    // if it is for test, we do not create the id file
+    out = checkAndMarkRunning();
     if (out == null) {
+      // Exit if there is another one running.
       throw new IOException("Another " + name + " is running.");
     }
   }
 
+  public DistributedFileSystem getDistributedFileSystem() {
+    return fs;
+  }
+
   /** @return the block pool ID */
   public String getBlockpoolID() {
     return blockpoolID;
@@ -114,6 +165,11 @@ public class NameNodeConnector implements Closeable {
     return keyManager;
   }
 
+  /** @return the list of paths to scan/migrate */
+  public List<Path> getTargetPaths() {
+    return targetPaths;
+  }
+
   /** Should the instance continue running? */
   public boolean shouldContinue(long dispatchBlockMoveBytes) {
     if (dispatchBlockMoveBytes > 0) {
@@ -147,9 +203,11 @@ public class NameNodeConnector implements Closeable {
    */
   private OutputStream checkAndMarkRunning() throws IOException {
     try {
-      final DataOutputStream out = fs.create(idPath);
-      out.writeBytes(InetAddress.getLocalHost().getHostName());
-      out.flush();
+      final FSDataOutputStream out = fs.create(idPath);
+      if (write2IdFile) {
+        out.writeBytes(InetAddress.getLocalHost().getHostName());
+        out.hflush();
+      }
       return out;
     } catch(RemoteException e) {
       if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
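The two newNameNodeConnectors overloads above are the entry point for tools that need one connector per namenode, and checkAndMarkRunning doubles as a cluster-wide lock: the connector keeps the id file open, so a second instance fails at construction time. Below is a minimal usage sketch; the "/system/balancer.id" path and the DFSUtil.getNsServiceRpcUris call mirror how the Balancer discovers namenodes, but both are illustrative assumptions here, not part of this hunk.

  Configuration conf = new HdfsConfiguration();
  Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
  List<NameNodeConnector> connectors = Collections.emptyList();
  try {
    // Construction writes the local hostname into the id file and keeps the
    // stream open; a concurrent instance hits AlreadyBeingCreatedException
    // and fails with "Another balancer is running."
    connectors = NameNodeConnector.newNameNodeConnectors(
        namenodes, "balancer", new Path("/system/balancer.id"), conf);
    for (NameNodeConnector nnc : connectors) {
      System.out.println("block pool " + nnc.getBlockpoolID()
          + ", target paths " + nnc.getTargetPaths());
    }
  } finally {
    for (NameNodeConnector nnc : connectors) {
      IOUtils.cleanup(null, nnc);  // closes the id file, releasing the "lock"
    }
  }

Note that write2IdFile only skips writing and hflushing the hostname for tests; the id file itself is still created, so the mutual-exclusion check stays in effect.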
http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
index e215c17..d095a1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
@@ -60,6 +60,11 @@
    */
   public short getBlockReplication();
 
+  /**
+   * @return the storage policy ID.
+   */
+  public byte getStoragePolicyID();
+
   /**
    * Get the name of the collection.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 5d23c1f..7c18444 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -42,6 +42,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -254,6 +255,7 @@ public class BlockManager {
 
   /** for block replicas placement */
   private BlockPlacementPolicy blockplacement;
+  private final BlockStoragePolicy.Suite storagePolicySuite;
 
   /** Check whether name system is running before terminating */
   private boolean checkNSRunning = true;
@@ -276,6 +278,7 @@ public class BlockManager {
     blockplacement = BlockPlacementPolicy.getInstance(
         conf, stats, datanodeManager.getNetworkTopology(),
         datanodeManager.getHost2DatanodeMap());
+    storagePolicySuite = BlockStoragePolicy.readBlockStorageSuite(conf);
     pendingReplications = new PendingReplicationBlocks(conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
@@ -394,7 +397,11 @@
           lifetimeMin*60*1000L, 0, null, encryptionAlgorithm);
     }
   }
-
+
+  public BlockStoragePolicy getStoragePolicy(final String policyName) {
+    return storagePolicySuite.getPolicy(policyName);
+  }
+
   public void setBlockPoolId(String blockPoolId) {
     if (isBlockTokenEnabled()) {
       blockTokenSecretManager.setBlockPoolId(blockPoolId);
@@ -445,7 +452,7 @@
     return datanodeManager;
   }
 
-  /** @return the BlockPlacementPolicy */
+  @VisibleForTesting
   public BlockPlacementPolicy getBlockPlacementPolicy() {
     return blockplacement;
   }
@@ -1366,7 +1373,7 @@
         // choose replication targets: NOT HOLDING THE GLOBAL LOCK
         // It is costly to extract the filename for which chooseTargets is called,
         // so for now we pass in the block collection itself.
-        rw.chooseTargets(blockplacement, excludedNodes);
+        rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes);
       }
 
       namesystem.writeLock();
@@ -1470,24 +1477,48 @@
     return scheduledWork;
   }
 
+  /** Choose target for WebHDFS redirection. */
+  public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
+      DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
+    return blockplacement.chooseTarget(src, 1, clientnode,
+        Collections.<DatanodeStorageInfo>emptyList(), false, excludes,
+        blocksize, storagePolicySuite.getDefaultPolicy());
+  }
+
+  /** Choose target for getting additional datanodes for an existing pipeline. */
+  public DatanodeStorageInfo[] chooseTarget4AdditionalDatanode(String src,
+      int numAdditionalNodes,
+      DatanodeDescriptor clientnode,
+      List<DatanodeStorageInfo> chosen,
+      Set<Node> excludes,
+      long blocksize,
+      byte storagePolicyID) {
+
+    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
+    return blockplacement.chooseTarget(src, numAdditionalNodes, clientnode,
+        chosen, true, excludes, blocksize, storagePolicy);
+  }
+
   /**
-   * Choose target datanodes according to the replication policy.
+   * Choose target datanodes for creating a new block.
    *
    * @throws IOException
    *           if the number of targets < minimum replication.
    * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
-   *      List, boolean, Set, long, StorageType)
+   *      Set, long, List, BlockStoragePolicy)
    */
-  public DatanodeStorageInfo[] chooseTarget(final String src,
+  public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
       final int numOfReplicas, final DatanodeDescriptor client,
       final Set<Node> excludedNodes,
-      final long blocksize, List<String> favoredNodes) throws IOException {
+      final long blocksize,
+      final List<String> favoredNodes,
+      final byte storagePolicyID) throws IOException {
     List<DatanodeDescriptor> favoredDatanodeDescriptors =
         getDatanodeDescriptors(favoredNodes);
+    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
     final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
         numOfReplicas, client, excludedNodes, blocksize,
-        // TODO: get storage type from file
-        favoredDatanodeDescriptors, StorageType.DEFAULT);
+        favoredDatanodeDescriptors, storagePolicy);
     if (targets.length < minReplication) {
       throw new IOException("File " + src + " could only be replicated to "
           + targets.length + " nodes instead of minReplication (="
@@ -2719,6 +2750,10 @@
     assert namesystem.hasWriteLock();
     // first form a rack to datanodes map and
     BlockCollection bc = getBlockCollection(b);
+    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID());
+    final List<StorageType> excessTypes = storagePolicy.chooseExcess(
+        replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
+
     final Map<String, List<DatanodeStorageInfo>> rackMap
         = new HashMap<String, List<DatanodeStorageInfo>>();
@@ -2739,16 +2774,13 @@
     final DatanodeStorageInfo addedNodeStorage
         = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);
     while (nonExcess.size() - replication > 0) {
-      // check if we can delete delNodeHint
       final DatanodeStorageInfo cur;
-      if (firstOne && delNodeHintStorage != null
-          && (moreThanOne.contains(delNodeHintStorage)
-              || (addedNodeStorage != null
-                  && !moreThanOne.contains(addedNodeStorage)))) {
+      if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage,
+          moreThanOne, excessTypes)) {
         cur = delNodeHintStorage;
       } else { // regular excessive replica removal
         cur = replicator.chooseReplicaToDelete(bc, b, replication,
-            moreThanOne, exactlyOne);
+            moreThanOne, exactlyOne, excessTypes);
       }
       firstOne = false;
@@ -2774,6 +2806,27 @@
     }
   }
 
+  /** Check if we can use delHint */
+  static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint,
+      DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks,
+      List<StorageType> excessTypes) {
+    if (!isFirst) {
+      return false; // only consider delHint for the first case
+    } else if (delHint == null) {
+      return false; // no delHint
+    } else if (!excessTypes.contains(delHint.getStorageType())) {
+      return false; // delHint storage type is not an excess type
+    } else {
+      // check if removing delHint reduces the number of racks
+      if (moreThan1Racks.contains(delHint)) {
+        return true; // delHint and some other nodes are under the same rack
+      } else if (added != null && !moreThan1Racks.contains(added)) {
+        return true; // the added node adds a new rack
+      }
+      return false; // removing delHint reduces the number of racks;
+    }
+  }
+
   private void addToExcessReplicate(DatanodeInfo dn, Block block) {
     assert namesystem.hasWriteLock();
     LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid());
@@ -2880,7 +2933,7 @@
     // Decrement number of blocks scheduled to this datanode.
     // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
     // RECEIVED_BLOCK), we currently also decrease the approximate number.
-    node.decrementBlocksScheduled();
+    node.decrementBlocksScheduled(storageInfo.getStorageType());
 
     // get the deletion hint node
     DatanodeDescriptor delHintNode = null;
@@ -3549,10 +3602,12 @@
     }
 
     private void chooseTargets(BlockPlacementPolicy blockplacement,
+        BlockStoragePolicy.Suite storagePolicySuite,
         Set<Node> excludedNodes) {
       targets = blockplacement.chooseTarget(bc.getName(),
           additionalReplRequired, srcNode, liveReplicaStorages, false,
-          excludedNodes, block.getNumBytes(), StorageType.DEFAULT);
+          excludedNodes, block.getNumBytes(),
+          storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
index 0a4dd81..af58127 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -75,7 +76,7 @@ public abstract class BlockPlacementPolicy {
                                              boolean returnChosenNodes,
                                              Set<Node> excludedNodes,
                                              long blocksize,
-                                             StorageType storageType);
+                                             BlockStoragePolicy storagePolicy);
 
   /**
    * Same as {@link #chooseTarget(String, int, Node, Set, long, List, StorageType)}
@@ -89,14 +90,14 @@
                                     Set<Node> excludedNodes,
                                     long blocksize,
                                     List<DatanodeDescriptor> favoredNodes,
-                                    StorageType storageType) {
+                                    BlockStoragePolicy storagePolicy) {
     // This class does not provide the functionality of placing
     // a block in favored datanodes. The implementations of this class
     // are expected to provide this functionality
 
     return chooseTarget(src, numOfReplicas, writer,
         new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
-        excludedNodes, blocksize, storageType);
+        excludedNodes, blocksize, storagePolicy);
   }
 
   /**
@@ -118,18 +119,21 @@
    * @param srcBC block collection of file to which block-to-be-deleted belongs
    * @param block The block to be deleted
    * @param replicationFactor The required number of replicas for this block
-   * @param existingReplicas The replica locations of this block that are present
-          on at least two unique racks.
-   * @param moreExistingReplicas Replica locations of this block that are not
-          listed in the previous parameter.
+   * @param moreThanOne The replica locations of this block that are present
+   *          on more than one unique racks.
+   * @param exactlyOne Replica locations of this block that are present
+   *          on exactly one unique racks.
+   * @param excessTypes The excess {@link StorageType}s according to the
+   *          {@link BlockStoragePolicy}.
    * @return the replica that is the best candidate for deletion
    */
   abstract public DatanodeStorageInfo chooseReplicaToDelete(
       BlockCollection srcBC,
       Block block,
       short replicationFactor,
-      Collection<DatanodeStorageInfo> existingReplicas,
-      Collection<DatanodeStorageInfo> moreExistingReplicas);
+      Collection<DatanodeStorageInfo> moreThanOne,
+      Collection<DatanodeStorageInfo> exactlyOne,
+      List<StorageType> excessTypes);
 
   /**
    * Used to setup a BlockPlacementPolicy object. This should be defined by

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index f77d4ab..a0e6701 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -19,15 +19,11 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import static org.apache.hadoop.util.Time.now;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.*;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.StorageType;
@@ -80,12 +76,6 @@
    */
   protected int tolerateHeartbeatMultiplier;
 
-  protected BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
-                                        NetworkTopology clusterMap,
-                                        Host2NodesMap host2datanodeMap) {
-    initialize(conf, stats, clusterMap, host2datanodeMap);
-  }
-
   protected BlockPlacementPolicyDefault() {
   }
@@ -117,9 +107,9 @@
                                     boolean returnChosenNodes,
                                     Set<Node> excludedNodes,
                                     long blocksize,
-                                    StorageType storageType) {
+                                    final BlockStoragePolicy storagePolicy) {
     return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
-        excludedNodes, blocksize, storageType);
+        excludedNodes, blocksize, storagePolicy);
   }
 
   @Override
@@ -129,17 +119,21 @@
       Set<Node> excludedNodes,
       long blocksize,
       List<DatanodeDescriptor> favoredNodes,
-      StorageType storageType) {
+      BlockStoragePolicy storagePolicy) {
     try {
       if (favoredNodes == null || favoredNodes.size() == 0) {
         // Favored nodes not specified, fall back to regular block placement.
         return chooseTarget(src, numOfReplicas, writer,
             new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
-            excludedNodes, blocksize, storageType);
+            excludedNodes, blocksize, storagePolicy);
       }
 
       Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
           new HashSet<Node>() : new HashSet<Node>(excludedNodes);
+      final List<StorageType> requiredStorageTypes = storagePolicy
+          .chooseStorageTypes((short)numOfReplicas);
+      final EnumMap<StorageType, Integer> storageTypes =
+          getRequiredStorageTypes(requiredStorageTypes);
 
       // Choose favored nodes
       List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
@@ -152,7 +146,7 @@
         final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
             favoriteAndExcludedNodes, blocksize,
             getMaxNodesPerRack(results.size(), numOfReplicas)[1],
-            results, avoidStaleNodes, storageType, false);
+            results, avoidStaleNodes, storageTypes, false);
         if (target == null) {
           LOG.warn("Could not find a target for file " + src
               + " with favored node " + favoredNode);
@@ -166,7 +160,7 @@
         numOfReplicas -= results.size();
         DatanodeStorageInfo[] remainingTargets =
             chooseTarget(src, numOfReplicas, writer, results,
-                false, favoriteAndExcludedNodes, blocksize, storageType);
+                false, favoriteAndExcludedNodes, blocksize, storagePolicy);
         for (int i = 0; i < remainingTargets.length; i++) {
           results.add(remainingTargets[i]);
         }
@@ -174,10 +168,14 @@
       return getPipeline(writer,
           results.toArray(new DatanodeStorageInfo[results.size()]));
     } catch (NotEnoughReplicasException nr) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to choose with favored nodes (=" + favoredNodes
+            + "), disregard favored nodes hint and retry.", nr);
+      }
       // Fall back to regular block placement disregarding favored nodes hint
       return chooseTarget(src, numOfReplicas, writer,
           new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
-          excludedNodes, blocksize, storageType);
+          excludedNodes, blocksize, storagePolicy);
     }
   }
 
@@ -188,7 +186,7 @@
                                     boolean returnChosenNodes,
                                     Set<Node> excludedNodes,
                                     long blocksize,
-                                    StorageType storageType) {
+                                    final BlockStoragePolicy storagePolicy) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
       return DatanodeStorageInfo.EMPTY_ARRAY;
     }
@@ -213,8 +211,9 @@
     boolean avoidStaleNodes = (stats != null
         && stats.isAvoidingStaleDataNodesForWrite());
-    Node localNode = chooseTarget(numOfReplicas, writer,
-        excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+    final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
+        blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy,
+        EnumSet.noneOf(StorageType.class), results.isEmpty());
     if (!returnChosenNodes) {
       results.removeAll(chosenStorage);
     }
@@ -234,7 +233,22 @@
     int maxNodesPerRack = (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
     return new int[] {numOfReplicas, maxNodesPerRack};
   }
-
+
+  private EnumMap<StorageType, Integer> getRequiredStorageTypes(
+      List<StorageType> types) {
+    EnumMap<StorageType, Integer> map = new EnumMap<StorageType,
+        Integer>(StorageType.class);
+    for (StorageType type : types) {
+      if (!map.containsKey(type)) {
+        map.put(type, 1);
+      } else {
+        int num = map.get(type);
+        map.put(type, num + 1);
+      }
+    }
+    return map;
+  }
+
   /**
    * choose <i>numOfReplicas</i> from all data nodes
    * @param numOfReplicas additional number of replicas wanted
@@ -247,31 +261,49 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * @return local node of writer (not chosen node)
    */
   private Node chooseTarget(int numOfReplicas,
-                            Node writer,
-                            Set<Node> excludedNodes,
-                            long blocksize,
-                            int maxNodesPerRack,
-                            List<DatanodeStorageInfo> results,
-                            final boolean avoidStaleNodes,
-                            StorageType storageType) {
+                            Node writer,
+                            final Set<Node> excludedNodes,
+                            final long blocksize,
+                            final int maxNodesPerRack,
+                            final List<DatanodeStorageInfo> results,
+                            final boolean avoidStaleNodes,
+                            final BlockStoragePolicy storagePolicy,
+                            final EnumSet<StorageType> unavailableStorages,
+                            final boolean newBlock) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
       return writer;
     }
-    int totalReplicasExpected = numOfReplicas + results.size();
-
-    int numOfResults = results.size();
-    boolean newBlock = (numOfResults==0);
+    final int numOfResults = results.size();
+    final int totalReplicasExpected = numOfReplicas + numOfResults;
     if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
       writer = results.get(0).getDatanodeDescriptor();
     }
 
     // Keep a copy of original excludedNodes
-    final Set<Node> oldExcludedNodes = avoidStaleNodes ?
-        new HashSet<Node>(excludedNodes) : null;
+    final Set<Node> oldExcludedNodes = new HashSet<Node>(excludedNodes);
+
+    // choose storage types; use fallbacks for unavailable storages
+    final List<StorageType> requiredStorageTypes = storagePolicy
+        .chooseStorageTypes((short) totalReplicasExpected,
+            DatanodeStorageInfo.toStorageTypes(results),
+            unavailableStorages, newBlock);
+    final EnumMap<StorageType, Integer> storageTypes =
+        getRequiredStorageTypes(requiredStorageTypes);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("storageTypes=" + storageTypes);
+    }
+
     try {
+      if ((numOfReplicas = requiredStorageTypes.size()) == 0) {
+        throw new NotEnoughReplicasException(
+            "All required storage types are unavailable: "
+            + " unavailableStorages=" + unavailableStorages
+            + ", storagePolicy=" + storagePolicy);
+      }
+
       if (numOfResults == 0) {
         writer = chooseLocalStorage(writer, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageType, true)
+            maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
                 .getDatanodeDescriptor();
         if (--numOfReplicas == 0) {
           return writer;
@@ -280,7 +312,7 @@
       final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
       if (numOfResults <= 1) {
         chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
-            results, avoidStaleNodes, storageType);
+            results, avoidStaleNodes, storageTypes);
         if (--numOfReplicas == 0) {
           return writer;
         }
@@ -289,24 +321,28 @@
         final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
         if (clusterMap.isOnSameRack(dn0, dn1)) {
           chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
-              results, avoidStaleNodes, storageType);
+              results, avoidStaleNodes, storageTypes);
         } else if (newBlock){
           chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
-              results, avoidStaleNodes, storageType);
+              results, avoidStaleNodes, storageTypes);
         } else {
           chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
-              results, avoidStaleNodes, storageType);
+              results, avoidStaleNodes, storageTypes);
         }
         if (--numOfReplicas == 0) {
          return writer;
        }
      }
      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
       final String message = "Failed to place enough replicas, still in need of "
           + (totalReplicasExpected - results.size()) + " to reach "
-          + totalReplicasExpected + ".";
+          + totalReplicasExpected
+          + " (unavailableStorages=" + unavailableStorages
+          + ", storagePolicy=" + storagePolicy
+          + ", newBlock=" + newBlock + ")";
+
       if (LOG.isTraceEnabled()) {
         LOG.trace(message, e);
       } else {
@@ -327,7 +363,28 @@
       // if the NotEnoughReplicasException was thrown in chooseRandom().
       numOfReplicas = totalReplicasExpected - results.size();
       return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
-          maxNodesPerRack, results, false, storageType);
+          maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
+          newBlock);
+      }
+
+      boolean retry = false;
+      // simply add all the remaining types into unavailableStorages and give
+      // another try. No best effort is guaranteed here.
+      for (StorageType type : storageTypes.keySet()) {
+        if (!unavailableStorages.contains(type)) {
+          unavailableStorages.add(type);
+          retry = true;
+        }
+      }
+      if (retry) {
+        for (DatanodeStorageInfo resultStorage : results) {
+          addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
+              oldExcludedNodes);
+        }
+        numOfReplicas = totalReplicasExpected - results.size();
+        return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
+            maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
+            newBlock);
      }
    }
    return writer;
@@ -340,28 +397,35 @@
    * @return the chosen storage
    */
   protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
-      Set<Node> excludedNodes,
-      long blocksize,
-      int maxNodesPerRack,
-      List<DatanodeStorageInfo> results,
-      boolean avoidStaleNodes,
-      StorageType storageType,
-      boolean fallbackToLocalRack)
+      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+      EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
       throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
     if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
       DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
       // otherwise try local machine first
       if (excludedNodes.add(localMachine)) { // was not in the excluded list
-        for(DatanodeStorageInfo localStorage : DFSUtil.shuffle(
-            localDatanode.getStorageInfos())) {
-          if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
-              maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) {
-            return localStorage;
+        for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
+            .entrySet().iterator(); iter.hasNext(); ) {
+          Map.Entry<StorageType, Integer> entry = iter.next();
+          for (DatanodeStorageInfo localStorage : DFSUtil.shuffle(
+              localDatanode.getStorageInfos())) {
+            StorageType type = entry.getKey();
+            if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
+                maxNodesPerRack, false, results, avoidStaleNodes, type) >= 0) {
+              int num = entry.getValue();
+              if (num == 1) {
+                iter.remove();
+              } else {
+                entry.setValue(num - 1);
+              }
+              return localStorage;
+            }
          }
        }
      }
@@ -372,7 +436,7 @@
     }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, blocksize,
-        maxNodesPerRack, results, avoidStaleNodes, storageType);
+        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
   }
 
   /**
@@ -395,50 +459,71 @@
    * @return the chosen node
    */
   protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
-      Set<Node> excludedNodes,
-      long blocksize,
-      int maxNodesPerRack,
-      List<DatanodeStorageInfo> results,
-      boolean avoidStaleNodes,
-      StorageType storageType)
+      Set<Node> excludedNodes,
+      long blocksize,
+      int maxNodesPerRack,
+      List<DatanodeStorageInfo> results,
+      boolean avoidStaleNodes,
+      EnumMap<StorageType, Integer> storageTypes)
       throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
+    final String localRack = localMachine.getNetworkLocation();
 
-    // choose one from the local rack
     try {
-      return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
-          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
-    } catch (NotEnoughReplicasException e1) {
-      // find the second replica
-      DatanodeDescriptor newLocal=null;
+      // choose one from the local rack
+      return chooseRandom(localRack, excludedNodes,
+          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+    } catch (NotEnoughReplicasException e) {
+      // find the next replica and retry with its rack
      for(DatanodeStorageInfo resultStorage : results) {
        DatanodeDescriptor nextNode = resultStorage.getDatanodeDescriptor();
        if (nextNode != localMachine) {
-          newLocal = nextNode;
-          break;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Failed to choose from local rack (location = " + localRack
+                + "), retry with the rack of the next replica (location = "
+                + nextNode.getNetworkLocation() + ")", e);
+          }
+          return chooseFromNextRack(nextNode, excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes, storageTypes);
        }
      }
-      if (newLocal != null) {
-        try {
-          return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
-              blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
-        } catch(NotEnoughReplicasException e2) {
-          //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes, storageType);
-        }
-      } else {
-        //otherwise randomly choose one from the network
-        return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageType);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to choose from local rack (location = " + localRack
+            + "); the second replica is not found, retry choosing ramdomly", e);
      }
+      //the second replica is not found, randomly choose one from the network
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
    }
  }
-
+
+  private DatanodeStorageInfo chooseFromNextRack(Node next,
+      Set<Node> excludedNodes,
+      long blocksize,
+      int maxNodesPerRack,
+      List<DatanodeStorageInfo> results,
+      boolean avoidStaleNodes,
+      EnumMap<StorageType, Integer> storageTypes) throws NotEnoughReplicasException {
+    final String nextRack = next.getNetworkLocation();
+    try {
+      return chooseRandom(nextRack, excludedNodes, blocksize, maxNodesPerRack,
+          results, avoidStaleNodes, storageTypes);
+    } catch(NotEnoughReplicasException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to choose from the next rack (location = " + nextRack
+            + "), retry choosing ramdomly", e);
+      }
+      //otherwise randomly choose one from the network
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+    }
+  }
+
   /**
    * Choose <i>numOfReplicas</i> nodes from the racks
    * that <i>localMachine</i> is NOT on.
@@ -453,18 +538,22 @@
                                 int maxReplicasPerRack,
                                 List<DatanodeStorageInfo> results,
                                 boolean avoidStaleNodes,
-                                StorageType storageType)
+                                EnumMap<StorageType, Integer> storageTypes)
                                     throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
     // randomly choose one node from remote racks
     try {
       chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
           excludedNodes, blocksize, maxReplicasPerRack, results,
-          avoidStaleNodes, storageType);
+          avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to choose remote rack (location = ~"
+            + localMachine.getNetworkLocation() + "), fallback to local rack", e);
+      }
       chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
           localMachine.getNetworkLocation(), excludedNodes, blocksize,
-          maxReplicasPerRack, results, avoidStaleNodes, storageType);
+          maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
     }
   }
 
@@ -478,10 +567,10 @@
                                            int maxNodesPerRack,
                                            List<DatanodeStorageInfo> results,
                                            boolean avoidStaleNodes,
-                                           StorageType storageType)
+                                           EnumMap<StorageType, Integer> storageTypes)
       throws NotEnoughReplicasException {
     return chooseRandom(1, scope, excludedNodes, blocksize, maxNodesPerRack,
-        results, avoidStaleNodes, storageType);
+        results, avoidStaleNodes, storageTypes);
   }
 
   /**
@@ -495,8 +584,8 @@
                             int maxNodesPerRack,
                             List<DatanodeStorageInfo> results,
                             boolean avoidStaleNodes,
-                            StorageType storageType)
-                                throws NotEnoughReplicasException {
+                            EnumMap<StorageType, Integer> storageTypes)
+                                throws NotEnoughReplicasException {
     int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
         scope, excludedNodes);
@@ -512,24 +601,43 @@
       DatanodeDescriptor chosenNode =
          (DatanodeDescriptor)clusterMap.chooseRandom(scope);
      if (excludedNodes.add(chosenNode)) { //was not in the excluded list
+        if (LOG.isDebugEnabled()) {
+          builder.append("\nNode ").append(NodeBase.getPath(chosenNode)).append(" [");
+        }
        numOfAvailableNodes--;
 
        final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
            chosenNode.getStorageInfos());
-        int i;
-        for(i = 0; i < storages.length; i++) {
-          final int newExcludedNodes = addIfIsGoodTarget(storages[i],
-              excludedNodes, blocksize, maxNodesPerRack, considerLoad, results,
-              avoidStaleNodes, storageType);
-          if (newExcludedNodes >= 0) {
-            numOfReplicas--;
-            if (firstChosen == null) {
-              firstChosen = storages[i];
+        int i = 0;
+        boolean search = true;
+        for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
+            .entrySet().iterator(); search && iter.hasNext(); ) {
+          Map.Entry<StorageType, Integer> entry = iter.next();
+          for (i = 0; i < storages.length; i++) {
+            StorageType type = entry.getKey();
+            final int newExcludedNodes = addIfIsGoodTarget(storages[i],
+                excludedNodes, blocksize, maxNodesPerRack, considerLoad, results,
+                avoidStaleNodes, type);
+            if (newExcludedNodes >= 0) {
+              numOfReplicas--;
+              if (firstChosen == null) {
+                firstChosen = storages[i];
+              }
+              numOfAvailableNodes -= newExcludedNodes;
+              int num = entry.getValue();
+              if (num == 1) {
+                iter.remove();
+              } else {
+                entry.setValue(num - 1);
+              }
+              search = false;
+              break;
            }
-            numOfAvailableNodes -= newExcludedNodes;
-            break;
          }
        }
+        if (LOG.isDebugEnabled()) {
+          builder.append("\n]");
+        }
 
        // If no candidate storage was found on this DN then set badTarget.
        badTarget = (i == storages.length);
@@ -540,9 +648,11 @@
      String detail = enableDebugLogging;
      if (LOG.isDebugEnabled()) {
        if (badTarget && builder != null) {
-          detail = builder.append("]").toString();
+          detail = builder.toString();
          builder.setLength(0);
-        } else detail = "";
+        } else {
+          detail = "";
+        }
      }
      throw new NotEnoughReplicasException(detail);
    }
@@ -576,14 +686,10 @@
   private static void logNodeIsNotChosen(DatanodeStorageInfo storage, String reason) {
     if (LOG.isDebugEnabled()) {
-      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       // build the error message for later use.
       debugLoggingBuilder.get()
-          .append(node).append(": ")
-          .append("Storage ").append(storage)
-          .append("at node ").append(NodeBase.getPath(node))
-          .append(" is not chosen because ")
-          .append(reason);
+          .append("\n  Storage ").append(storage)
+          .append(" is not chosen since ").append(reason).append(".");
     }
   }
@@ -608,11 +714,10 @@
                              boolean considerLoad,
                              List<DatanodeStorageInfo> results,
                              boolean avoidStaleNodes,
-                             StorageType storageType) {
-    if (storage.getStorageType() != storageType) {
-      logNodeIsNotChosen(storage,
-          "storage types do not match, where the expected storage type is "
-              + storageType);
+                             StorageType requiredStorageType) {
+    if (storage.getStorageType() != requiredStorageType) {
+      logNodeIsNotChosen(storage, "storage types do not match,"
+          + " where the required storage type is " + requiredStorageType);
      return false;
    }
    if (storage.getState() == State.READ_ONLY_SHARED) {
@@ -634,9 +739,14 @@
     }
 
     final long requiredSize = blockSize * HdfsConstants.MIN_BLOCKS_FOR_WRITE;
-    final long scheduledSize = blockSize * node.getBlocksScheduled();
-    if (requiredSize > storage.getRemaining() - scheduledSize) {
-      logNodeIsNotChosen(storage, "the node does not have enough space ");
+    final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType());
+    final long remaining = node.getRemaining(storage.getStorageType());
+    if (requiredSize > remaining - scheduledSize) {
+      logNodeIsNotChosen(storage, "the node does not have enough "
+          + storage.getStorageType() + " space"
+          + " (required=" + requiredSize
+          + ", scheduled=" + scheduledSize
+          + ", remaining=" + remaining + ")");
      return false;
    }
@@ -645,8 +755,8 @@
       final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
       final int nodeLoad = node.getXceiverCount();
       if (nodeLoad > maxLoad) {
-        logNodeIsNotChosen(storage,
-            "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") ");
+        logNodeIsNotChosen(storage, "the node is too busy (load: " + nodeLoad
+            + " > " + maxLoad + ") ");
        return false;
      }
    }
@@ -666,7 +776,7 @@
     }
     return true;
   }
-
+
   /**
    * Return a pipeline of nodes.
    * The pipeline is formed finding a shortest path that
@@ -732,7 +842,8 @@
   public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc,
       Block block, short replicationFactor,
       Collection<DatanodeStorageInfo> first,
-      Collection<DatanodeStorageInfo> second) {
+      Collection<DatanodeStorageInfo> second,
+      final List<StorageType> excessTypes) {
     long oldestHeartbeat =
       now() - heartbeatInterval * tolerateHeartbeatMultiplier;
     DatanodeStorageInfo oldestHeartbeatStorage = null;
@@ -742,6 +853,10 @@
     // Pick the node with the oldest heartbeat or with the least free space,
     // if all hearbeats are within the tolerable heartbeat interval
     for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) {
+      if (!excessTypes.contains(storage.getStorageType())) {
+        continue;
+      }
+
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       long free = node.getRemaining();
       long lastHeartbeat = node.getLastUpdate();
@@ -755,8 +870,16 @@
       }
     }
 
-    return oldestHeartbeatStorage != null? oldestHeartbeatStorage
-        : minSpaceStorage;
+    final DatanodeStorageInfo storage;
+    if (oldestHeartbeatStorage != null) {
+      storage = oldestHeartbeatStorage;
+    } else if (minSpaceStorage != null) {
+      storage = minSpaceStorage;
+    } else {
+      return null;
+    }
+    excessTypes.remove(storage.getStorageType());
+    return storage;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
index 2c8c37d..8626053 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
@@ -17,12 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -69,22 +64,33 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
   protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
-      StorageType storageType, boolean fallbackToLocalRack
-      ) throws NotEnoughReplicasException {
+      EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
+      throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes,
-          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
 
     // otherwise try local machine first
     if (localMachine instanceof DatanodeDescriptor) {
       DatanodeDescriptor localDataNode = (DatanodeDescriptor)localMachine;
       if (excludedNodes.add(localMachine)) { // was not in the excluded list
-        for(DatanodeStorageInfo localStorage : DFSUtil.shuffle(
-            localDataNode.getStorageInfos())) {
-          if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
-              maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) {
-            return localStorage;
+        for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
+            .entrySet().iterator(); iter.hasNext(); ) {
+          Map.Entry<StorageType, Integer> entry = iter.next();
+          for (DatanodeStorageInfo localStorage : DFSUtil.shuffle(
+              localDataNode.getStorageInfos())) {
+            StorageType type = entry.getKey();
+            if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
+                maxNodesPerRack, false, results, avoidStaleNodes, type) >= 0) {
+              int num = entry.getValue();
+              if (num == 1) {
+                iter.remove();
+              } else {
+                entry.setValue(num - 1);
+              }
+              return localStorage;
+            }
          }
        }
      }
@@ -93,7 +99,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     // try a node on local node group
     DatanodeStorageInfo chosenStorage = chooseLocalNodeGroup(
         (NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes,
-        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     if (chosenStorage != null) {
       return chosenStorage;
     }
@@ -103,7 +109,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes,
-        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
   }
 
   /** @return the node of the second replica */
@@ -123,18 +129,19 @@
   protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
-      StorageType storageType) throws NotEnoughReplicasException {
+      EnumMap<StorageType, Integer> storageTypes) throws
+      NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
 
     // choose one from the local rack, but off-nodegroup
     try {
       final String scope = NetworkTopology.getFirstHalf(localMachine.getNetworkLocation());
       return chooseRandom(scope, excludedNodes, blocksize, maxNodesPerRack,
-          results, avoidStaleNodes, storageType);
+          results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       final DatanodeDescriptor newLocal = secondNode(localMachine, results);
@@ -142,16 +149,17 @@
       try {
         return chooseRandom(
             clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
-            blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+            blocksize, maxNodesPerRack, results, avoidStaleNodes,
+            storageTypes);
       } catch(NotEnoughReplicasException e2) {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageType);
+            maxNodesPerRack, results, avoidStaleNodes, storageTypes);
       }
     } else {
       //otherwise randomly choose one from the network
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
   }
 }
@@ -163,8 +171,8 @@
   protected void chooseRemoteRack(int numOfReplicas,
       DatanodeDescriptor localMachine, Set<Node> excludedNodes,
       long blocksize, int maxReplicasPerRack, List<DatanodeStorageInfo> results,
-      boolean avoidStaleNodes, StorageType storageType)
-      throws NotEnoughReplicasException {
+      boolean avoidStaleNodes, EnumMap<StorageType, Integer> storageTypes)
+      throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
 
     final String rackLocation = NetworkTopology.getFirstHalf(
@@ -172,12 +180,12 @@
     try {
       // randomly choose from remote racks
       chooseRandom(numOfReplicas, "~" + rackLocation, excludedNodes, blocksize,
-          maxReplicasPerRack, results, avoidStaleNodes, storageType);
+          maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
       // fall back to the local rack
       chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
           rackLocation, excludedNodes, blocksize,
-          maxReplicasPerRack, results, avoidStaleNodes, storageType);
+          maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
     }
   }
 
@@ -191,11 +199,12 @@
       NetworkTopologyWithNodeGroup clusterMap, Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
-      StorageType storageType) throws NotEnoughReplicasException {
+      EnumMap<StorageType, Integer> storageTypes) throws
+      NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
 
     // choose one from the local node group
@@ -203,7 +212,7 @@
       return chooseRandom(
           clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
           excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
-          storageType);
+          storageTypes);
     } catch (NotEnoughReplicasException e1) {
       final DatanodeDescriptor newLocal = secondNode(localMachine, results);
       if (newLocal != null) {
@@ -211,16 +220,16 @@
         try {
           return chooseRandom(
               clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
              excludedNodes, blocksize, maxNodesPerRack, results,
-              avoidStaleNodes, storageType);
+              avoidStaleNodes, storageTypes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
           return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes, storageType);
+              maxNodesPerRack, results, avoidStaleNodes, storageTypes);
         }
       } else {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageType);
+            maxNodesPerRack, results, avoidStaleNodes, storageTypes);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index f1730d4..be1ff14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -28,16 +28,19 @@ import java.util.Map;
 import java.util.Queue;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.Time;
@@ -204,8 +207,10 @@
    * in case of errors (e.g. datanode does not report if an error occurs
    * while writing the block).
    */
-  private int currApproxBlocksScheduled = 0;
-  private int prevApproxBlocksScheduled = 0;
+  private EnumCounters<StorageType> currApproxBlocksScheduled
+      = new EnumCounters<StorageType>(StorageType.class);
+  private EnumCounters<StorageType> prevApproxBlocksScheduled
+      = new EnumCounters<StorageType>(StorageType.class);
   private long lastBlocksScheduledRollTime = 0;
   private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
   private int volumeFailures = 0;
@@ -478,23 +483,46 @@
   /**
    * @return Approximate number of blocks currently scheduled to be written
+   */
+  public long getRemaining(StorageType t) {
+    long remaining = 0;
+    for(DatanodeStorageInfo s : getStorageInfos()) {
+      if (s.getStorageType() == t) {
+        remaining += s.getRemaining();
+      }
+    }
+    return remaining;
+  }
+
+  /**
+   * @return Approximate number of blocks currently scheduled to be written
+   * to the given storage type of this datanode.
+   */
+  public int getBlocksScheduled(StorageType t) {
+    return (int)(currApproxBlocksScheduled.get(t)
+        + prevApproxBlocksScheduled.get(t));
+  }
+
+  /**
+   * @return Approximate number of blocks currently scheduled to be written
    * to this datanode.
    */
   public int getBlocksScheduled() {
-    return currApproxBlocksScheduled + prevApproxBlocksScheduled;
+    return (int)(currApproxBlocksScheduled.sum()
+        + prevApproxBlocksScheduled.sum());
   }
 
   /** Increment the number of blocks scheduled. */
-  void incrementBlocksScheduled() {
-    currApproxBlocksScheduled++;
+  void incrementBlocksScheduled(StorageType t) {
+    currApproxBlocksScheduled.add(t, 1);;
   }
 
   /** Decrement the number of blocks scheduled. */
-  void decrementBlocksScheduled() {
-    if (prevApproxBlocksScheduled > 0) {
-      prevApproxBlocksScheduled--;
-    } else if (currApproxBlocksScheduled > 0) {
-      currApproxBlocksScheduled--;
+  void decrementBlocksScheduled(StorageType t) {
+    if (prevApproxBlocksScheduled.get(t) > 0) {
+      prevApproxBlocksScheduled.subtract(t, 1);
+    } else if (currApproxBlocksScheduled.get(t) > 0) {
+      currApproxBlocksScheduled.subtract(t, 1);
     }
     // its ok if both counters are zero.
   }
@@ -502,8 +530,8 @@
   /** Adjusts curr and prev number of blocks scheduled every few minutes. */
   private void rollBlocksScheduled(long now) {
     if (now - lastBlocksScheduledRollTime > BLOCKS_SCHEDULED_ROLL_INTERVAL) {
-      prevApproxBlocksScheduled = currApproxBlocksScheduled;
-      currApproxBlocksScheduled = 0;
+      prevApproxBlocksScheduled.set(currApproxBlocksScheduled);
+      currApproxBlocksScheduled.reset();
       lastBlocksScheduledRollTime = now;
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 4ddb7cc..58ca2ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -109,7 +109,7 @@
   private long capacity;
   private long dfsUsed;
-  private long remaining;
+  private volatile long remaining;
   private long blockPoolUsed;
 
   private volatile BlockInfo blockList = null;
@@ -283,7 +283,7 @@
   /** Increment the number of blocks scheduled for each given storage */
   public static void incrementBlocksScheduled(DatanodeStorageInfo... storages) {
     for (DatanodeStorageInfo s : storages) {
-      s.getDatanodeDescriptor().incrementBlocksScheduled();
+      s.getDatanodeDescriptor().incrementBlocksScheduled(s.getStorageType());
     }
   }
@@ -314,6 +314,26 @@
         false, capacity, dfsUsed, remaining, blockPoolUsed);
   }
 
+  static Iterable<StorageType> toStorageTypes(
+      final Iterable<DatanodeStorageInfo> infos) {
+    return new Iterable<StorageType>() {
+        @Override
+        public Iterator<StorageType> iterator() {
+          return new Iterator<StorageType>() {
+            final Iterator<DatanodeStorageInfo> i = infos.iterator();
+            @Override
+            public boolean hasNext() {return i.hasNext();}
+            @Override
+            public StorageType next() {return i.next().getStorageType();}
+            @Override
+            public void remove() {
+              throw new UnsupportedOperationException();
+            }
+          };
+        }
+      };
+  }
+
   /** @return the first {@link DatanodeStorageInfo} corresponding to
    *          the given datanode
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a3195eb..5062270 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1930,7 +1930,9 @@
           + b + " (numBytes=" + b.getNumBytes() + ")"
           + ", stage=" + stage
           + ", clientname=" + clientname
-          + ", targets=" + Arrays.asList(targets));
+          + ", targets=" + Arrays.asList(targets)
+          + ", target storage types=" + (targetStorageTypes == null ? "[]" :
+          Arrays.asList(targetStorageTypes)));
     }
     this.targets = targets;
     this.targetStorageTypes = targetStorageTypes;
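Taken together, the BlockManager and BlockPlacementPolicy changes above thread a BlockStoragePolicy from the file (via BlockCollection.getStoragePolicyID()) down to both target selection and excess-replica deletion. A sketch of the resolution flow, using only methods that appear in this patch; the "HOT" policy name is an assumption for illustration, since the actual suite is whatever readBlockStorageSuite builds from the configuration:

  BlockStoragePolicy.Suite suite = BlockStoragePolicy.readBlockStorageSuite(conf);
  BlockStoragePolicy policy = suite.getPolicy("HOT");

  // For a new block with replication 3, the policy expands into one
  // concrete StorageType per expected replica, e.g. [DISK, DISK, DISK].
  List<StorageType> required = policy.chooseStorageTypes((short) 3);

  // When a block is over-replicated, the policy also decides which
  // storage types are in excess; chooseReplicaToDelete() then only
  // considers replicas whose type is in this list.
  Iterable<StorageType> existing = Arrays.asList(
      StorageType.DISK, StorageType.DISK, StorageType.DISK, StorageType.DISK);
  List<StorageType> excess = policy.chooseExcess((short) 3, existing);

getRequiredStorageTypes(...) then turns the required list into an EnumMap<StorageType, Integer> multiset, which chooseLocalStorage/chooseRandom decrement each time a matching storage is chosen, removing an entry once its count reaches one.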
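Similarly, the DatanodeDescriptor change replaces the two scalar scheduled-block counters with per-StorageType EnumCounters, which is what lets the placement policy's space check subtract only same-type scheduled writes from the same-type remaining space. A self-contained sketch of the counter behavior, using only the EnumCounters methods exercised by this diff (the values are made up):

  EnumCounters<StorageType> curr =
      new EnumCounters<StorageType>(StorageType.class);
  EnumCounters<StorageType> prev =
      new EnumCounters<StorageType>(StorageType.class);

  curr.add(StorageType.DISK, 1);  // a block was scheduled to a DISK storage
  long scheduledToDisk = curr.get(StorageType.DISK) + prev.get(StorageType.DISK);
  long scheduledTotal = curr.sum() + prev.sum();  // matches old getBlocksScheduled()

  // Every BLOCKS_SCHEDULED_ROLL_INTERVAL (10 min) the counters roll over,
  // so stale entries age out even if a datanode never reports the write:
  prev.set(curr);
  curr.reset();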