Author: arp
Date: Wed Feb 19 22:59:37 2014
New Revision: 1569951

URL: http://svn.apache.org/r1569951
Log:
HDFS-5318. Support read-only and read-write paths to shared replicas. (Contributed by Eric Sirianni)
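In brief: this change renames the DatanodeStorage.State value READ_ONLY to READ_ONLY_SHARED and teaches the NameNode to hand out such replicas as read locations without counting them as live replicas or replication targets. A minimal sketch of a storage advertising the new state, using only the DatanodeStorage constructor that appears in the SimulatedFSDataset hunk below (the wrapper class and printed output are illustrative, not part of this commit):

    import org.apache.hadoop.hdfs.StorageType;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;

    public class ReadOnlySharedStorageSketch {
      public static void main(String[] args) {
        // A storage advertising the new READ_ONLY_SHARED state. The NameNode will
        // return replicas on this storage to readers, but the BlockManager changes
        // below exclude them from live-replica counts and from placement choices.
        DatanodeStorage roStorage = new DatanodeStorage(
            "SimulatedStorage-" + DatanodeStorage.generateUuid(),
            DatanodeStorage.State.READ_ONLY_SHARED,
            StorageType.DEFAULT);
        System.out.println(roStorage.getStorageID() + " state=" + roStorage.getState());
      }
    }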
Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1569951&r1=1569950&r2=1569951&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Feb 19 22:59:37 2014
@@ -418,6 +418,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5973. add DomainSocket#shutdown method (cmccabe)

+    HDFS-5318. Support read-only and read-write paths to shared replicas.
+    (Eric Sirianni via Arpit Agarwal)
+
   OPTIMIZATIONS

     HDFS-5790.
LeaseManager.findPath is very slow when many leases need recovery Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1569951&r1=1569950&r2=1569951&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Wed Feb 19 22:59:37 2014 @@ -1520,8 +1520,8 @@ public class PBHelper { private static StorageState convertState(State state) { switch(state) { - case READ_ONLY: - return StorageState.READ_ONLY; + case READ_ONLY_SHARED: + return StorageState.READ_ONLY_SHARED; case NORMAL: default: return StorageState.NORMAL; @@ -1549,8 +1549,8 @@ public class PBHelper { private static State convertState(StorageState state) { switch(state) { - case READ_ONLY: - return DatanodeStorage.State.READ_ONLY; + case READ_ONLY_SHARED: + return DatanodeStorage.State.READ_ONLY_SHARED; case NORMAL: default: return DatanodeStorage.State.NORMAL; Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1569951&r1=1569950&r2=1569951&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Feb 19 22:59:37 2014 @@ -73,6 +73,7 @@ import org.apache.hadoop.hdfs.server.pro import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; @@ -482,7 +483,10 @@ public class BlockManager { chooseSourceDatanode(block, containingNodes, containingLiveReplicasNodes, numReplicas, UnderReplicatedBlocks.LEVEL); - assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas(); + + // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are + // not included in the numReplicas.liveReplicas() count + assert containingLiveReplicasNodes.size() >= numReplicas.liveReplicas(); int usableReplicas = numReplicas.liveReplicas() + numReplicas.decommissionedReplicas(); @@ -1021,7 +1025,7 @@ public class BlockManager { */ private void addToInvalidates(Block b) { StringBuilder datanodes = new StringBuilder(); - for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); invalidateBlocks.add(b, node, false); 
datanodes.append(node).append(" "); @@ -1235,7 +1239,10 @@ public class BlockManager { continue; } - assert liveReplicaNodes.size() == numReplicas.liveReplicas(); + // liveReplicaNodes can include READ_ONLY_SHARED replicas which are + // not included in the numReplicas.liveReplicas() count + assert liveReplicaNodes.size() >= numReplicas.liveReplicas(); + // do not schedule more if enough replicas is already pending numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplications.getNumReplicas(block); @@ -1475,15 +1482,16 @@ public class BlockManager { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node.getDatanodeUuid()); + int countableReplica = storage.getState() == State.NORMAL ? 1 : 0; if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) - corrupt++; + corrupt += countableReplica; else if (node.isDecommissionInProgress() || node.isDecommissioned()) - decommissioned++; + decommissioned += countableReplica; else if (excessBlocks != null && excessBlocks.contains(block)) { - excess++; + excess += countableReplica; } else { nodesContainingLiveReplicas.add(storage); - live++; + live += countableReplica; } containingNodes.add(node); // Check if this replica is corrupt @@ -2480,7 +2488,7 @@ assert storedBlock.findDatanode(dn) < 0 Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>(); Collection<DatanodeDescriptor> corruptNodes = corruptReplicas .getNodes(block); - for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) { final DatanodeDescriptor cur = storage.getDatanodeDescriptor(); if (storage.areBlockContentsStale()) { LOG.info("BLOCK* processOverReplicatedBlock: " + @@ -2809,7 +2817,7 @@ assert storedBlock.findDatanode(dn) < 0 int excess = 0; int stale = 0; Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b); - for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) { corrupt++; @@ -2848,7 +2856,7 @@ assert storedBlock.findDatanode(dn) < 0 // else proceed with fast case int live = 0; Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b); - for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); if ((nodesCorrupt == null) || (!nodesCorrupt.contains(node))) live++; Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1569951&r1=1569950&r2=1569951&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Wed Feb 19 22:59:37 2014 @@ -605,7 +605,7 @@ public class 
BlockPlacementPolicyDefault + storageType); return false; } - if (storage.getState() == State.READ_ONLY) { + if (storage.getState() == State.READ_ONLY_SHARED) { logNodeIsNotChosen(storage, "storage is read-only"); return false; } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1569951&r1=1569950&r2=1569951&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Wed Feb 19 22:59:37 2014 @@ -20,9 +20,13 @@ package org.apache.hadoop.hdfs.server.bl import java.util.Iterator; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.LightWeightGSet; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; + /** * This class maintains the map from a block to its metadata. * block's metadata currently includes blockCollection it belongs to and @@ -122,6 +126,22 @@ class BlocksMap { } /** + * Searches for the block in the BlocksMap and + * returns {@link Iterable} of the storages the block belongs to + * <i>that are of the given {@link DatanodeStorage.State state}</i>. + * + * @param state DatanodeStorage state by which to filter the returned Iterable + */ + Iterable<DatanodeStorageInfo> getStorages(Block b, final DatanodeStorage.State state) { + return Iterables.filter(getStorages(blocks.get(b)), new Predicate<DatanodeStorageInfo>() { + @Override + public boolean apply(DatanodeStorageInfo storage) { + return storage.getState() == state; + } + }); + } + + /** * For a block that has already been retrieved from the BlocksMap * returns {@link Iterable} of the storages the block belongs to. 
*/ Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1569951&r1=1569950&r2=1569951&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Wed Feb 19 22:59:37 2014 @@ -58,6 +58,8 @@ import org.apache.hadoop.hdfs.protocol.L import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.net.NetUtils; @@ -378,11 +380,13 @@ public class NamenodeFsck { boolean isCorrupt = lBlk.isCorrupt(); String blkName = block.toString(); DatanodeInfo[] locs = lBlk.getLocations(); - res.totalReplicas += locs.length; + NumberReplicas numberReplicas = namenode.getNamesystem().getBlockManager().countNodes(block.getLocalBlock()); + int liveReplicas = numberReplicas.liveReplicas(); + res.totalReplicas += liveReplicas; short targetFileReplication = file.getReplication(); res.numExpectedReplicas += targetFileReplication; - if (locs.length > targetFileReplication) { - res.excessiveReplicas += (locs.length - targetFileReplication); + if (liveReplicas > targetFileReplication) { + res.excessiveReplicas += (liveReplicas - targetFileReplication); res.numOverReplicatedBlocks += 1; } // Check if block is Corrupt @@ -392,10 +396,10 @@ public class NamenodeFsck { out.print("\n" + path + ": CORRUPT blockpool " + block.getBlockPoolId() + " block " + block.getBlockName()+"\n"); } - if (locs.length >= minReplication) + if (liveReplicas >= minReplication) res.numMinReplicatedBlocks++; - if (locs.length < targetFileReplication && locs.length > 0) { - res.missingReplicas += (targetFileReplication - locs.length); + if (liveReplicas < targetFileReplication && liveReplicas > 0) { + res.missingReplicas += (targetFileReplication - liveReplicas); res.numUnderReplicatedBlocks += 1; underReplicatedPerFile++; if (!showFiles) { @@ -404,7 +408,7 @@ public class NamenodeFsck { out.println(" Under replicated " + block + ". Target Replicas is " + targetFileReplication + " but found " + - locs.length + " replica(s)."); + liveReplicas + " replica(s)."); } // verify block placement policy BlockPlacementStatus blockPlacementStatus = bpPolicy @@ -421,13 +425,13 @@ public class NamenodeFsck { block + ". " + blockPlacementStatus.getErrorDescription()); } report.append(i + ". 
" + blkName + " len=" + block.getNumBytes()); - if (locs.length == 0) { + if (liveReplicas == 0) { report.append(" MISSING!"); res.addMissing(block.toString(), block.getNumBytes()); missing++; missize += block.getNumBytes(); } else { - report.append(" repl=" + locs.length); + report.append(" repl=" + liveReplicas); if (showLocations || showRacks) { StringBuilder sb = new StringBuilder("["); for (int j = 0; j < locs.length; j++) { Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java?rev=1569951&r1=1569950&r2=1569951&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java Wed Feb 19 22:59:37 2014 @@ -28,7 +28,18 @@ public class DatanodeStorage { /** The state of the storage. */ public enum State { NORMAL, - READ_ONLY + + /** + * A storage that represents a read-only path to replicas stored on a shared storage device. + * Replicas on {@link #READ_ONLY_SHARED} storage are not counted towards live replicas. + * + * <p> + * In certain implementations, a {@link #READ_ONLY_SHARED} storage may be correlated to + * its {@link #NORMAL} counterpart using the {@link DatanodeStorage#storageID}. This + * property should be used for debugging purposes only. + * </p> + */ + READ_ONLY_SHARED; } private final String storageID; Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto?rev=1569951&r1=1569950&r2=1569951&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto Wed Feb 19 22:59:37 2014 @@ -50,7 +50,7 @@ message DatanodeRegistrationProto { message DatanodeStorageProto { enum StorageState { NORMAL = 0; - READ_ONLY = 1; + READ_ONLY_SHARED = 1; } required string storageUuid = 1; Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1569951&r1=1569950&r2=1569951&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Wed Feb 19 22:59:37 2014 @@ -157,6 +157,7 @@ public class MiniDFSCluster { private boolean checkExitOnShutdown = true; private boolean checkDataNodeAddrConfig = false; private boolean checkDataNodeHostConfig = false; + private Configuration[] dnConfOverlays; public Builder(Configuration conf) { this.conf = conf; @@ -334,6 +335,19 @@ public class MiniDFSCluster { } /** + * Default: null + * 
+ * An array of {@link Configuration} objects that will overlay the + * global MiniDFSCluster Configuration for the corresponding DataNode. + * + * Useful for setting specific per-DataNode configuration parameters. + */ + public Builder dataNodeConfOverlays(Configuration[] dnConfOverlays) { + this.dnConfOverlays = dnConfOverlays; + return this; + } + + /** * Construct the actual MiniDFSCluster */ public MiniDFSCluster build() throws IOException { @@ -375,7 +389,8 @@ public class MiniDFSCluster { builder.nnTopology, builder.checkExitOnShutdown, builder.checkDataNodeAddrConfig, - builder.checkDataNodeHostConfig); + builder.checkDataNodeHostConfig, + builder.dnConfOverlays); } public class DataNodeProperties { @@ -621,7 +636,7 @@ public class MiniDFSCluster { manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs, operation, null, racks, hosts, simulatedCapacities, null, true, false, - MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false); + MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false, null); } private void initMiniDFSCluster( @@ -634,7 +649,8 @@ public class MiniDFSCluster { boolean waitSafeMode, boolean setupHostsFile, MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown, boolean checkDataNodeAddrConfig, - boolean checkDataNodeHostConfig) + boolean checkDataNodeHostConfig, + Configuration[] dnConfOverlays) throws IOException { ExitUtil.disableSystemExit(); @@ -699,7 +715,7 @@ public class MiniDFSCluster { startDataNodes(conf, numDataNodes, storageType, manageDataDfsDirs, dnStartOpt != null ? dnStartOpt : startOpt, racks, hosts, simulatedCapacities, setupHostsFile, - checkDataNodeAddrConfig, checkDataNodeHostConfig); + checkDataNodeAddrConfig, checkDataNodeHostConfig, dnConfOverlays); waitClusterUp(); //make sure ProxyUsers uses the latest conf ProxyUsers.refreshSuperUserGroupsConfiguration(conf); @@ -1102,7 +1118,7 @@ public class MiniDFSCluster { long[] simulatedCapacities, boolean setupHostsFile) throws IOException { startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, hosts, - simulatedCapacities, setupHostsFile, false, false); + simulatedCapacities, setupHostsFile, false, false, null); } /** @@ -1116,7 +1132,7 @@ public class MiniDFSCluster { boolean setupHostsFile, boolean checkDataNodeAddrConfig) throws IOException { startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, hosts, - simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false); + simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false, null); } /** @@ -1143,7 +1159,8 @@ public class MiniDFSCluster { * @param setupHostsFile add new nodes to dfs hosts files * @param checkDataNodeAddrConfig if true, only set DataNode port addresses if not already set in config * @param checkDataNodeHostConfig if true, only set DataNode hostname key if not already set in config - * + * @param dnConfOverlays An array of {@link Configuration} objects that will overlay the + * global MiniDFSCluster Configuration for the corresponding DataNode. 
* @throws IllegalStateException if NameNode has been shutdown */ public synchronized void startDataNodes(Configuration conf, int numDataNodes, @@ -1152,7 +1169,8 @@ public class MiniDFSCluster { long[] simulatedCapacities, boolean setupHostsFile, boolean checkDataNodeAddrConfig, - boolean checkDataNodeHostConfig) throws IOException { + boolean checkDataNodeHostConfig, + Configuration[] dnConfOverlays) throws IOException { if (operation == StartupOption.RECOVER) { return; } @@ -1192,6 +1210,13 @@ public class MiniDFSCluster { + simulatedCapacities.length + "] is less than the number of datanodes [" + numDataNodes + "]."); } + + if (dnConfOverlays != null + && numDataNodes > dnConfOverlays.length) { + throw new IllegalArgumentException( "The length of dnConfOverlays [" + + dnConfOverlays.length + + "] is less than the number of datanodes [" + numDataNodes + "]."); + } String [] dnArgs = (operation == null || operation != StartupOption.ROLLBACK) ? @@ -1200,6 +1225,9 @@ public class MiniDFSCluster { for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) { Configuration dnConf = new HdfsConfiguration(conf); + if (dnConfOverlays != null) { + dnConf.addResource(dnConfOverlays[i]); + } // Set up datanode address setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig); if (manageDfsDirs) { Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java?rev=1569951&r1=1569950&r2=1569951&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java Wed Feb 19 22:59:37 2014 @@ -210,7 +210,8 @@ public class MiniDFSClusterWithNodeGroup long[] simulatedCapacities, boolean setupHostsFile, boolean checkDataNodeAddrConfig, - boolean checkDataNodeHostConfig) throws IOException { + boolean checkDataNodeHostConfig, + Configuration[] dnConfOverlays) throws IOException { startDataNodes(conf, numDataNodes, storageType, manageDfsDirs, operation, racks, NODE_GROUPS, hosts, simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, checkDataNodeHostConfig); Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1569951&r1=1569950&r2=1569951&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Wed Feb 19 22:59:37 2014 @@ -34,6 +34,7 @@ import javax.management.StandardMBean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import 
org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; @@ -96,6 +97,11 @@ public class SimulatedFSDataset implemen public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte public static final byte DEFAULT_DATABYTE = 9; + public static final String CONFIG_PROPERTY_STATE = + "dfs.datanode.simulateddatastorage.state"; + private static final DatanodeStorage.State DEFAULT_STATE = + DatanodeStorage.State.NORMAL; + static final byte[] nullCrcFileData; static { DataChecksum checksum = DataChecksum.newDataChecksum( @@ -325,9 +331,9 @@ public class SimulatedFSDataset implemen private static class SimulatedStorage { private Map<String, SimulatedBPStorage> map = new HashMap<String, SimulatedBPStorage>(); - private final String storageUuid = "SimulatedStroage-" + DatanodeStorage.generateUuid(); private final long capacity; // in bytes + private final DatanodeStorage dnStorage; synchronized long getFree() { return capacity - getUsed(); @@ -365,8 +371,11 @@ public class SimulatedFSDataset implemen getBPStorage(bpid).free(amount); } - SimulatedStorage(long cap) { + SimulatedStorage(long cap, DatanodeStorage.State state) { capacity = cap; + dnStorage = new DatanodeStorage( + "SimulatedStorage-" + DatanodeStorage.generateUuid(), + state, StorageType.DEFAULT); } synchronized void addBlockPool(String bpid) { @@ -390,11 +399,15 @@ public class SimulatedFSDataset implemen } String getStorageUuid() { - return storageUuid; + return dnStorage.getStorageID(); + } + + DatanodeStorage getDnStorage() { + return dnStorage; } synchronized StorageReport getStorageReport(String bpid) { - return new StorageReport(new DatanodeStorage(getStorageUuid()), + return new StorageReport(dnStorage, false, getCapacity(), getUsed(), getFree(), map.get(bpid).getUsed()); } @@ -417,7 +430,8 @@ public class SimulatedFSDataset implemen registerMBean(datanodeUuid); this.storage = new SimulatedStorage( - conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY)); + conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY), + conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE)); } public synchronized void injectBlocks(String bpid, @@ -488,7 +502,7 @@ public class SimulatedFSDataset implemen @Override public synchronized Map<DatanodeStorage, BlockListAsLongs> getBlockReports( String bpid) { - return Collections.singletonMap(new DatanodeStorage(storage.storageUuid), getBlockReport(bpid)); + return Collections.singletonMap(storage.getDnStorage(), getBlockReport(bpid)); } @Override // FsDatasetSpi Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java?rev=1569951&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java Wed Feb 19 22:59:37 2014 @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; +import static org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State.*; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collections; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; +import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Iterables; + +/** + * Test proper {@link BlockManager} replication counting for {@link DatanodeStorage}s + * with {@link DatanodeStorage.State#READ_ONLY_SHARED READ_ONLY} state. + * + * Uses {@link SimulatedFSDataset} to inject read-only replicas into a DataNode. + */ +public class TestReadOnlySharedStorage { + + public static final Log LOG = LogFactory.getLog(TestReadOnlySharedStorage.class); + + private static short NUM_DATANODES = 3; + private static int RO_NODE_INDEX = 0; + private static final int BLOCK_SIZE = 1024; + private static final long seed = 0x1BADF00DL; + private static final Path PATH = new Path("/" + TestReadOnlySharedStorage.class.getName() + ".dat"); + private static final int RETRIES = 10; + + private Configuration conf; + private MiniDFSCluster cluster; + private DistributedFileSystem fs; + private DFSClient client; + + private BlockManager blockManager; + + private DatanodeManager datanodeManager; + private DatanodeInfo normalDataNode; + private DatanodeInfo readOnlyDataNode; + + private Block block; + + private ExtendedBlock extendedBlock; + + + /** + * Setup a {@link MiniDFSCluster}. + * Create a block with both {@link State#NORMAL} and {@link State#READ_ONLY_SHARED} replicas. 
+ */ + @Before + public void setup() throws IOException, InterruptedException { + conf = new HdfsConfiguration(); + SimulatedFSDataset.setFactory(conf); + + Configuration[] overlays = new Configuration[NUM_DATANODES]; + for (int i = 0; i < overlays.length; i++) { + overlays[i] = new Configuration(); + if (i == RO_NODE_INDEX) { + overlays[i].setEnum(SimulatedFSDataset.CONFIG_PROPERTY_STATE, + i == RO_NODE_INDEX + ? READ_ONLY_SHARED + : NORMAL); + } + } + + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(NUM_DATANODES) + .dataNodeConfOverlays(overlays) + .build(); + fs = cluster.getFileSystem(); + blockManager = cluster.getNameNode().getNamesystem().getBlockManager(); + datanodeManager = blockManager.getDatanodeManager(); + client = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), + cluster.getConfiguration(0)); + + for (int i = 0; i < NUM_DATANODES; i++) { + DataNode dataNode = cluster.getDataNodes().get(i); + validateStorageState( + BlockManagerTestUtil.getStorageReportsForDatanode( + datanodeManager.getDatanode(dataNode.getDatanodeId())), + i == RO_NODE_INDEX + ? READ_ONLY_SHARED + : NORMAL); + } + + // Create a 1 block file + DFSTestUtil.createFile(fs, PATH, BLOCK_SIZE, BLOCK_SIZE, + BLOCK_SIZE, (short) 1, seed); + + LocatedBlock locatedBlock = getLocatedBlock(); + extendedBlock = locatedBlock.getBlock(); + block = extendedBlock.getLocalBlock(); + + assertThat(locatedBlock.getLocations().length, is(1)); + normalDataNode = locatedBlock.getLocations()[0]; + readOnlyDataNode = datanodeManager.getDatanode(cluster.getDataNodes().get(RO_NODE_INDEX).getDatanodeId()); + assertThat(normalDataNode, is(not(readOnlyDataNode))); + + validateNumberReplicas(1); + + // Inject the block into the datanode with READ_ONLY_SHARED storage + cluster.injectBlocks(RO_NODE_INDEX, Collections.singleton(block)); + + // There should now be 2 *locations* for the block + // Must wait until the NameNode has processed the block report for the injected blocks + waitForLocations(2); + } + + @After + public void tearDown() throws IOException { + fs.delete(PATH, false); + + if (cluster != null) { + fs.close(); + cluster.shutdown(); + cluster = null; + } + } + + private void waitForLocations(int locations) throws IOException, InterruptedException { + for (int tries = 0; tries < RETRIES; ) + try { + LocatedBlock locatedBlock = getLocatedBlock(); + assertThat(locatedBlock.getLocations().length, is(locations)); + break; + } catch (AssertionError e) { + if (++tries < RETRIES) { + Thread.sleep(1000); + } else { + throw e; + } + } + } + + private LocatedBlock getLocatedBlock() throws IOException { + LocatedBlocks locatedBlocks = client.getLocatedBlocks(PATH.toString(), 0, BLOCK_SIZE); + assertThat(locatedBlocks.getLocatedBlocks().size(), is(1)); + return Iterables.getOnlyElement(locatedBlocks.getLocatedBlocks()); + } + + private void validateStorageState(StorageReport[] storageReports, DatanodeStorage.State state) { + for (StorageReport storageReport : storageReports) { + DatanodeStorage storage = storageReport.getStorage(); + assertThat(storage.getState(), is(state)); + } + } + + private void validateNumberReplicas(int expectedReplicas) throws IOException { + NumberReplicas numberReplicas = blockManager.countNodes(block); + assertThat(numberReplicas.liveReplicas(), is(expectedReplicas)); + assertThat(numberReplicas.excessReplicas(), is(0)); + assertThat(numberReplicas.corruptReplicas(), is(0)); + assertThat(numberReplicas.decommissionedReplicas(), is(0)); + 
assertThat(numberReplicas.replicasOnStaleNodes(), is(0)); + + BlockManagerTestUtil.updateState(blockManager); + assertThat(blockManager.getUnderReplicatedBlocksCount(), is(0L)); + assertThat(blockManager.getExcessBlocksCount(), is(0L)); + } + + /** + * Verify that <tt>READ_ONLY_SHARED</tt> replicas are <i>not</i> counted towards the overall + * replication count, but <i>are</i> included as replica locations returned to clients for reads. + */ + @Test + public void testReplicaCounting() throws Exception { + // There should only be 1 *replica* (the READ_ONLY_SHARED doesn't count) + validateNumberReplicas(1); + + fs.setReplication(PATH, (short) 2); + + // There should now be 3 *locations* for the block, and 2 *replicas* + waitForLocations(3); + validateNumberReplicas(2); + } + + /** + * Verify that the NameNode is able to still use <tt>READ_ONLY_SHARED</tt> replicas even + * when the single NORMAL replica is offline (and the effective replication count is 0). + */ + @Test + public void testNormalReplicaOffline() throws Exception { + // Stop the datanode hosting the NORMAL replica + cluster.stopDataNode(normalDataNode.getXferAddr()); + + // Force NameNode to detect that the datanode is down + BlockManagerTestUtil.noticeDeadDatanode( + cluster.getNameNode(), normalDataNode.getXferAddr()); + + // The live replica count should now be zero (since the NORMAL replica is offline) + NumberReplicas numberReplicas = blockManager.countNodes(block); + assertThat(numberReplicas.liveReplicas(), is(0)); + + // The block should be reported as under-replicated + BlockManagerTestUtil.updateState(blockManager); + assertThat(blockManager.getUnderReplicatedBlocksCount(), is(1L)); + + // The BlockManager should be able to heal the replication count back to 1 + // by triggering an inter-datanode replication from one of the READ_ONLY_SHARED replicas + BlockManagerTestUtil.computeAllPendingWork(blockManager); + + DFSTestUtil.waitForReplication(cluster, extendedBlock, 1, 1, 0); + + // There should now be 2 *locations* for the block, and 1 *replica* + assertThat(getLocatedBlock().getLocations().length, is(2)); + validateNumberReplicas(1); + } + + /** + * Verify that corrupt <tt>READ_ONLY_SHARED</tt> replicas aren't counted + * towards the corrupt replicas total. + */ + @Test + public void testReadOnlyReplicaCorrupt() throws Exception { + // "Corrupt" a READ_ONLY_SHARED replica by reporting it as a bad replica + client.reportBadBlocks(new LocatedBlock[] { + new LocatedBlock(extendedBlock, new DatanodeInfo[] { readOnlyDataNode }) + }); + + // There should now be only 1 *location* for the block as the READ_ONLY_SHARED is corrupt + waitForLocations(1); + + // However, the corrupt READ_ONLY_SHARED replica should *not* affect the overall corrupt replicas count + NumberReplicas numberReplicas = blockManager.countNodes(block); + assertThat(numberReplicas.corruptReplicas(), is(0)); + } + +}
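For anyone reproducing this setup outside the test above, here is a condensed sketch of how the two test hooks added by this change, MiniDFSCluster.Builder#dataNodeConfOverlays and SimulatedFSDataset.CONFIG_PROPERTY_STATE, combine to start a cluster whose first DataNode reports READ_ONLY_SHARED storage. It mirrors TestReadOnlySharedStorage#setup; the class name, node count, and the waitActive/shutdown scaffolding are illustrative rather than part of the commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;

    public class ReadOnlySharedClusterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        // Use the simulated dataset so storage state can be injected via configuration.
        SimulatedFSDataset.setFactory(conf);

        // Per-DataNode overlays: DataNode 0 reports READ_ONLY_SHARED, the rest stay NORMAL.
        Configuration[] overlays = new Configuration[3];
        for (int i = 0; i < overlays.length; i++) {
          overlays[i] = new Configuration();
          overlays[i].setEnum(SimulatedFSDataset.CONFIG_PROPERTY_STATE,
              i == 0 ? DatanodeStorage.State.READ_ONLY_SHARED
                     : DatanodeStorage.State.NORMAL);
        }

        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
            .numDataNodes(overlays.length)
            .dataNodeConfOverlays(overlays)   // new Builder hook added by this change
            .build();
        try {
          cluster.waitActive();
          // ... create files, inject blocks, and assert on replica counts here ...
        } finally {
          cluster.shutdown();
        }
      }
    }

Replica accounting can then be checked with BlockManager#countNodes, as the tests above do, since replicas on READ_ONLY_SHARED storages are excluded from liveReplicas() while still being returned as block locations to clients.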