Author: szetszwo
Date: Fri Aug 1 01:05:33 2014
New Revision: 1615015

URL: http://svn.apache.org/r1615015
Log:
HDFS-6685. Balancer should preserve storage type of replicas.
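At its core, the change replaces the Balancer's per-datanode bookkeeping with one storage group per (datanode, storage type) pair, and a block move is only considered when source and target share a storage type. A minimal illustration of that invariant, using simplified stand-in types rather than the actual Balancer classes in the diff below:

    // Simplified stand-ins for StorageType and BalancerDatanode.StorageGroup.
    enum StorageTypeSketch { DISK, SSD }

    class StorageGroupSketch {
      final String datanodeUuid;
      final StorageTypeSketch storageType;

      StorageGroupSketch(String datanodeUuid, StorageTypeSketch storageType) {
        this.datanodeUuid = datanodeUuid;
        this.storageType = storageType;
      }

      /** The first test isGoodBlockCandidate() now applies to any move. */
      static boolean sameType(StorageGroupSketch source, StorageGroupSketch target) {
        return source.storageType == target.storageType;
      }
    }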
Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumDoubles.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1615015&r1=1615014&r2=1615015&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Aug 1 01:05:33 2014
@@ -338,6 +338,8 @@ Release 2.6.0 - UNRELEASED
     HDFS-6441. Add ability to exclude/include specific datanodes while
     balancing. (Benoy Antony and Yu Li via Arpit Agarwal)
 
+    HDFS-6685. Balancer should preserve storage type of replicas. (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java?rev=1615015&r1=1615014&r2=1615015&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java Fri Aug 1 01:05:33 2014
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hdfs;
 
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -32,4 +35,11 @@ public enum StorageType {
   SSD;
 
   public static final StorageType DEFAULT = DISK;
+  public static final StorageType[] EMPTY_ARRAY = {};
+
+  private static final StorageType[] VALUES = values();
+
+  public static List<StorageType> asList() {
+    return Arrays.asList(VALUES);
+  }
 }
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1615015&r1=1615014&r2=1615015&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Aug 1 01:05:33 2014
@@ -352,15 +352,19 @@ public class PBHelper {
     return BlockWithLocationsProto.newBuilder()
         .setBlock(convert(blk.getBlock()))
         .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
-        .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())).build();
+        .addAllStorageUuids(Arrays.asList(blk.getStorageIDs()))
+        .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()))
+        .build();
   }
 
   public static BlockWithLocations convert(BlockWithLocationsProto b) {
     final List<String> datanodeUuids = b.getDatanodeUuidsList();
     final List<String> storageUuids = b.getStorageUuidsList();
+    final List<StorageTypeProto> storageTypes = b.getStorageTypesList();
     return new BlockWithLocations(convert(b.getBlock()),
         datanodeUuids.toArray(new String[datanodeUuids.size()]),
-        storageUuids.toArray(new String[storageUuids.size()]));
+        storageUuids.toArray(new String[storageUuids.size()]),
+        convertStorageTypes(storageTypes, storageUuids.size()));
   }
 
   public static BlocksWithLocationsProto convert(BlocksWithLocations blks) {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1615015&r1=1615014&r2=1615015&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Aug 1 01:05:33 2014
@@
-38,6 +38,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.EnumMap; import java.util.Formatter; import java.util.HashMap; import java.util.HashSet; @@ -79,6 +80,8 @@ import org.apache.hadoop.hdfs.server.blo import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; @@ -90,6 +93,8 @@ import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import com.google.common.base.Preconditions; + /** <p>The balancer is a tool that balances disk space usage on an HDFS cluster * when some datanodes become full or when new empty nodes join the cluster. * The tool is deployed as an application program that can be run by the @@ -190,7 +195,9 @@ import org.apache.hadoop.util.ToolRunner @InterfaceAudience.Private public class Balancer { static final Log LOG = LogFactory.getLog(Balancer.class); - final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB + final private static long GB = 1L << 30; //1GB + final private static long MAX_SIZE_TO_MOVE = 10*GB; + final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB; private static long WIN_WIDTH = 5400*1000L; // 1.5 hour /** The maximum number of concurrent blocks moves for @@ -221,26 +228,22 @@ public class Balancer { Set<String> nodesToBeIncluded; // all data node lists - private final Collection<Source> overUtilizedDatanodes - = new LinkedList<Source>(); - private final Collection<Source> aboveAvgUtilizedDatanodes - = new LinkedList<Source>(); - private final Collection<BalancerDatanode> belowAvgUtilizedDatanodes - = new LinkedList<BalancerDatanode>(); - private final Collection<BalancerDatanode> underUtilizedDatanodes - = new LinkedList<BalancerDatanode>(); - - private final Collection<Source> sources - = new HashSet<Source>(); - private final Collection<BalancerDatanode> targets - = new HashSet<BalancerDatanode>(); + private final Collection<Source> overUtilized = new LinkedList<Source>(); + private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>(); + private final Collection<BalancerDatanode.StorageGroup> belowAvgUtilized + = new LinkedList<BalancerDatanode.StorageGroup>(); + private final Collection<BalancerDatanode.StorageGroup> underUtilized + = new LinkedList<BalancerDatanode.StorageGroup>(); + + private final Collection<Source> sources = new HashSet<Source>(); + private final Collection<BalancerDatanode.StorageGroup> targets + = new HashSet<BalancerDatanode.StorageGroup>(); private final Map<Block, BalancerBlock> globalBlockList = new HashMap<Block, BalancerBlock>(); private final MovedBlocks movedBlocks = new MovedBlocks(); - /** Map (datanodeUuid -> BalancerDatanodes) */ - private final Map<String, BalancerDatanode> datanodeMap - = new HashMap<String, BalancerDatanode>(); + /** Map (datanodeUuid,storageType -> StorageGroup) */ + private final StorageGroupMap storageGroupMap = new StorageGroupMap(); private NetworkTopology cluster; @@ -248,12 +251,39 @@ public class Balancer { private final ExecutorService dispatcherExecutor; private final int 
maxConcurrentMovesPerNode; + + private static class StorageGroupMap { + private static String toKey(String datanodeUuid, StorageType storageType) { + return datanodeUuid + ":" + storageType; + } + + private final Map<String, BalancerDatanode.StorageGroup> map + = new HashMap<String, BalancerDatanode.StorageGroup>(); + + BalancerDatanode.StorageGroup get(String datanodeUuid, StorageType storageType) { + return map.get(toKey(datanodeUuid, storageType)); + } + + void put(BalancerDatanode.StorageGroup g) { + final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType); + final BalancerDatanode.StorageGroup existing = map.put(key, g); + Preconditions.checkState(existing == null); + } + + int size() { + return map.size(); + } + + void clear() { + map.clear(); + } + } /* This class keeps track of a scheduled block move */ private class PendingBlockMove { private BalancerBlock block; private Source source; private BalancerDatanode proxySource; - private BalancerDatanode target; + private BalancerDatanode.StorageGroup target; /** constructor */ private PendingBlockMove() { @@ -264,7 +294,7 @@ public class Balancer { final Block b = block.getBlock(); return b + " with size=" + b.getNumBytes() + " from " + source.getDisplayName() + " to " + target.getDisplayName() - + " through " + proxySource.getDisplayName(); + + " through " + proxySource.datanode; } /* choose a block & a proxy source for this pendingMove @@ -316,20 +346,20 @@ public class Balancer { final DatanodeInfo targetDN = target.getDatanode(); // if node group is supported, first try add nodes in the same node group if (cluster.isNodeGroupAware()) { - for (BalancerDatanode loc : block.getLocations()) { + for (BalancerDatanode.StorageGroup loc : block.getLocations()) { if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) && addTo(loc)) { return true; } } } // check if there is replica which is on the same rack with the target - for (BalancerDatanode loc : block.getLocations()) { + for (BalancerDatanode.StorageGroup loc : block.getLocations()) { if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) { return true; } } // find out a non-busy replica - for (BalancerDatanode loc : block.getLocations()) { + for (BalancerDatanode.StorageGroup loc : block.getLocations()) { if (addTo(loc)) { return true; } @@ -337,8 +367,9 @@ public class Balancer { return false; } - // add a BalancerDatanode as proxy source for specific block movement - private boolean addTo(BalancerDatanode bdn) { + /** add to a proxy source for specific block movement */ + private boolean addTo(BalancerDatanode.StorageGroup g) { + final BalancerDatanode bdn = g.getBalancerDatanode(); if (bdn.addPendingBlock(this)) { proxySource = bdn; return true; @@ -354,7 +385,7 @@ public class Balancer { DataInputStream in = null; try { sock.connect( - NetUtils.createSocketAddr(target.datanode.getXferAddr()), + NetUtils.createSocketAddr(target.getDatanode().getXferAddr()), HdfsServerConstants.READ_TIMEOUT); /* Unfortunately we don't have a good way to know if the Datanode is * taking a really long time to move a block, OR something has @@ -371,7 +402,7 @@ public class Balancer { ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock()); Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb); IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, - unbufIn, nnc, accessToken, target.datanode); + unbufIn, nnc, accessToken, target.getDatanode()); unbufOut = saslStreams.out; unbufIn = saslStreams.in; out = new DataOutputStream(new 
BufferedOutputStream(unbufOut, @@ -391,14 +422,14 @@ public class Balancer { * gets out of sync with work going on in datanode. */ proxySource.activateDelay(DELAY_AFTER_ERROR); - target.activateDelay(DELAY_AFTER_ERROR); + target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR); } finally { IOUtils.closeStream(out); IOUtils.closeStream(in); IOUtils.closeSocket(sock); proxySource.removePendingBlock(this); - target.removePendingBlock(this); + target.getBalancerDatanode().removePendingBlock(this); synchronized (this ) { reset(); @@ -414,7 +445,7 @@ public class Balancer { StorageType storageType, Token<BlockTokenIdentifier> accessToken) throws IOException { new Sender(out).replaceBlock(eb, storageType, accessToken, - source.getStorageID(), proxySource.getDatanode()); + source.getDatanode().getDatanodeUuid(), proxySource.datanode); } /* Receive a block copy response from the input stream */ @@ -454,8 +485,9 @@ public class Balancer { /* A class for keeping track of blocks in the Balancer */ static private class BalancerBlock { private final Block block; // the block - private final List<BalancerDatanode> locations - = new ArrayList<BalancerDatanode>(3); // its locations + /** The locations of the replicas of the block. */ + private final List<BalancerDatanode.StorageGroup> locations + = new ArrayList<BalancerDatanode.StorageGroup>(3); /* Constructor */ private BalancerBlock(Block block) { @@ -468,20 +500,19 @@ public class Balancer { } /* add a location */ - private synchronized void addLocation(BalancerDatanode datanode) { - if (!locations.contains(datanode)) { - locations.add(datanode); + private synchronized void addLocation(BalancerDatanode.StorageGroup g) { + if (!locations.contains(g)) { + locations.add(g); } } - /* Return if the block is located on <code>datanode</code> */ - private synchronized boolean isLocatedOnDatanode( - BalancerDatanode datanode) { - return locations.contains(datanode); + /** @return if the block is located on the given storage group. */ + private synchronized boolean isLocatedOn(BalancerDatanode.StorageGroup g) { + return locations.contains(g); } /* Return its locations */ - private synchronized List<BalancerDatanode> getLocations() { + private synchronized List<BalancerDatanode.StorageGroup> getLocations() { return locations; } @@ -498,37 +529,84 @@ public class Balancer { /* The class represents a desired move of bytes between two nodes * and the target. - * An object of this class is stored in a source node. + * An object of this class is stored in a source. */ - static private class NodeTask { - private final BalancerDatanode datanode; //target node + static private class Task { + private final BalancerDatanode.StorageGroup target; private long size; //bytes scheduled to move /* constructor */ - private NodeTask(BalancerDatanode datanode, long size) { - this.datanode = datanode; + private Task(BalancerDatanode.StorageGroup target, long size) { + this.target = target; this.size = size; } - - /* Get the node */ - private BalancerDatanode getDatanode() { - return datanode; - } - - /* Get the number of bytes that need to be moved */ - private long getSize() { - return size; - } } /* A class that keeps track of a datanode in Balancer */ private static class BalancerDatanode { - final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB + + /** A group of storages in a datanode with the same storage type. 
*/ + private class StorageGroup { + final StorageType storageType; + final double utilization; + final long maxSize2Move; + private long scheduledSize = 0L; + + private StorageGroup(StorageType storageType, double utilization, + long maxSize2Move) { + this.storageType = storageType; + this.utilization = utilization; + this.maxSize2Move = maxSize2Move; + } + + BalancerDatanode getBalancerDatanode() { + return BalancerDatanode.this; + } + + DatanodeInfo getDatanode() { + return BalancerDatanode.this.datanode; + } + + /** Decide if still need to move more bytes */ + protected synchronized boolean hasSpaceForScheduling() { + return availableSizeToMove() > 0L; + } + + /** @return the total number of bytes that need to be moved */ + synchronized long availableSizeToMove() { + return maxSize2Move - scheduledSize; + } + + /** increment scheduled size */ + synchronized void incScheduledSize(long size) { + scheduledSize += size; + } + + /** @return scheduled size */ + synchronized long getScheduledSize() { + return scheduledSize; + } + + /** Reset scheduled size to zero. */ + synchronized void resetScheduledSize() { + scheduledSize = 0L; + } + + /** @return the name for display */ + String getDisplayName() { + return datanode + ":" + storageType; + } + + @Override + public String toString() { + return "" + utilization; + } + } + final DatanodeInfo datanode; - final double utilization; - final long maxSize2Move; - private long scheduledSize = 0L; + final EnumMap<StorageType, StorageGroup> storageMap + = new EnumMap<StorageType, StorageGroup>(StorageType.class); protected long delayUntil = 0L; // blocks being moved but not confirmed yet private final List<PendingBlockMove> pendingBlocks; @@ -536,78 +614,38 @@ public class Balancer { @Override public String toString() { - return getClass().getSimpleName() + "[" + datanode - + ", utilization=" + utilization + "]"; + return getClass().getSimpleName() + ":" + datanode + ":" + storageMap; } /* Constructor * Depending on avgutil & threshold, calculate maximum bytes to move */ - private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold, - int maxConcurrentMoves) { - datanode = node; - utilization = policy.getUtilization(node); - final double avgUtil = policy.getAvgUtilization(); - long maxSizeToMove; - - if (utilization >= avgUtil+threshold - || utilization <= avgUtil-threshold) { - maxSizeToMove = (long)(threshold*datanode.getCapacity()/100); - } else { - maxSizeToMove = - (long)(Math.abs(avgUtil-utilization)*datanode.getCapacity()/100); - } - if (utilization < avgUtil ) { - maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove); - } - this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove); + private BalancerDatanode(DatanodeStorageReport report, + double threshold, int maxConcurrentMoves) { + this.datanode = report.getDatanodeInfo(); this.maxConcurrentMoves = maxConcurrentMoves; this.pendingBlocks = new ArrayList<PendingBlockMove>(maxConcurrentMoves); } - /** Get the datanode */ - protected DatanodeInfo getDatanode() { - return datanode; - } - - /** Get the name of the datanode */ - protected String getDisplayName() { - return datanode.toString(); - } - - /* Get the storage id of the datanode */ - protected String getStorageID() { - return datanode.getDatanodeUuid(); - } - - /** Decide if still need to move more bytes */ - protected synchronized boolean hasSpaceForScheduling() { - return scheduledSize<maxSize2Move; - } - - /** Return the total number of bytes that need to be moved */ - protected synchronized long 
availableSizeToMove() { - return maxSize2Move-scheduledSize; - } - - /** increment scheduled size */ - protected synchronized void incScheduledSize(long size) { - scheduledSize += size; - } - - /** decrement scheduled size */ - protected synchronized void decScheduledSize(long size) { - scheduledSize -= size; - } - - /** get scheduled size */ - protected synchronized long getScheduledSize(){ - return scheduledSize; - } - - /** get scheduled size */ - protected synchronized void setScheduledSize(long size){ - scheduledSize = size; + private void put(StorageType storageType, StorageGroup g) { + final StorageGroup existing = storageMap.put(storageType, g); + Preconditions.checkState(existing == null); + } + + StorageGroup addStorageGroup(StorageType storageType, double utilization, + long maxSize2Move) { + final StorageGroup g = new StorageGroup(storageType, utilization, + maxSize2Move); + put(storageType, g); + return g; + } + + Source addSource(StorageType storageType, double utilization, + long maxSize2Move, Balancer balancer) { + final Source s = balancer.new Source(storageType, utilization, + maxSize2Move, this); + put(storageType, s); + return s; } synchronized private void activateDelay(long delta) { @@ -650,9 +688,9 @@ public class Balancer { return pendingBlocks.remove(pendingBlock); } } - + /** A node that can be the sources of a block move */ - private class Source extends BalancerDatanode { + private class Source extends BalancerDatanode.StorageGroup { /* A thread that initiates a block move * and waits for block move to complete */ @@ -663,7 +701,7 @@ public class Balancer { } } - private final ArrayList<NodeTask> nodeTasks = new ArrayList<NodeTask>(2); + private final List<Task> tasks = new ArrayList<Task>(2); private long blocksToReceive = 0L; /* source blocks point to balancerBlocks in the global list because * we want to keep one copy of a block in balancer and be aware that @@ -673,17 +711,17 @@ public class Balancer { = new ArrayList<BalancerBlock>(); /* constructor */ - private Source(DatanodeInfo node, BalancingPolicy policy, double threshold, - int maxConcurrentMoves) { - super(node, policy, threshold, maxConcurrentMoves); + private Source(StorageType storageType, double utilization, + long maxSize2Move, BalancerDatanode dn) { + dn.super(storageType, utilization, maxSize2Move); } - /** Add a node task */ - private void addNodeTask(NodeTask task) { - assert (task.datanode != this) : - "Source and target are the same " + datanode; - incScheduledSize(task.getSize()); - nodeTasks.add(task); + /** Add a task */ + private void addTask(Task task) { + Preconditions.checkState(task.target != this, + "Source and target are the same storage group " + getDisplayName()); + incScheduledSize(task.size); + tasks.add(task); } /* Return an iterator to this source's blocks */ @@ -696,8 +734,10 @@ public class Balancer { * Return the total size of the received blocks in the number of bytes. 
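The scheduling arithmetic that used to live on BalancerDatanode now lives on each StorageGroup, and the removed decScheduledSize() is replaced by passing a negative delta to incScheduledSize(). A minimal sketch of that accounting, as a simplified stand-in rather than the actual class:

    class SchedulingSketch {
      private final long maxSize2Move; // per-iteration cap for one storage group
      private long scheduledSize = 0L; // bytes already promised to pending moves

      SchedulingSketch(long maxSize2Move) {
        this.maxSize2Move = maxSize2Move;
      }

      synchronized long availableSizeToMove() {
        return maxSize2Move - scheduledSize;
      }

      synchronized boolean hasSpaceForScheduling() {
        return availableSizeToMove() > 0L;
      }

      // callers undo a tentative allocation with incScheduledSize(-blockSize)
      synchronized void incScheduledSize(long delta) {
        scheduledSize += delta;
      }
    }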
*/ private long getBlockList() throws IOException { - BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(datanode, - Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks(); + final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive); + final BlockWithLocations[] newBlocks = nnc.namenode.getBlocks( + getDatanode(), size).getBlocks(); + long bytesReceived = 0; for (BlockWithLocations blk : newBlocks) { bytesReceived += blk.getBlock().getNumBytes(); @@ -713,10 +753,13 @@ public class Balancer { synchronized (block) { // update locations - for (String datanodeUuid : blk.getDatanodeUuids()) { - final BalancerDatanode d = datanodeMap.get(datanodeUuid); - if (d != null) { // not an unknown datanode - block.addLocation(d); + final String[] datanodeUuids = blk.getDatanodeUuids(); + final StorageType[] storageTypes = blk.getStorageTypes(); + for (int i = 0; i < datanodeUuids.length; i++) { + final BalancerDatanode.StorageGroup g = storageGroupMap.get( + datanodeUuids[i], storageTypes[i]); + if (g != null) { // not unknown + block.addLocation(g); } } } @@ -731,8 +774,8 @@ public class Balancer { /* Decide if the given block is a good candidate to move or not */ private boolean isGoodBlockCandidate(BalancerBlock block) { - for (NodeTask nodeTask : nodeTasks) { - if (Balancer.this.isGoodBlockCandidate(this, nodeTask.datanode, block)) { + for (Task t : tasks) { + if (Balancer.this.isGoodBlockCandidate(this, t.target, block)) { return true; } } @@ -747,20 +790,20 @@ public class Balancer { * The block should be dispatched immediately after this method is returned. */ private PendingBlockMove chooseNextBlockToMove() { - for ( Iterator<NodeTask> tasks=nodeTasks.iterator(); tasks.hasNext(); ) { - NodeTask task = tasks.next(); - BalancerDatanode target = task.getDatanode(); + for (Iterator<Task> i = tasks.iterator(); i.hasNext();) { + final Task task = i.next(); + final BalancerDatanode target = task.target.getBalancerDatanode(); PendingBlockMove pendingBlock = new PendingBlockMove(); if (target.addPendingBlock(pendingBlock)) { // target is not busy, so do a tentative block allocation pendingBlock.source = this; - pendingBlock.target = target; + pendingBlock.target = task.target; if ( pendingBlock.chooseBlockAndProxy() ) { long blockSize = pendingBlock.block.getNumBytes(); - decScheduledSize(blockSize); + incScheduledSize(-blockSize); task.size -= blockSize; if (task.size == 0) { - tasks.remove(); + i.remove(); } return pendingBlock; } else { @@ -834,7 +877,7 @@ public class Balancer { // in case no blocks can be moved for source node's task, // jump out of while-loop after 5 iterations. if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) { - setScheduledSize(0); + resetScheduledSize(); } } @@ -901,108 +944,154 @@ public class Balancer { IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT)); } - /* Given a data node set, build a network topology and decide - * over-utilized datanodes, above average utilized datanodes, - * below average utilized datanodes, and underutilized datanodes. - * The input data node set is shuffled before the datanodes - * are put into the over-utilized datanodes, above average utilized - * datanodes, below average utilized datanodes, and - * underutilized datanodes lists. This will add some randomness - * to the node matching later on. 
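getBlockList() now resolves each replica to a storage group rather than a datanode: the i-th datanodeUuid pairs with the i-th storageType, and the pair is looked up in the StorageGroupMap keyed by "uuid:type". A generic sketch of that map, where the value type G is a placeholder for BalancerDatanode.StorageGroup:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hdfs.StorageType;

    class StorageGroupMapSketch<G> {
      private final Map<String, G> map = new HashMap<String, G>();

      private static String toKey(String datanodeUuid, StorageType storageType) {
        return datanodeUuid + ":" + storageType; // same key scheme as the patch
      }

      G get(String datanodeUuid, StorageType storageType) {
        return map.get(toKey(datanodeUuid, storageType));
      }

      void put(String datanodeUuid, StorageType storageType, G group) {
        map.put(toKey(datanodeUuid, storageType), group);
      }
    }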
- * + + private static long getCapacity(DatanodeStorageReport report, StorageType t) { + long capacity = 0L; + for(StorageReport r : report.getStorageReports()) { + if (r.getStorage().getStorageType() == t) { + capacity += r.getCapacity(); + } + } + return capacity; + } + + private static long getRemaining(DatanodeStorageReport report, StorageType t) { + long remaining = 0L; + for(StorageReport r : report.getStorageReports()) { + if (r.getStorage().getStorageType() == t) { + remaining += r.getRemaining(); + } + } + return remaining; + } + + private boolean shouldIgnore(DatanodeInfo dn) { + //ignore decommissioned nodes + final boolean decommissioned = dn.isDecommissioned(); + //ignore decommissioning nodes + final boolean decommissioning = dn.isDecommissionInProgress(); + // ignore nodes in exclude list + final boolean excluded = Util.shouldBeExcluded(nodesToBeExcluded, dn); + // ignore nodes not in the include list (if include list is not empty) + final boolean notIncluded = !Util.shouldBeIncluded(nodesToBeIncluded, dn); + + if (decommissioned || decommissioning || excluded || notIncluded) { + if (LOG.isTraceEnabled()) { + LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", " + + decommissioning + ", " + excluded + ", " + notIncluded); + } + return true; + } + return false; + } + + /** + * Given a datanode storage set, build a network topology and decide + * over-utilized storages, above average utilized storages, + * below average utilized storages, and underutilized storages. + * The input datanode storage set is shuffled in order to randomize + * to the storage matching later on. + * * @return the total number of bytes that are * needed to move to make the cluster balanced. - * @param datanodes a set of datanodes + * @param reports a set of datanode storage reports */ - private long initNodes(DatanodeInfo[] datanodes) { + private long init(DatanodeStorageReport[] reports) { // compute average utilization - for (DatanodeInfo datanode : datanodes) { - // ignore decommissioning or decommissioned nodes or - // ignore nodes in exclude list - // or nodes not in the include list (if include list is not empty) - if (datanode.isDecommissioned() || datanode.isDecommissionInProgress() || - Util.shouldBeExcluded(nodesToBeExcluded, datanode) || - !Util.shouldBeIncluded(nodesToBeIncluded, datanode)) { - continue; + for (DatanodeStorageReport r : reports) { + if (shouldIgnore(r.getDatanodeInfo())) { + continue; } - policy.accumulateSpaces(datanode); + policy.accumulateSpaces(r); } policy.initAvgUtilization(); - /*create network topology and all data node lists: - * overloaded, above-average, below-average, and underloaded - * we alternates the accessing of the given datanodes array either by - * an increasing order or a decreasing order. - */ + // create network topology and classify utilization collections: + // over-utilized, above-average, below-average and under-utilized. 
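The classification that follows compares each (datanode, storage type) group against that storage type's cluster average; the threshold then decides between the near-average and extreme buckets. A compact restatement of the decision, with invented names:

    enum Bucket { OVER_UTILIZED, ABOVE_AVG, BELOW_AVG, UNDER_UTILIZED }

    static Bucket classify(double utilization, double avg, double threshold) {
      final double utilizationDiff = utilization - avg;
      final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
      if (utilizationDiff > 0) {        // a potential source
        return thresholdDiff <= 0 ? Bucket.ABOVE_AVG : Bucket.OVER_UTILIZED;
      } else {                          // a potential target
        return thresholdDiff <= 0 ? Bucket.BELOW_AVG : Bucket.UNDER_UTILIZED;
      }
    }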
long overLoadedBytes = 0L, underLoadedBytes = 0L; - for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) { - // ignore decommissioning or decommissioned nodes or - // ignore nodes in exclude list - // or nodes not in the include list (if include list is not empty) - if (datanode.isDecommissioned() || datanode.isDecommissionInProgress() || - Util.shouldBeExcluded(nodesToBeExcluded, datanode) || - !Util.shouldBeIncluded(nodesToBeIncluded, datanode)) { - if (LOG.isTraceEnabled()) { - LOG.trace("Excluding datanode " + datanode); - } - continue; + for(DatanodeStorageReport r : DFSUtil.shuffle(reports)) { + final DatanodeInfo datanode = r.getDatanodeInfo(); + if (shouldIgnore(datanode)) { + continue; // ignore decommissioning or decommissioned nodes } cluster.add(datanode); - BalancerDatanode datanodeS; - final double avg = policy.getAvgUtilization(); - if (policy.getUtilization(datanode) >= avg) { - datanodeS = new Source(datanode, policy, threshold, maxConcurrentMovesPerNode); - if (isAboveAvgUtilized(datanodeS)) { - this.aboveAvgUtilizedDatanodes.add((Source)datanodeS); - } else { - assert(isOverUtilized(datanodeS)) : - datanodeS.getDisplayName()+ "is not an overUtilized node"; - this.overUtilizedDatanodes.add((Source)datanodeS); - overLoadedBytes += (long)((datanodeS.utilization-avg - -threshold)*datanodeS.datanode.getCapacity()/100.0); + + final BalancerDatanode dn = new BalancerDatanode(r, underLoadedBytes, + maxConcurrentMovesPerNode); + for(StorageType t : StorageType.asList()) { + final Double utilization = policy.getUtilization(r, t); + if (utilization == null) { // datanode does not have such storage type + continue; } - } else { - datanodeS = new BalancerDatanode(datanode, policy, threshold, - maxConcurrentMovesPerNode); - if ( isBelowOrEqualAvgUtilized(datanodeS)) { - this.belowAvgUtilizedDatanodes.add(datanodeS); + + final long capacity = getCapacity(r, t); + final double utilizationDiff = utilization - policy.getAvgUtilization(t); + final double thresholdDiff = Math.abs(utilizationDiff) - threshold; + final long maxSize2Move = computeMaxSize2Move(capacity, + getRemaining(r, t), utilizationDiff, threshold); + + final BalancerDatanode.StorageGroup g; + if (utilizationDiff > 0) { + final Source s = dn.addSource(t, utilization, maxSize2Move, this); + if (thresholdDiff <= 0) { // within threshold + aboveAvgUtilized.add(s); + } else { + overLoadedBytes += precentage2bytes(thresholdDiff, capacity); + overUtilized.add(s); + } + g = s; } else { - assert isUnderUtilized(datanodeS) : "isUnderUtilized(" - + datanodeS.getDisplayName() + ")=" + isUnderUtilized(datanodeS) - + ", utilization=" + datanodeS.utilization; - this.underUtilizedDatanodes.add(datanodeS); - underLoadedBytes += (long)((avg-threshold- - datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0); + g = dn.addStorageGroup(t, utilization, maxSize2Move); + if (thresholdDiff <= 0) { // within threshold + belowAvgUtilized.add(g); + } else { + underLoadedBytes += precentage2bytes(thresholdDiff, capacity); + underUtilized.add(g); + } } + storageGroupMap.put(g); } - datanodeMap.put(datanode.getDatanodeUuid(), datanodeS); } - //logging - logNodes(); + logUtilizationCollections(); - assert (this.datanodeMap.size() == - overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+ - aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size()) - : "Mismatched number of datanodes"; + Preconditions.checkState(storageGroupMap.size() == overUtilized.size() + + underUtilized.size() + aboveAvgUtilized.size() + 
belowAvgUtilized.size(), + "Mismatched number of storage groups"); // return number of bytes to be moved in order to make the cluster balanced return Math.max(overLoadedBytes, underLoadedBytes); } + private static long computeMaxSize2Move(final long capacity, final long remaining, + final double utilizationDiff, final double threshold) { + final double diff = Math.min(threshold, Math.abs(utilizationDiff)); + long maxSizeToMove = precentage2bytes(diff, capacity); + if (utilizationDiff < 0) { + maxSizeToMove = Math.min(remaining, maxSizeToMove); + } + return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove); + } + + private static long precentage2bytes(double precentage, long capacity) { + Preconditions.checkArgument(precentage >= 0, + "precentage = " + precentage + " < 0"); + return (long)(precentage * capacity / 100.0); + } + /* log the over utilized & under utilized nodes */ - private void logNodes() { - logNodes("over-utilized", overUtilizedDatanodes); + private void logUtilizationCollections() { + logUtilizationCollection("over-utilized", overUtilized); if (LOG.isTraceEnabled()) { - logNodes("above-average", aboveAvgUtilizedDatanodes); - logNodes("below-average", belowAvgUtilizedDatanodes); + logUtilizationCollection("above-average", aboveAvgUtilized); + logUtilizationCollection("below-average", belowAvgUtilized); } - logNodes("underutilized", underUtilizedDatanodes); + logUtilizationCollection("underutilized", underUtilized); } - private static <T extends BalancerDatanode> void logNodes( - String name, Collection<T> nodes) { - LOG.info(nodes.size() + " " + name + ": " + nodes); + private static <T extends BalancerDatanode.StorageGroup> + void logUtilizationCollection(String name, Collection<T> items) { + LOG.info(items.size() + " " + name + ": " + items); } /** A matcher interface for matching nodes. */ @@ -1038,26 +1127,24 @@ public class Balancer { /** * Decide all <source, target> pairs and * the number of bytes to move from a source to a target - * Maximum bytes to be moved per node is - * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE). - * Return total number of bytes to move in this iteration + * Maximum bytes to be moved per storage group is + * min(1 Band worth of bytes, MAX_SIZE_TO_MOVE). + * @return total number of bytes to move in this iteration */ - private long chooseNodes() { + private long chooseStorageGroups() { // First, match nodes on the same node group if cluster is node group aware if (cluster.isNodeGroupAware()) { - chooseNodes(SAME_NODE_GROUP); + chooseStorageGroups(SAME_NODE_GROUP); } // Then, match nodes on the same rack - chooseNodes(SAME_RACK); + chooseStorageGroups(SAME_RACK); // At last, match all remaining nodes - chooseNodes(ANY_OTHER); + chooseStorageGroups(ANY_OTHER); - assert (datanodeMap.size() >= sources.size()+targets.size()) - : "Mismatched number of datanodes (" + - datanodeMap.size() + " total, " + - sources.size() + " sources, " + - targets.size() + " targets)"; + Preconditions.checkState(storageGroupMap.size() >= sources.size() + targets.size(), + "Mismatched number of datanodes (" + storageGroupMap.size() + " < " + + sources.size() + " sources, " + targets.size() + " targets)"); long bytesToMove = 0L; for (Source src : sources) { @@ -1067,25 +1154,25 @@ public class Balancer { } /** Decide all <source, target> pairs according to the matcher. 
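computeMaxSize2Move() caps the bytes any one storage group may send or receive per iteration. A worked example with invented numbers: a 100 GB DISK tier at 75% utilization against a 50% average and a 10% threshold gives diff = min(10, 25) = 10, precentage2bytes(10, 100 GB) = 10 GB, so the result is min(MAX_SIZE_TO_MOVE = 10 GB, 10 GB) = 10 GB; an under-utilized group is additionally capped by its remaining space. The same arithmetic as a standalone method:

    static long computeMaxSize2MoveSketch(long capacity, long remaining,
        double utilizationDiff, double threshold) {
      final long MAX_SIZE_TO_MOVE = 10L << 30; // 10 GB, as in the patch
      final double diff = Math.min(threshold, Math.abs(utilizationDiff));
      long maxSizeToMove = (long) (diff * capacity / 100.0); // precentage2bytes
      if (utilizationDiff < 0) { // a target cannot take more than its free space
        maxSizeToMove = Math.min(remaining, maxSizeToMove);
      }
      return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
    }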
*/ - private void chooseNodes(final Matcher matcher) { + private void chooseStorageGroups(final Matcher matcher) { /* first step: match each overUtilized datanode (source) to * one or more underUtilized datanodes (targets). */ - chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher); + chooseStorageGroups(overUtilized, underUtilized, matcher); /* match each remaining overutilized datanode (source) to * below average utilized datanodes (targets). * Note only overutilized datanodes that haven't had that max bytes to move * satisfied in step 1 are selected */ - chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher); + chooseStorageGroups(overUtilized, belowAvgUtilized, matcher); /* match each remaining underutilized datanode (target) to * above average utilized datanodes (source). * Note only underutilized datanodes that have not had that max bytes to * move satisfied in step 1 are selected. */ - chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher); + chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher); } /** @@ -1093,13 +1180,14 @@ public class Balancer { * datanodes or the candidates are source nodes with (utilization > Avg), and * the others are target nodes with (utilization < Avg). */ - private <D extends BalancerDatanode, C extends BalancerDatanode> void - chooseDatanodes(Collection<D> datanodes, Collection<C> candidates, + private <G extends BalancerDatanode.StorageGroup, + C extends BalancerDatanode.StorageGroup> + void chooseStorageGroups(Collection<G> groups, Collection<C> candidates, Matcher matcher) { - for (Iterator<D> i = datanodes.iterator(); i.hasNext();) { - final D datanode = i.next(); - for(; chooseForOneDatanode(datanode, candidates, matcher); ); - if (!datanode.hasSpaceForScheduling()) { + for(final Iterator<G> i = groups.iterator(); i.hasNext();) { + final G g = i.next(); + for(; choose4One(g, candidates, matcher); ); + if (!g.hasSpaceForScheduling()) { i.remove(); } } @@ -1109,18 +1197,19 @@ public class Balancer { * For the given datanode, choose a candidate and then schedule it. * @return true if a candidate is chosen; false if no candidates is chosen. 
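The pairing order is unchanged in spirit, just lifted from datanodes to storage groups, and the whole sequence runs once per matcher: SAME_NODE_GROUP (when the cluster is node-group aware), then SAME_RACK, then ANY_OTHER. A self-contained toy version of the three passes, where Group and pair() are invented stand-ins for BalancerDatanode.StorageGroup and chooseStorageGroups(Collection, Collection, Matcher):

    import java.util.Collection;
    import java.util.Iterator;

    class PairingSketch {
      static class Group { long available; }

      /** Match groups from the two sides until one side runs out of space. */
      static void pair(Collection<Group> groups, Collection<Group> candidates) {
        for (Iterator<Group> g = groups.iterator(); g.hasNext();) {
          final Group a = g.next();
          for (Iterator<Group> c = candidates.iterator();
              a.available > 0 && c.hasNext();) {
            final Group b = c.next();
            final long size = Math.min(a.available, b.available);
            a.available -= size;
            b.available -= size;
            if (b.available == 0) {
              c.remove();
            }
          }
          if (a.available == 0) {
            g.remove();
          }
        }
      }

      static void choose(Collection<Group> over, Collection<Group> above,
          Collection<Group> below, Collection<Group> under) {
        pair(over, under);  // 1. over-utilized sources to under-utilized targets
        pair(over, below);  // 2. leftover over-utilized to below-average targets
        pair(under, above); // 3. leftover under-utilized to above-average sources
      }
    }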
*/ - private <C extends BalancerDatanode> boolean chooseForOneDatanode( - BalancerDatanode dn, Collection<C> candidates, Matcher matcher) { + private <C extends BalancerDatanode.StorageGroup> + boolean choose4One(BalancerDatanode.StorageGroup g, + Collection<C> candidates, Matcher matcher) { final Iterator<C> i = candidates.iterator(); - final C chosen = chooseCandidate(dn, i, matcher); - + final C chosen = chooseCandidate(g, i, matcher); + if (chosen == null) { return false; } - if (dn instanceof Source) { - matchSourceWithTargetToMove((Source)dn, chosen); + if (g instanceof Source) { + matchSourceWithTargetToMove((Source)g, chosen); } else { - matchSourceWithTargetToMove((Source)chosen, dn); + matchSourceWithTargetToMove((Source)chosen, g); } if (!chosen.hasSpaceForScheduling()) { i.remove(); @@ -1128,27 +1217,28 @@ public class Balancer { return true; } - private void matchSourceWithTargetToMove( - Source source, BalancerDatanode target) { + private void matchSourceWithTargetToMove(Source source, + BalancerDatanode.StorageGroup target) { long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove()); - NodeTask nodeTask = new NodeTask(target, size); - source.addNodeTask(nodeTask); - target.incScheduledSize(nodeTask.getSize()); + final Task task = new Task(target, size); + source.addTask(task); + target.incScheduledSize(task.size); sources.add(source); targets.add(target); LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from " - +source.datanode.getName() + " to " + target.datanode.getName()); + + source.getDisplayName() + " to " + target.getDisplayName()); } /** Choose a candidate for the given datanode. */ - private <D extends BalancerDatanode, C extends BalancerDatanode> - C chooseCandidate(D dn, Iterator<C> candidates, Matcher matcher) { - if (dn.hasSpaceForScheduling()) { + private <G extends BalancerDatanode.StorageGroup, + C extends BalancerDatanode.StorageGroup> + C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) { + if (g.hasSpaceForScheduling()) { for(; candidates.hasNext(); ) { final C c = candidates.next(); if (!c.hasSpaceForScheduling()) { candidates.remove(); - } else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) { + } else if (matcher.match(cluster, g.getDatanode(), c.getDatanode())) { return c; } } @@ -1202,9 +1292,10 @@ public class Balancer { boolean shouldWait; do { shouldWait = false; - for (BalancerDatanode target : targets) { - if (!target.isPendingQEmpty()) { + for (BalancerDatanode.StorageGroup target : targets) { + if (!target.getBalancerDatanode().isPendingQEmpty()) { shouldWait = true; + break; } } if (shouldWait) { @@ -1273,12 +1364,15 @@ public class Balancer { * 3. 
doing the move does not reduce the number of racks that the block has */ private boolean isGoodBlockCandidate(Source source, - BalancerDatanode target, BalancerBlock block) { + BalancerDatanode.StorageGroup target, BalancerBlock block) { + if (source.storageType != target.storageType) { + return false; + } // check if the block is moved or not if (movedBlocks.contains(block)) { - return false; + return false; } - if (block.isLocatedOnDatanode(target)) { + if (block.isLocatedOn(target)) { return false; } if (cluster.isNodeGroupAware() && @@ -1293,8 +1387,8 @@ public class Balancer { } else { boolean notOnSameRack = true; synchronized (block) { - for (BalancerDatanode loc : block.locations) { - if (cluster.isOnSameRack(loc.datanode, target.datanode)) { + for (BalancerDatanode.StorageGroup loc : block.locations) { + if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) { notOnSameRack = false; break; } @@ -1305,9 +1399,9 @@ public class Balancer { goodBlock = true; } else { // good if source is on the same rack as on of the replicas - for (BalancerDatanode loc : block.locations) { + for (BalancerDatanode.StorageGroup loc : block.locations) { if (loc != source && - cluster.isOnSameRack(loc.datanode, source.datanode)) { + cluster.isOnSameRack(loc.getDatanode(), source.getDatanode())) { goodBlock = true; break; } @@ -1328,25 +1422,26 @@ public class Balancer { * @return true if there are any replica (other than source) on the same node * group with target */ - private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target, + private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode.StorageGroup target, BalancerBlock block, Source source) { - for (BalancerDatanode loc : block.locations) { + final DatanodeInfo targetDn = target.getDatanode(); + for (BalancerDatanode.StorageGroup loc : block.locations) { if (loc != source && - cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) { - return true; - } + cluster.isOnSameNodeGroup(loc.getDatanode(), targetDn)) { + return true; } + } return false; } /* reset all fields in a balancer preparing for the next iteration */ private void resetData(Configuration conf) { this.cluster = NetworkTopology.getInstance(conf); - this.overUtilizedDatanodes.clear(); - this.aboveAvgUtilizedDatanodes.clear(); - this.belowAvgUtilizedDatanodes.clear(); - this.underUtilizedDatanodes.clear(); - this.datanodeMap.clear(); + this.overUtilized.clear(); + this.aboveAvgUtilized.clear(); + this.belowAvgUtilized.clear(); + this.underUtilized.clear(); + this.storageGroupMap.clear(); this.sources.clear(); this.targets.clear(); this.policy.reset(); @@ -1366,32 +1461,6 @@ public class Balancer { } } } - - /* Return true if the given datanode is overUtilized */ - private boolean isOverUtilized(BalancerDatanode datanode) { - return datanode.utilization > (policy.getAvgUtilization()+threshold); - } - - /* Return true if the given datanode is above or equal to average utilized - * but not overUtilized */ - private boolean isAboveAvgUtilized(BalancerDatanode datanode) { - final double avg = policy.getAvgUtilization(); - return (datanode.utilization <= (avg+threshold)) - && (datanode.utilization >= avg); - } - - /* Return true if the given datanode is underUtilized */ - private boolean isUnderUtilized(BalancerDatanode datanode) { - return datanode.utilization < (policy.getAvgUtilization()-threshold); - } - - /* Return true if the given datanode is below average utilized - * but not underUtilized */ - private boolean 
isBelowOrEqualAvgUtilized(BalancerDatanode datanode) { - final double avg = policy.getAvgUtilization(); - return (datanode.utilization >= (avg-threshold)) - && (datanode.utilization <= avg); - } // Exit status enum ReturnStatus { @@ -1419,7 +1488,8 @@ public class Balancer { /* get all live datanodes of a cluster and their disk usage * decide the number of bytes need to be moved */ - final long bytesLeftToMove = initNodes(nnc.client.getDatanodeReport(DatanodeReportType.LIVE)); + final long bytesLeftToMove = init( + nnc.client.getDatanodeStorageReport(DatanodeReportType.LIVE)); if (bytesLeftToMove == 0) { System.out.println("The cluster is balanced. Exiting..."); return ReturnStatus.SUCCESS; @@ -1433,7 +1503,7 @@ public class Balancer { * in this iteration. Maximum bytes to be moved per node is * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE). */ - final long bytesToMove = chooseNodes(); + final long bytesToMove = chooseStorageGroups(); if (bytesToMove == 0) { System.out.println("No block can be moved. Exiting..."); return ReturnStatus.NO_MOVE_BLOCK; Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java?rev=1615015&r1=1615014&r2=1615015&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java Fri Aug 1 01:05:33 2014 @@ -18,7 +18,11 @@ package org.apache.hadoop.hdfs.server.balancer; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.hdfs.util.EnumCounters; +import org.apache.hadoop.hdfs.util.EnumDoubles; /** * Balancing policy. @@ -28,31 +32,43 @@ import org.apache.hadoop.hdfs.protocol.D */ @InterfaceAudience.Private abstract class BalancingPolicy { - long totalCapacity; - long totalUsedSpace; - private double avgUtilization; + final EnumCounters<StorageType> totalCapacities + = new EnumCounters<StorageType>(StorageType.class); + final EnumCounters<StorageType> totalUsedSpaces + = new EnumCounters<StorageType>(StorageType.class); + final EnumDoubles<StorageType> avgUtilizations + = new EnumDoubles<StorageType>(StorageType.class); void reset() { - totalCapacity = 0L; - totalUsedSpace = 0L; - avgUtilization = 0.0; + totalCapacities.reset(); + totalUsedSpaces.reset(); + avgUtilizations.reset(); } /** Get the policy name. */ abstract String getName(); /** Accumulate used space and capacity. 
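BalancingPolicy drops its scalar totals for one total and one average per StorageType, kept in the new EnumCounters/EnumDoubles pair; a type with zero capacity keeps an average of 0.0 and is skipped via the null utilization. A sketch of the averaging, mirroring the initAvgUtilization() in this patch:

    import org.apache.hadoop.hdfs.StorageType;
    import org.apache.hadoop.hdfs.util.EnumCounters;
    import org.apache.hadoop.hdfs.util.EnumDoubles;

    class AvgUtilizationSketch {
      final EnumCounters<StorageType> totalCapacities =
          new EnumCounters<StorageType>(StorageType.class);
      final EnumCounters<StorageType> totalUsedSpaces =
          new EnumCounters<StorageType>(StorageType.class);
      final EnumDoubles<StorageType> avgUtilizations =
          new EnumDoubles<StorageType>(StorageType.class);

      void initAvgUtilization() {
        for (StorageType t : StorageType.asList()) {
          final long capacity = totalCapacities.get(t);
          if (capacity > 0L) {
            avgUtilizations.set(t, totalUsedSpaces.get(t) * 100.0 / capacity);
          }
        }
      }
    }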
*/ - abstract void accumulateSpaces(DatanodeInfo d); + abstract void accumulateSpaces(DatanodeStorageReport r); void initAvgUtilization() { - this.avgUtilization = totalUsedSpace*100.0/totalCapacity; + for(StorageType t : StorageType.asList()) { + final long capacity = totalCapacities.get(t); + if (capacity > 0L) { + final double avg = totalUsedSpaces.get(t)*100.0/capacity; + avgUtilizations.set(t, avg); + } + } } - double getAvgUtilization() { - return avgUtilization; + + double getAvgUtilization(StorageType t) { + return avgUtilizations.get(t); } - /** Return the utilization of a datanode */ - abstract double getUtilization(DatanodeInfo d); + /** @return the utilization of a particular storage type of a datanode; + * or return null if the datanode does not have such storage type. + */ + abstract Double getUtilization(DatanodeStorageReport r, StorageType t); @Override public String toString() { @@ -84,14 +100,25 @@ abstract class BalancingPolicy { } @Override - void accumulateSpaces(DatanodeInfo d) { - totalCapacity += d.getCapacity(); - totalUsedSpace += d.getDfsUsed(); + void accumulateSpaces(DatanodeStorageReport r) { + for(StorageReport s : r.getStorageReports()) { + final StorageType t = s.getStorage().getStorageType(); + totalCapacities.add(t, s.getCapacity()); + totalUsedSpaces.add(t, s.getDfsUsed()); + } } @Override - double getUtilization(DatanodeInfo d) { - return d.getDfsUsed()*100.0/d.getCapacity(); + Double getUtilization(DatanodeStorageReport r, final StorageType t) { + long capacity = 0L; + long dfsUsed = 0L; + for(StorageReport s : r.getStorageReports()) { + if (s.getStorage().getStorageType() == t) { + capacity += s.getCapacity(); + dfsUsed += s.getDfsUsed(); + } + } + return capacity == 0L? null: dfsUsed*100.0/capacity; } } @@ -108,14 +135,25 @@ abstract class BalancingPolicy { } @Override - void accumulateSpaces(DatanodeInfo d) { - totalCapacity += d.getCapacity(); - totalUsedSpace += d.getBlockPoolUsed(); + void accumulateSpaces(DatanodeStorageReport r) { + for(StorageReport s : r.getStorageReports()) { + final StorageType t = s.getStorage().getStorageType(); + totalCapacities.add(t, s.getCapacity()); + totalUsedSpaces.add(t, s.getBlockPoolUsed()); + } } @Override - double getUtilization(DatanodeInfo d) { - return d.getBlockPoolUsed()*100.0/d.getCapacity(); + Double getUtilization(DatanodeStorageReport r, final StorageType t) { + long capacity = 0L; + long blockPoolUsed = 0L; + for(StorageReport s : r.getStorageReports()) { + if (s.getStorage().getStorageType() == t) { + capacity += s.getCapacity(); + blockPoolUsed += s.getBlockPoolUsed(); + } + } + return capacity == 0L? 
null: blockPoolUsed*100.0/capacity; } } } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1615015&r1=1615014&r2=1615015&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Aug 1 01:05:33 2014 @@ -2826,12 +2826,15 @@ public class BlockManager { } else { final String[] datanodeUuids = new String[locations.size()]; final String[] storageIDs = new String[datanodeUuids.length]; + final StorageType[] storageTypes = new StorageType[datanodeUuids.length]; for(int i = 0; i < locations.size(); i++) { final DatanodeStorageInfo s = locations.get(i); datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid(); storageIDs[i] = s.getStorageID(); + storageTypes[i] = s.getStorageType(); } - results.add(new BlockWithLocations(block, datanodeUuids, storageIDs)); + results.add(new BlockWithLocations(block, datanodeUuids, storageIDs, + storageTypes)); return block.getNumBytes(); } } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java?rev=1615015&r1=1615014&r2=1615015&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java Fri Aug 1 01:05:33 2014 @@ -17,10 +17,9 @@ */ package org.apache.hadoop.hdfs.server.protocol; -import java.util.Arrays; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; /** @@ -39,12 +38,15 @@ public class BlocksWithLocations { final Block block; final String[] datanodeUuids; final String[] storageIDs; + final StorageType[] storageTypes; /** constructor */ - public BlockWithLocations(Block block, String[] datanodeUuids, String[] storageIDs) { + public BlockWithLocations(Block block, String[] datanodeUuids, + String[] storageIDs, StorageType[] storageTypes) { this.block = block; this.datanodeUuids = datanodeUuids; this.storageIDs = storageIDs; + this.storageTypes = storageTypes; } /** get the block */ @@ -61,7 +63,12 @@ public class BlocksWithLocations { public String[] getStorageIDs() { return storageIDs; } - + + /** @return the storage types */ + public StorageType[] getStorageTypes() { + return storageTypes; + } + @Override public String toString() { final StringBuilder b = new StringBuilder(); @@ -70,12 +77,18 @@ public class BlocksWithLocations { return b.append("[]").toString(); } - b.append(storageIDs[0]).append('@').append(datanodeUuids[0]); + appendString(0, b.append("[")); for(int i = 1; i < 
datanodeUuids.length; i++) { - b.append(", ").append(storageIDs[i]).append("@").append(datanodeUuids[i]); + appendString(i, b.append(",")); } return b.append("]").toString(); } + + private StringBuilder appendString(int i, StringBuilder b) { + return b.append("[").append(storageTypes[i]).append("]") + .append(storageIDs[i]) + .append("@").append(datanodeUuids[i]); + } } private final BlockWithLocations[] blocks; Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java?rev=1615015&r1=1615014&r2=1615015&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java Fri Aug 1 01:05:33 2014 @@ -37,7 +37,7 @@ import com.google.common.base.Preconditi public class EnumCounters<E extends Enum<E>> { /** The class of the enum. */ private final Class<E> enumClass; - /** The counter array, counters[i] corresponds to the enumConstants[i]. */ + /** An array of longs corresponding to the enum type. */ private final long[] counters; /** @@ -75,6 +75,13 @@ public class EnumCounters<E extends Enum } } + /** Reset all counters to zero. */ + public final void reset() { + for(int i = 0; i < counters.length; i++) { + this.counters[i] = 0L; + } + } + /** Add the given value to counter e. */ public final void add(final E e, final long value) { counters[e.ordinal()] += value; Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumDoubles.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumDoubles.java?rev=1615015&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumDoubles.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumDoubles.java Fri Aug 1 01:05:33 2014 @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import java.util.Arrays; + +import com.google.common.base.Preconditions; + +/** + * Similar to {@link EnumCounters} except that the value type is double. + * + * @param <E> the enum type + */ +public class EnumDoubles<E extends Enum<E>> { + /** The class of the enum. 
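EnumDoubles is EnumCounters with double-valued entries. A hypothetical usage of the class added here, with invented values:

    static void enumDoublesDemo() {
      final EnumDoubles<StorageType> avg =
          new EnumDoubles<StorageType>(StorageType.class);
      avg.set(StorageType.DISK, 52.5);
      avg.add(StorageType.DISK, 1.5);               // DISK -> 54.0
      System.out.println(avg.get(StorageType.SSD)); // 0.0; untouched entries stay zero
      System.out.println(avg);                      // prints "DISK=54.0, SSD=0.0"
    }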
*/ + private final Class<E> enumClass; + /** An array of doubles corresponding to the enum type. */ + private final double[] doubles; + + /** + * Construct doubles for the given enum constants. + * @param enumClass the enum class. + */ + public EnumDoubles(final Class<E> enumClass) { + final E[] enumConstants = enumClass.getEnumConstants(); + Preconditions.checkNotNull(enumConstants); + this.enumClass = enumClass; + this.doubles = new double[enumConstants.length]; + } + + /** @return the value corresponding to e. */ + public final double get(final E e) { + return doubles[e.ordinal()]; + } + + /** Negate all values. */ + public final void negation() { + for(int i = 0; i < doubles.length; i++) { + doubles[i] = -doubles[i]; + } + } + + /** Set e to the given value. */ + public final void set(final E e, final double value) { + doubles[e.ordinal()] = value; + } + + /** Set the values of this object to that object. */ + public final void set(final EnumDoubles<E> that) { + for(int i = 0; i < doubles.length; i++) { + this.doubles[i] = that.doubles[i]; + } + } + + /** Reset all values to zero. */ + public final void reset() { + for(int i = 0; i < doubles.length; i++) { + this.doubles[i] = 0.0; + } + } + + /** Add the given value to e. */ + public final void add(final E e, final double value) { + doubles[e.ordinal()] += value; + } + + /** Add the values of that object to this. */ + public final void add(final EnumDoubles<E> that) { + for(int i = 0; i < doubles.length; i++) { + this.doubles[i] += that.doubles[i]; + } + } + + /** Subtract the given value from e. */ + public final void subtract(final E e, final double value) { + doubles[e.ordinal()] -= value; + } + + /** Subtract the values of this object from that object. */ + public final void subtract(final EnumDoubles<E> that) { + for(int i = 0; i < doubles.length; i++) { + this.doubles[i] -= that.doubles[i]; + } + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (obj == null || !(obj instanceof EnumDoubles)) { + return false; + } + final EnumDoubles<?> that = (EnumDoubles<?>)obj; + return this.enumClass == that.enumClass + && Arrays.equals(this.doubles, that.doubles); + } + + @Override + public int hashCode() { + return Arrays.hashCode(doubles); + } + + @Override + public String toString() { + final E[] enumConstants = enumClass.getEnumConstants(); + final StringBuilder b = new StringBuilder(); + for(int i = 0; i < doubles.length; i++) { + final String name = enumConstants[i].name(); + b.append(name).append("=").append(doubles[i]).append(", "); + } + return b.substring(0, b.length() - 2); + } +} Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto?rev=1615015&r1=1615014&r2=1615015&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto Fri Aug 1 01:05:33 2014 @@ -404,6 +404,7 @@ message BlockWithLocationsProto { required BlockProto block = 1; // Block repeated string datanodeUuids = 2; // Datanodes with replicas of the block repeated string storageUuids = 3; // Storages with replicas of the block + repeated StorageTypeProto storageTypes = 4; } /** Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java?rev=1615015&r1=1615014&r2=1615015&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java Fri Aug 1 01:05:33 2014
@@ -184,8 +184,10 @@ public class TestPBHelper {
   private static BlockWithLocations getBlockWithLocations(int bid) {
     final String[] datanodeUuids = {"dn1", "dn2", "dn3"};
     final String[] storageIDs = {"s1", "s2", "s3"};
+    final StorageType[] storageTypes = {
+        StorageType.DISK, StorageType.DISK, StorageType.DISK};
     return new BlockWithLocations(new Block(bid, 0, 1),
-        datanodeUuids, storageIDs);
+        datanodeUuids, storageIDs, storageTypes);
   }
 
   private void compare(BlockWithLocations locs1, BlockWithLocations locs2) {
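The round trip the updated test exercises is that storage types survive conversion to BlockWithLocationsProto and back. The new proto field is repeated, so messages written before this change simply carry an empty list, which the size-taking convertStorageTypes() overload presumably pads out to the expected length. A usage sketch along the lines of testConvertBlockWithLocations (run with assertions enabled):

    import java.util.Arrays;

    import org.apache.hadoop.hdfs.StorageType;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;
    import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;

    class RoundTripSketch {
      static void roundTrip() {
        final BlockWithLocations blk = new BlockWithLocations(new Block(1, 0, 1),
            new String[] {"dn1", "dn2"},
            new String[] {"s1", "s2"},
            new StorageType[] {StorageType.DISK, StorageType.SSD});
        final BlockWithLocations back = PBHelper.convert(PBHelper.convert(blk));
        assert Arrays.equals(blk.getStorageTypes(), back.getStorageTypes());
      }
    }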