Author: szetszwo
Date: Mon Aug  6 12:03:13 2012
New Revision: 1369798

URL: http://svn.apache.org/viewvc?rev=1369798&view=rev
Log:
HDFS-385. Backport: Add support for an experimental API that allows a module external to HDFS to specify how HDFS blocks should be placed.  Contributed by Sumadhur Reddy Bolli
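For context on the plug-in point this commit introduces: a placement policy is selected at runtime through the dfs.block.replicator.classname key, read by BlockPlacementPolicy.getInstance below. The following is a minimal sketch of a custom policy — not part of this commit, and the class name is hypothetical. Note that because several abstract chooseTarget overloads in this backport are package-private, a concrete policy has to be declared in the org.apache.hadoop.hdfs.server.namenode package.

    // Hypothetical sketch only -- not part of this commit. It reuses the
    // default placement and overrides just the excess-replica deletion choice.
    package org.apache.hadoop.hdfs.server.namenode;

    import java.util.Collection;

    import org.apache.hadoop.hdfs.protocol.Block;

    public class ExamplePlacementPolicy extends BlockPlacementPolicyDefault {
      @Override
      public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo inode,
          Block block, short replicationFactor,
          Collection<DatanodeDescriptor> existingReplicas,
          Collection<DatanodeDescriptor> moreExistingReplicas) {
        // A real policy would apply its own criteria here; this sketch simply
        // defers to the default rule (delete the replica with least free space).
        return super.chooseReplicaToDelete(inode, block, replicationFactor,
            existingReplicas, moreExistingReplicas);
      }
    }

The namenode would then pick this class up from configuration, e.g. conf.set("dfs.block.replicator.classname", "org.apache.hadoop.hdfs.server.namenode.ExamplePlacementPolicy").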
Added:
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java.orig
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnsupportedActionException.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1369798&r1=1369797&r2=1369798&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Mon Aug  6 12:03:13 2012
@@ -23,6 +23,10 @@ Release 1.2.0 - unreleased
     HDFS-528. Backport: Add ability for safemode to wait for a minimum number
     of live datanodes.  (szetszwo)
 
+    HDFS-385. Backport: Add support for an experimental API that allows a
+    module external to HDFS to specify how HDFS blocks should be placed.
+    (Sumadhur Reddy Bolli via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-3515. Port HDFS-1457 to branch-1. (eli)

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java?rev=1369798&r1=1369797&r2=1369798&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java Mon Aug  6 12:03:13 2012
@@ -569,7 +569,7 @@ public class NetworkTopology {
    * @return number of available nodes
    */
   public int countNumOfAvailableNodes(String scope,
-                                      List<Node> excludedNodes) {
+                                      Collection<Node> excludedNodes) {
     boolean isExcluded=false;
     if (scope.startsWith("~")) {
       isExcluded=true;

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1369798&r1=1369797&r2=1369798&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java Mon Aug  6 12:03:13 2012
@@ -25,6 +25,7 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.Class;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -61,6 +62,9 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.fs.FileSystem;
@@ -786,22 +790,38 @@ public class Balancer implements Tool {
       }
     }
   }
-
+
+  /*
+   * Check that this Balancer is compatible with the Block Placement Policy
+   * used by the Namenode.
+   */
+  private void checkReplicationPolicyCompatibility(Configuration conf)
+      throws UnsupportedActionException {
+    if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() !=
+        BlockPlacementPolicyDefault.class) {
+      throw new UnsupportedActionException(
+          "Balancer without BlockPlacementPolicyDefault");
+    }
+  }
+
   /** Default constructor */
-  Balancer() {
+  Balancer() throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(getConf());
   }
-
+
   /** Construct a balancer from the given configuration */
-  Balancer(Configuration conf) {
+  Balancer(Configuration conf) throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(conf);
     setConf(conf);
-  } 
+  }
 
   /** Construct a balancer from the given configuration and threshold */
-  Balancer(Configuration conf, double threshold) {
+  Balancer(Configuration conf, double threshold)
+      throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(conf);
     setConf(conf);
     this.threshold = threshold;
   }
-
+
   /**
    * Run a balancer
    * @param args
Added: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java?rev=1369798&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java (added)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java Mon Aug  6 12:03:13 2012
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.util.ReflectionUtils;
+import java.util.*;
+
+/**
+ * This interface is used for choosing the desired number of targets
+ * for placing block replicas.
+ */
+@InterfaceAudience.Private
+public abstract class BlockPlacementPolicy {
+
+  @InterfaceAudience.Private
+  public static class NotEnoughReplicasException extends Exception {
+    private static final long serialVersionUID = 1L;
+    NotEnoughReplicasException(String msg) {
+      super(msg);
+    }
+  }
+
+  /**
+   * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
+   * to re-replicate a block with size <i>blocksize</i>.
+   * If not enough targets can be found, return as many as we can.
+   *
+   * @param srcPath the file for which chooseTarget is being invoked.
+   * @param numOfReplicas additional number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param chosenNodes datanodes that have been chosen as targets.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as target
+   * and sorted as a pipeline.
+   */
+  abstract DatanodeDescriptor[] chooseTarget(String srcPath,
+                                             int numOfReplicas,
+                                             DatanodeDescriptor writer,
+                                             List<DatanodeDescriptor> chosenNodes,
+                                             long blocksize);
+
+  /**
+   * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
+   * to re-replicate a block with size <i>blocksize</i>.
+   * If not enough targets can be found, return as many as we can.
+   *
+   * @param srcPath the file for which chooseTarget is being invoked.
+   * @param numOfReplicas additional number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param chosenNodes datanodes that have been chosen as targets.
+   * @param excludedNodes datanodes that should not be considered as targets.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as target
+   * and sorted as a pipeline.
+   */
+  public abstract DatanodeDescriptor[] chooseTarget(String srcPath,
+                                             int numOfReplicas,
+                                             DatanodeDescriptor writer,
+                                             List<DatanodeDescriptor> chosenNodes,
+                                             HashMap<Node, Node> excludedNodes,
+                                             long blocksize);
+
+  /**
+   * choose <i>numOfReplicas</i> data nodes for <i>writer</i>.
+   * If not enough targets can be found, return as many as we can.
+   * The base implementation extracts the pathname of the file from the
+   * specified srcInode, but this could be a costly operation depending on the
+   * file system implementation. Concrete implementations of this class should
+   * override this method to avoid this overhead.
+   *
+   * @param srcInode The inode of the file for which chooseTarget is being invoked.
+   * @param numOfReplicas additional number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param chosenNodes datanodes that have been chosen as targets.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as target
+   * and sorted as a pipeline.
+   */
+  DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    long blocksize) {
+    return chooseTarget(srcInode.getFullPathName(), numOfReplicas, writer,
+                        chosenNodes, blocksize);
+  }
+
+  /**
+   * Verify that the block is replicated on at least minRacks different racks
+   * if there are more than minRacks racks in the system.
+   *
+   * @param srcPath the full pathname of the file to be verified
+   * @param lBlk block with locations
+   * @param minRacks number of racks the block should be replicated to
+   * @return the difference between the required and the actual number of racks
+   * the block is replicated to.
+   */
+  abstract public int verifyBlockPlacement(String srcPath,
+                                           LocatedBlock lBlk,
+                                           int minRacks);
+  /**
+   * Decide whether deleting the specified replica of the block still makes
+   * the block conform to the configured block placement policy.
+   *
+   * @param srcInode The inode of the file to which the block-to-be-deleted belongs
+   * @param block The block to be deleted
+   * @param replicationFactor The required number of replicas for this block
+   * @param existingReplicas The replica locations of this block that are present
+   *        on at least two unique racks.
+   * @param moreExistingReplicas Replica locations of this block that are not
+   *        listed in the previous parameter.
+   * @return the replica that is the best candidate for deletion
+   */
+  abstract public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo srcInode,
+                                      Block block,
+                                      short replicationFactor,
+                                      Collection<DatanodeDescriptor> existingReplicas,
+                                      Collection<DatanodeDescriptor> moreExistingReplicas);
+
+  /**
+   * Used to set up a BlockPlacementPolicy object. This should be defined by
+   * all implementations of a BlockPlacementPolicy.
+   *
+   * @param conf the configuration object
+   * @param stats retrieve cluster status from here
+   * @param clusterMap cluster topology
+   */
+  abstract protected void initialize(Configuration conf, FSClusterStats stats,
+                                     NetworkTopology clusterMap);
+
+  /**
+   * Get an instance of the configured Block Placement Policy based on the
+   * value of the configuration parameter dfs.block.replicator.classname.
+   *
+   * @param conf the configuration to be used
+   * @param stats an object that is used to retrieve the load on the cluster
+   * @param clusterMap the network topology of the cluster
+   * @return an instance of BlockPlacementPolicy
+   */
+  public static BlockPlacementPolicy getInstance(Configuration conf,
+                                                 FSClusterStats stats,
+                                                 NetworkTopology clusterMap) {
+    Class<? extends BlockPlacementPolicy> replicatorClass =
+      conf.getClass("dfs.block.replicator.classname",
+                    BlockPlacementPolicyDefault.class,
+                    BlockPlacementPolicy.class);
+    BlockPlacementPolicy replicator = (BlockPlacementPolicy) ReflectionUtils.newInstance(
+                                      replicatorClass, conf);
+    replicator.initialize(conf, stats, clusterMap);
+    return replicator;
+  }
+
+  /**
+   * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
+   * a block with size <i>blocksize</i>.
+   * If not enough targets can be found, return as many as we can.
+   *
+   * @param srcPath a string representation of the file for which chooseTarget is invoked
+   * @param numOfReplicas number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as targets
+   * and sorted as a pipeline.
+   */
+  DatanodeDescriptor[] chooseTarget(String srcPath,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    long blocksize) {
+    return chooseTarget(srcPath, numOfReplicas, writer,
+                        new ArrayList<DatanodeDescriptor>(),
+                        blocksize);
+  }
+
+  /**
+   * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
+   * a block with size <i>blocksize</i>.
+   * If not enough targets can be found, return as many as we can.
+   *
+   * @param srcPath a string representation of the file for which chooseTarget is invoked
+   * @param numOfReplicas number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param blocksize size of the data to be written.
+   * @param excludedNodes datanodes that should not be considered as targets.
+   * @return array of DatanodeDescriptor instances chosen as targets
+   * and sorted as a pipeline.
+   */
+  public DatanodeDescriptor[] chooseTarget(String srcPath,
+                                           int numOfReplicas,
+                                           DatanodeDescriptor writer,
+                                           HashMap<Node, Node> excludedNodes,
+                                           long blocksize) {
+    return chooseTarget(srcPath, numOfReplicas, writer,
+                        new ArrayList<DatanodeDescriptor>(),
+                        excludedNodes,
+                        blocksize);
+  }
+
+}
\ No newline at end of file
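As a usage sketch of the factory above (a fragment, imports elided): this mirrors the checkReplicationPolicyCompatibility check added to the Balancer earlier in this commit, which passes null for both the stats and topology arguments and only inspects the class identity of the resolved policy.

    Configuration conf = new Configuration();
    // Resolve whatever dfs.block.replicator.classname points at;
    // BlockPlacementPolicyDefault is returned when the key is unset.
    BlockPlacementPolicy policy = BlockPlacementPolicy.getInstance(conf, null, null);
    if (policy.getClass() != BlockPlacementPolicyDefault.class) {
      throw new UnsupportedActionException(
          "Balancer without BlockPlacementPolicyDefault");
    }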
Added: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java?rev=1369798&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (added)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java Mon Aug  6 12:03:13 2012
@@ -0,0 +1,516 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import java.util.*;
+
+/** The class is responsible for choosing the desired number of targets
+ * for placing block replicas.
+ * The replica placement strategy is that if the writer is on a datanode,
+ * the 1st replica is placed on the local machine,
+ * otherwise a random datanode. The 2nd replica is placed on a datanode
+ * that is on a different rack. The 3rd replica is placed on a datanode
+ * which is on a different node of the same rack as the second replica.
+ */
+@InterfaceAudience.Private
+public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
+  private boolean considerLoad;
+  private NetworkTopology clusterMap;
+  private FSClusterStats stats;
+
+  BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
+                              NetworkTopology clusterMap) {
+    initialize(conf, stats, clusterMap);
+  }
+
+  BlockPlacementPolicyDefault() {
+  }
+
+  /** {@inheritDoc} */
+  public void initialize(Configuration conf, FSClusterStats stats,
+                         NetworkTopology clusterMap) {
+    this.considerLoad = conf.getBoolean("dfs.replication.considerLoad", true);
+    this.stats = stats;
+    this.clusterMap = clusterMap;
+  }
+
+  /** {@inheritDoc} */
+  public DatanodeDescriptor[] chooseTarget(String srcPath,
+                                           int numOfReplicas,
+                                           DatanodeDescriptor writer,
+                                           List<DatanodeDescriptor> chosenNodes,
+                                           long blocksize) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, null, blocksize);
+  }
+
+  /** {@inheritDoc} */
+  public DatanodeDescriptor[] chooseTarget(String srcPath,
+                                           int numOfReplicas,
+                                           DatanodeDescriptor writer,
+                                           List<DatanodeDescriptor> chosenNodes,
+                                           HashMap<Node, Node> excludedNodes,
+                                           long blocksize) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, excludedNodes, blocksize);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
+                                           int numOfReplicas,
+                                           DatanodeDescriptor writer,
+                                           List<DatanodeDescriptor> chosenNodes,
+                                           long blocksize) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, null, blocksize);
+  }
+
+  /**
+   * This is not part of the public API but is used by the unit tests.
+   */
+  DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    HashMap<Node, Node> excludedNodes,
+                                    long blocksize) {
+    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
+      return new DatanodeDescriptor[0];
+    }
+
+    if (excludedNodes == null) {
+      excludedNodes = new HashMap<Node, Node>();
+    }
+
+    int clusterSize = clusterMap.getNumOfLeaves();
+    int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
+    if (totalNumOfReplicas > clusterSize) {
+      numOfReplicas -= (totalNumOfReplicas-clusterSize);
+      totalNumOfReplicas = clusterSize;
+    }
+
+    int maxNodesPerRack =
+      (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
+
+    List<DatanodeDescriptor> results =
+      new ArrayList<DatanodeDescriptor>(chosenNodes);
+    for (Node node:chosenNodes) {
+      excludedNodes.put(node, node);
+    }
+
+    if (!clusterMap.contains(writer)) {
+      writer=null;
+    }
+
+    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
+        excludedNodes, blocksize, maxNodesPerRack, results);
+
+    results.removeAll(chosenNodes);
+
+    // sorting nodes to form a pipeline
+    return getPipeline((writer==null)?localNode:writer,
+                       results.toArray(new DatanodeDescriptor[results.size()]));
+  }
+
+  /* choose <i>numOfReplicas</i> from all data nodes */
+  private DatanodeDescriptor chooseTarget(int numOfReplicas,
+                                          DatanodeDescriptor writer,
+                                          HashMap<Node, Node> excludedNodes,
+                                          long blocksize,
+                                          int maxNodesPerRack,
+                                          List<DatanodeDescriptor> results) {
+
+    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
+      return writer;
+    }
+
+    int numOfResults = results.size();
+    boolean newBlock = (numOfResults==0);
+    if (writer == null && !newBlock) {
+      writer = results.get(0);
+    }
+
+    try {
+      if (numOfResults == 0) {
+        writer = chooseLocalNode(writer, excludedNodes,
+                                 blocksize, maxNodesPerRack, results);
+        if (--numOfReplicas == 0) {
+          return writer;
+        }
+      }
+      if (numOfResults <= 1) {
+        chooseRemoteRack(1, results.get(0), excludedNodes,
+                         blocksize, maxNodesPerRack, results);
+        if (--numOfReplicas == 0) {
+          return writer;
+        }
+      }
+      if (numOfResults <= 2) {
+        if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
+          chooseRemoteRack(1, results.get(0), excludedNodes,
+                           blocksize, maxNodesPerRack, results);
+        } else if (newBlock){
+          chooseLocalRack(results.get(1), excludedNodes, blocksize,
+                          maxNodesPerRack, results);
+        } else {
+          chooseLocalRack(writer, excludedNodes, blocksize,
+                          maxNodesPerRack, results);
+        }
+        if (--numOfReplicas == 0) {
+          return writer;
+        }
+      }
+      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes,
+                   blocksize, maxNodesPerRack, results);
+    } catch (NotEnoughReplicasException e) {
+      FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
+               + numOfReplicas);
+    }
+    return writer;
+  }
+
+  /* choose <i>localMachine</i> as the target.
+   * if <i>localMachine</i> is not available,
+   * choose a node on the same rack
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseLocalNode(
+                                             DatanodeDescriptor localMachine,
+                                             HashMap<Node, Node> excludedNodes,
+                                             long blocksize,
+                                             int maxNodesPerRack,
+                                             List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    // if no local machine, randomly choose one node
+    if (localMachine == null)
+      return chooseRandom(NodeBase.ROOT, excludedNodes,
+                          blocksize, maxNodesPerRack, results);
+
+    // otherwise try local machine first
+    Node oldNode = excludedNodes.put(localMachine, localMachine);
+    if (oldNode == null) { // was not in the excluded list
+      if (isGoodTarget(localMachine, blocksize,
+                       maxNodesPerRack, false, results)) {
+        results.add(localMachine);
+        return localMachine;
+      }
+    }
+
+    // try a node on local rack
+    return chooseLocalRack(localMachine, excludedNodes,
+                           blocksize, maxNodesPerRack, results);
+  }
+
+  /* choose one node from the rack that <i>localMachine</i> is on.
+   * if no such node is available, choose one node from the rack where
+   * a second replica is on.
+   * if still no such node is available, choose a random node
+   * in the cluster.
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseLocalRack(
+                                             DatanodeDescriptor localMachine,
+                                             HashMap<Node, Node> excludedNodes,
+                                             long blocksize,
+                                             int maxNodesPerRack,
+                                             List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    // no local machine, so choose a random machine
+    if (localMachine == null) {
+      return chooseRandom(NodeBase.ROOT, excludedNodes,
+                          blocksize, maxNodesPerRack, results);
+    }
+
+    // choose one from the local rack
+    try {
+      return chooseRandom(
+                          localMachine.getNetworkLocation(),
+                          excludedNodes, blocksize, maxNodesPerRack, results);
+    } catch (NotEnoughReplicasException e1) {
+      // find the second replica
+      DatanodeDescriptor newLocal=null;
+      for(Iterator<DatanodeDescriptor> iter=results.iterator();
+          iter.hasNext();) {
+        DatanodeDescriptor nextNode = iter.next();
+        if (nextNode != localMachine) {
+          newLocal = nextNode;
+          break;
+        }
+      }
+      if (newLocal != null) {
+        try {
+          return chooseRandom(
+                              newLocal.getNetworkLocation(),
+                              excludedNodes, blocksize, maxNodesPerRack, results);
+        } catch(NotEnoughReplicasException e2) {
+          //otherwise randomly choose one from the network
+          return chooseRandom(NodeBase.ROOT, excludedNodes,
+                              blocksize, maxNodesPerRack, results);
+        }
+      } else {
+        //otherwise randomly choose one from the network
+        return chooseRandom(NodeBase.ROOT, excludedNodes,
+                            blocksize, maxNodesPerRack, results);
+      }
+    }
+  }
+
+  /* choose <i>numOfReplicas</i> nodes from the racks
+   * that <i>localMachine</i> is NOT on.
+   * if not enough nodes are available, choose the remaining ones
+   * from the local rack
+   */
+
+  private void chooseRemoteRack(int numOfReplicas,
+                                DatanodeDescriptor localMachine,
+                                HashMap<Node, Node> excludedNodes,
+                                long blocksize,
+                                int maxReplicasPerRack,
+                                List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    int oldNumOfReplicas = results.size();
+    // randomly choose one node from remote racks
+    try {
+      chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
+                   excludedNodes, blocksize, maxReplicasPerRack, results);
+    } catch (NotEnoughReplicasException e) {
+      chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
+                   localMachine.getNetworkLocation(), excludedNodes, blocksize,
+                   maxReplicasPerRack, results);
+    }
+  }
+
+  /* Randomly choose one target from <i>nodes</i>.
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseRandom(
+                                          String nodes,
+                                          HashMap<Node, Node> excludedNodes,
+                                          long blocksize,
+                                          int maxNodesPerRack,
+                                          List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    int numOfAvailableNodes =
+      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
+    while(numOfAvailableNodes > 0) {
+      DatanodeDescriptor chosenNode =
+        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
+
+      Node oldNode = excludedNodes.put(chosenNode, chosenNode);
+      if (oldNode == null) { // chosenNode was not in the excluded list
+        numOfAvailableNodes--;
+        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+          results.add(chosenNode);
+          return chosenNode;
+        }
+      }
+    }
+
+    throw new NotEnoughReplicasException(
+        "Not able to place enough replicas");
+  }
+
+  /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
+   */
+  private void chooseRandom(int numOfReplicas,
+                            String nodes,
+                            HashMap<Node, Node> excludedNodes,
+                            long blocksize,
+                            int maxNodesPerRack,
+                            List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+
+    int numOfAvailableNodes =
+      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
+    while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
+      DatanodeDescriptor chosenNode =
+        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
+      Node oldNode = excludedNodes.put(chosenNode, chosenNode);
+      if (oldNode == null) {
+        numOfAvailableNodes--;
+
+        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+          numOfReplicas--;
+          results.add(chosenNode);
+        }
+      }
+    }
+
+    if (numOfReplicas>0) {
+      throw new NotEnoughReplicasException(
+          "Not able to place enough replicas");
+    }
+  }
+
+  /* judge if a node is a good target.
+   * return true if <i>node</i> has enough space,
+   * does not have too much load, and the rack does not have too many nodes
+   */
+  private boolean isGoodTarget(DatanodeDescriptor node,
+                               long blockSize, int maxTargetPerLoc,
+                               List<DatanodeDescriptor> results) {
+    return isGoodTarget(node, blockSize, maxTargetPerLoc,
+                        this.considerLoad, results);
+  }
+
+  private boolean isGoodTarget(DatanodeDescriptor node,
+                               long blockSize, int maxTargetPerLoc,
+                               boolean considerLoad,
+                               List<DatanodeDescriptor> results) {
+    Log logr = FSNamesystem.LOG;
+    // check if the node is (being) decommissioned
+    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      logr.debug("Node "+NodeBase.getPath(node)+
+                " is not chosen because the node is (being) decommissioned");
+      return false;
+    }
+
+    long remaining = node.getRemaining() -
+                     (node.getBlocksScheduled() * blockSize);
+    // check the remaining capacity of the target machine
+    if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
+      logr.debug("Node "+NodeBase.getPath(node)+
+                " is not chosen because the node does not have enough space");
+      return false;
+    }
+
+    // check the communication traffic of the target machine
+    if (considerLoad) {
+      double avgLoad = 0;
+      int size = clusterMap.getNumOfLeaves();
+      if (size != 0 && stats != null) {
+        avgLoad = (double)stats.getTotalLoad()/size;
+      }
+      if (node.getXceiverCount() > (2.0 * avgLoad)) {
+        logr.debug("Node "+NodeBase.getPath(node)+
+                  " is not chosen because the node is too busy");
+        return false;
+      }
+    }
+
+    // check if the target rack has chosen too many nodes
+    String rackname = node.getNetworkLocation();
+    int counter=1;
+    for(Iterator<DatanodeDescriptor> iter = results.iterator();
+        iter.hasNext();) {
+      Node result = iter.next();
+      if (rackname.equals(result.getNetworkLocation())) {
+        counter++;
+      }
+    }
+    if (counter>maxTargetPerLoc) {
+      logr.debug("Node "+NodeBase.getPath(node)+
+                " is not chosen because the rack has too many chosen nodes");
+      return false;
+    }
+    return true;
+  }
+
+  /* Return a pipeline of nodes.
+   * The pipeline is formed by finding a shortest path that
+   * starts from the writer and traverses all <i>nodes</i>.
+   * This is basically a traveling salesman problem.
+   */
+  private DatanodeDescriptor[] getPipeline(
+                                           DatanodeDescriptor writer,
+                                           DatanodeDescriptor[] nodes) {
+    if (nodes.length==0) return nodes;
+
+    synchronized(clusterMap) {
+      int index=0;
+      if (writer == null || !clusterMap.contains(writer)) {
+        writer = nodes[0];
+      }
+      for(;index<nodes.length; index++) {
+        DatanodeDescriptor shortestNode = nodes[index];
+        int shortestDistance = clusterMap.getDistance(writer, shortestNode);
+        int shortestIndex = index;
+        for(int i=index+1; i<nodes.length; i++) {
+          DatanodeDescriptor currentNode = nodes[i];
+          int currentDistance = clusterMap.getDistance(writer, currentNode);
+          if (shortestDistance>currentDistance) {
+            shortestDistance = currentDistance;
+            shortestNode = currentNode;
+            shortestIndex = i;
+          }
+        }
+        //switch position index & shortestIndex
+        if (index != shortestIndex) {
+          nodes[shortestIndex] = nodes[index];
+          nodes[index] = shortestNode;
+        }
+        writer = shortestNode;
+      }
+    }
+    return nodes;
+  }
+
+  /** {@inheritDoc} */
+  public int verifyBlockPlacement(String srcPath,
+                                  LocatedBlock lBlk,
+                                  int minRacks) {
+    DatanodeInfo[] locs = lBlk.getLocations();
+    if (locs == null)
+      locs = new DatanodeInfo[0];
+    int numRacks = clusterMap.getNumOfRacks();
+    if(numRacks <= 1) // only one rack
+      return 0;
+    minRacks = Math.min(minRacks, numRacks);
+    // 1. Check that all locations are different.
+    // 2. Count locations on different racks.
+    Set<String> racks = new TreeSet<String>();
+    for (DatanodeInfo dn : locs)
+      racks.add(dn.getNetworkLocation());
+    return minRacks - racks.size();
+  }
+
+  /** {@inheritDoc} */
+  public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo inode,
+                                                  Block block,
+                                                  short replicationFactor,
+                                                  Collection<DatanodeDescriptor> first,
+                                                  Collection<DatanodeDescriptor> second) {
+    long minSpace = Long.MAX_VALUE;
+    DatanodeDescriptor cur = null;
+
+    // pick replica from the first set. If first is empty, then pick replicas
+    // from second set.
+    Iterator<DatanodeDescriptor> iter =
+      first.isEmpty() ? second.iterator() : first.iterator();
+
+    // pick node with least free space
+    while (iter.hasNext() ) {
+      DatanodeDescriptor node = iter.next();
+      long free = node.getRemaining();
+      if (minSpace > free) {
+        minSpace = free;
+        cur = node;
+      }
+    }
+    return cur;
+  }
+}
+
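A short sketch of driving the public chooseTarget overload directly (a hypothetical harness, imports elided; with an empty NetworkTopology the call simply returns an empty pipeline, since the implementation above short-circuits when the cluster has no leaves):

    Configuration conf = new Configuration();
    NetworkTopology topology = new NetworkTopology();  // datanode registration elided
    BlockPlacementPolicy policy =
        BlockPlacementPolicy.getInstance(conf, null, topology);
    DatanodeDescriptor[] pipeline = policy.chooseTarget(
        "/user/example/file",       // srcPath (hypothetical)
        3,                          // additional replicas wanted
        null,                       // writer is not in the cluster
        new HashMap<Node, Node>(),  // nothing excluded
        64L * 1024 * 1024);         // block size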
Added: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1369798&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (added)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Mon Aug  6 12:03:13 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/**
+ * This interface is used for retrieving the load related statistics of
+ * the cluster.
+ */
+public interface FSClusterStats {
+
+  /**
+   * an indication of the total load of the cluster.
+   *
+   * @return a count of the total number of block transfers and block
+   *         writes that are currently occurring on the cluster.
+   */
+
+  public int getTotalLoad();
+}
+
\ No newline at end of file

Added: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java?rev=1369798&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java (added)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java Mon Aug  6 12:03:13 2012
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/**
+ * This interface is used by the pluggable block placement policy
+ * to expose a few characteristics of an Inode.
+ */
+public interface FSInodeInfo {
+
+  /**
+   * a string representation of an inode
+   *
+   * @return the full pathname (from root) that this inode represents
+   */
+
+  public String getFullPathName();
+}
+

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1369798&r1=1369797&r2=1369798&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Aug  6 12:03:13 2012
@@ -116,6 +116,7 @@ import org.apache.hadoop.net.CachedDNSTo
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -143,7 +144,7 @@ import org.mortbay.util.ajax.JSON;
  * 4) machine --> blocklist (inverted #2)
  * 5) LRU cache of updated-heartbeat machines
  ***************************************************/
-public class FSNamesystem implements FSConstants, FSNamesystemMBean,
+public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterStats,
     NameNodeMXBean, MetricsSource {
   public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
   public static final String AUDIT_FORMAT =
@@ -341,7 +342,7 @@ public class FSNamesystem implements FSC
   private DNSToSwitchMapping dnsToSwitchMapping;
 
   // for block replicas placement
-  ReplicationTargetChooser replicator;
+  BlockPlacementPolicy replicator;
 
   private HostsFileReader hostsReader;
   private Daemon dnthread = null;
@@ -499,11 +500,7 @@ public class FSNamesystem implements FSC
     this.defaultPermission = PermissionStatus.createImmutable(
         fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
-
-    this.replicator = new ReplicationTargetChooser(
-                         conf.getBoolean("dfs.replication.considerLoad", true),
-                         this,
-                         clusterMap);
+    this.replicator = BlockPlacementPolicy.getInstance(conf, this, clusterMap);
     this.defaultReplication = conf.getInt("dfs.replication", 3);
     this.maxReplication = conf.getInt("dfs.replication.max", 512);
     this.minReplication = conf.getInt("dfs.replication.min", 1);
@@ -1559,7 +1556,7 @@ public class FSNamesystem implements FSC
    */
   public LocatedBlock getAdditionalBlock(String src,
                                          String clientName,
-                                         List<Node> excludedNodes
+                                         HashMap<Node, Node> excludedNodes
                                          ) throws IOException {
     long fileLength, blockSize;
     int replication;
@@ -1588,7 +1585,8 @@ public class FSNamesystem implements FSC
     }
 
     // choose targets for the new block to be allocated.
-    DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
+    DatanodeDescriptor targets[] = replicator.chooseTarget(src,
+                                                           replication,
                                                            clientNode,
                                                            excludedNodes,
                                                            blockSize);
@@ -3020,10 +3018,11 @@ public class FSNamesystem implements FSC
     List<DatanodeDescriptor> containingNodes;
     DatanodeDescriptor srcNode;
+    INodeFile fileINode = null;
 
     synchronized (this) {
       synchronized (neededReplications) {
         // block should belong to a file
-        INodeFile fileINode = blocksMap.getINode(block);
+        fileINode = blocksMap.getINode(block);
         // abandoned block or block reopened for append
         if(fileINode == null || fileINode.isUnderConstruction()) {
           neededReplications.remove(block, priority); // remove from neededReplications
@@ -3058,9 +3057,11 @@ public class FSNamesystem implements FSC
     }
 
     // choose replication targets: NOT HOLDING THE GLOBAL LOCK
-    DatanodeDescriptor targets[] = replicator.chooseTarget(
+    // It is costly to extract the filename for which chooseTargets is called,
+    // so for now we pass in the Inode itself.
+    DatanodeDescriptor targets[] = replicator.chooseTarget(fileINode,
         requiredReplication - numEffectiveReplicas,
-        srcNode, containingNodes, null, block.getNumBytes());
+        srcNode, containingNodes, block.getNumBytes());
     if(targets.length == 0)
       return false;
@@ -3068,7 +3069,7 @@ public class FSNamesystem implements FSC
     synchronized (neededReplications) {
       // Recheck since global lock was released
       // block should belong to a file
-      INodeFile fileINode = blocksMap.getINode(block);
+      fileINode = blocksMap.getINode(block);
       // abandoned block or block reopened for append
       if(fileINode == null || fileINode.isUnderConstruction()) {
         neededReplications.remove(block, priority); // remove from neededReplications
@@ -3131,12 +3132,13 @@ public class FSNamesystem implements FSC
   }
 
   /** Choose a datanode near to the given address. */
-  public DatanodeInfo chooseDatanode(String address, long blocksize) {
+  public DatanodeInfo chooseDatanode(String srcPath, String address, long blocksize) {
     final DatanodeDescriptor clientNode = host2DataNodeMap.getDatanodeByHost(
         address);
     if (clientNode != null) {
+      HashMap<Node,Node> excludedNodes = null;
      final DatanodeDescriptor[] datanodes = replicator.chooseTarget(
-          1, clientNode, null, blocksize);
+          srcPath, 1, clientNode, excludedNodes, blocksize);
       if (datanodes.length > 0) {
         return datanodes[0];
       }
@@ -3925,6 +3927,7 @@ public class FSNamesystem implements FSC
                               Block b, short replication,
                               DatanodeDescriptor addedNode,
                               DatanodeDescriptor delNodeHint) {
+    INodeFile inode = blocksMap.getINode(b);
     // first form a rack to datanodes map and
     HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
       new HashMap<String, ArrayList<DatanodeDescriptor>>();
@@ -3962,24 +3965,13 @@ public class FSNamesystem implements FSC
     boolean firstOne = true;
     while (nonExcess.size() - replication > 0) {
       DatanodeInfo cur = null;
-      long minSpace = Long.MAX_VALUE;
 
       // check if we can del delNodeHint
       if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) &&
            (priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
         cur = delNodeHint;
       } else { // regular excessive replica removal
-        Iterator<DatanodeDescriptor> iter =
-          priSet.isEmpty() ? remains.iterator() : priSet.iterator();
-        while( iter.hasNext() ) {
-          DatanodeDescriptor node = iter.next();
-          long free = node.getRemaining();
-
-          if (minSpace > free) {
-            minSpace = free;
-            cur = node;
-          }
-        }
+        cur = replicator.chooseReplicaToDelete(inode, b, replication, priSet, remains);
       }
 
       firstOne = false;
@@ -4716,7 +4708,7 @@ public class FSNamesystem implements FSC
   }
 
   public DatanodeDescriptor getRandomDatanode() {
-    return replicator.chooseTarget(1, null, null, 0)[0];
+    return (DatanodeDescriptor)clusterMap.chooseRandom(NodeBase.ROOT);
   }
 
   /**