http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicies.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicies.java
new file mode 100644
index 0000000..622b258
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicies.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class BlockPlacementPolicies{
+
+  private final BlockPlacementPolicy replicationPolicy;
+  private final BlockPlacementPolicy ecPolicy;
+
+  public BlockPlacementPolicies(Configuration conf, FSClusterStats stats,
+                                NetworkTopology clusterMap,
+                                Host2NodesMap host2datanodeMap){
+    final Class<? extends BlockPlacementPolicy> replicatorClass = conf
+        .getClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+            DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
+            BlockPlacementPolicy.class);
+    replicationPolicy = ReflectionUtils.newInstance(replicatorClass, conf);
+    replicationPolicy.initialize(conf, stats, clusterMap, host2datanodeMap);
+    final Class<? extends BlockPlacementPolicy> blockPlacementECClass =
+        conf.getClass(DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
+            DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_DEFAULT,
+            BlockPlacementPolicy.class);
+    ecPolicy = ReflectionUtils.newInstance(blockPlacementECClass, conf);
+    ecPolicy.initialize(conf, stats, clusterMap, host2datanodeMap);
+  }
+
+  public BlockPlacementPolicy getPolicy(boolean isStriped){
+    if (isStriped) {
+      return ecPolicy;
+    } else {
+      return replicationPolicy;
+    }
+  }
+}
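[Editor's note: the new BlockPlacementPolicies container replaces the static BlockPlacementPolicy.getInstance() factory removed in the next diff. A minimal sketch of how a caller inside the blockmanagement package might use it; the helper class and its parameters are hypothetical, not part of the patch.]

package org.apache.hadoop.hdfs.server.blockmanagement;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;

// Hypothetical caller, shown only to illustrate the new dispatch point.
class PlacementDispatchSketch {
  static BlockPlacementPolicy policyFor(boolean isStriped, Configuration conf,
      FSClusterStats stats, NetworkTopology topology, Host2NodesMap hosts) {
    // Both policies are instantiated and initialized once, up front.
    BlockPlacementPolicies policies =
        new BlockPlacementPolicies(conf, stats, topology, hosts);
    // Striped (erasure-coded) blocks get the EC policy; everything else
    // keeps the configured replication policy.
    return policies.getPolicy(isStriped);
  }
}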
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
index 9696179..86aaf79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
@@ -145,31 +145,7 @@ public abstract class BlockPlacementPolicy {
   abstract protected void initialize(Configuration conf, FSClusterStats stats,
                                      NetworkTopology clusterMap,
                                      Host2NodesMap host2datanodeMap);
-
-  /**
-   * Get an instance of the configured Block Placement Policy based on the
-   * the configuration property
-   * {@link DFSConfigKeys#DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}.
-   *
-   * @param conf the configuration to be used
-   * @param stats an object that is used to retrieve the load on the cluster
-   * @param clusterMap the network topology of the cluster
-   * @return an instance of BlockPlacementPolicy
-   */
-  public static BlockPlacementPolicy getInstance(Configuration conf,
-                                                 FSClusterStats stats,
-                                                 NetworkTopology clusterMap,
-                                                 Host2NodesMap host2datanodeMap) {
-    final Class<? extends BlockPlacementPolicy> replicatorClass = conf.getClass(
-        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
-        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
-        BlockPlacementPolicy.class);
-    final BlockPlacementPolicy replicator = ReflectionUtils.newInstance(
-        replicatorClass, conf);
-    replicator.initialize(conf, stats, clusterMap, host2datanodeMap);
-    return replicator;
-  }
-
+
   /**
    * Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java
new file mode 100644
index 0000000..4dbf384
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+
+import java.util.*;
+
+/**
+ * The class is responsible for choosing the desired number of targets
+ * for placing block replicas.
+ * The strategy is that it tries its best to place the replicas to most racks.
+ */
+@InterfaceAudience.Private
+public class BlockPlacementPolicyRackFaultTolarent extends BlockPlacementPolicyDefault {
+
+  @Override
+  protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
+    int clusterSize = clusterMap.getNumOfLeaves();
+    int totalNumOfReplicas = numOfChosen + numOfReplicas;
+    if (totalNumOfReplicas > clusterSize) {
+      numOfReplicas -= (totalNumOfReplicas-clusterSize);
+      totalNumOfReplicas = clusterSize;
+    }
+    // No calculation needed when there is only one rack or picking one node.
+    int numOfRacks = clusterMap.getNumOfRacks();
+    if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
+      return new int[] {numOfReplicas, totalNumOfReplicas};
+    }
+    if(totalNumOfReplicas<numOfRacks){
+      return new int[] {numOfReplicas, 1};
+    }
+    int maxNodesPerRack = (totalNumOfReplicas - 1) / numOfRacks + 1;
+    return new int[] {numOfReplicas, maxNodesPerRack};
+  }
+
+  /**
+   * Choose numOfReplicas in order:
+   * 1. If total replica expected is less than numOfRacks in cluster, it choose
+   * randomly.
+   * 2. If total replica expected is bigger than numOfRacks, it choose:
+   *  2a. Fill each rack exactly (maxNodesPerRack-1) replicas.
+   *  2b. For some random racks, place one more replica to each one of them,
+   *  until numOfReplicas have been chosen. <br>
+   * In the end, the difference of the numbers of replicas for each two racks
+   * is no more than 1.
+   * Either way it always prefer local storage.
+   * @return local node of writer
+   */
+  @Override
+  protected Node chooseTargetInOrder(int numOfReplicas,
+                                     Node writer,
+                                     final Set<Node> excludedNodes,
+                                     final long blocksize,
+                                     final int maxNodesPerRack,
+                                     final List<DatanodeStorageInfo> results,
+                                     final boolean avoidStaleNodes,
+                                     final boolean newBlock,
+                                     EnumMap<StorageType, Integer> storageTypes)
+      throws NotEnoughReplicasException {
+    int totalReplicaExpected = results.size() + numOfReplicas;
+    int numOfRacks = clusterMap.getNumOfRacks();
+    if (totalReplicaExpected < numOfRacks ||
+        totalReplicaExpected % numOfRacks == 0) {
+      writer = chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+      return writer;
+    }
+
+    assert totalReplicaExpected > (maxNodesPerRack -1) * numOfRacks;
+
+    // Calculate numOfReplicas for filling each rack exactly (maxNodesPerRack-1)
+    // replicas.
+    HashMap<String, Integer> rackCounts = new HashMap<>();
+    for (DatanodeStorageInfo dsInfo : results) {
+      String rack = dsInfo.getDatanodeDescriptor().getNetworkLocation();
+      Integer count = rackCounts.get(rack);
+      if (count != null) {
+        rackCounts.put(rack, count + 1);
+      } else {
+        rackCounts.put(rack, 1);
+      }
+    }
+    int excess = 0; // Sum of the above (maxNodesPerRack-1) part of nodes in results
+    for (int count : rackCounts.values()) {
+      if (count > maxNodesPerRack -1) {
+        excess += count - (maxNodesPerRack -1);
+      }
+    }
+    numOfReplicas = Math.min(totalReplicaExpected - results.size(),
+        (maxNodesPerRack -1) * numOfRacks - (results.size() - excess));
+
+    // Fill each rack exactly (maxNodesPerRack-1) replicas.
+    writer = chooseOnce(numOfReplicas, writer, new HashSet<>(excludedNodes),
+        blocksize, maxNodesPerRack -1, results, avoidStaleNodes, storageTypes);
+
+    for (DatanodeStorageInfo resultStorage : results) {
+      addToExcludedNodes(resultStorage.getDatanodeDescriptor(), excludedNodes);
+    }
+
+    // For some racks, place one more replica to each one of them.
+    numOfReplicas = totalReplicaExpected - results.size();
+    chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
+        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+
+    return writer;
+  }
+
+  /**
+   * Randomly choose <i>numOfReplicas</i> targets from the given <i>scope</i>.
+   * Except that 1st replica prefer local storage.
+   * @return local node of writer.
+   */
+  private Node chooseOnce(int numOfReplicas,
+                          Node writer,
+                          final Set<Node> excludedNodes,
+                          final long blocksize,
+                          final int maxNodesPerRack,
+                          final List<DatanodeStorageInfo> results,
+                          final boolean avoidStaleNodes,
+                          EnumMap<StorageType, Integer> storageTypes)
+      throws NotEnoughReplicasException {
+    if (numOfReplicas == 0) {
+      return writer;
+    }
+    writer = chooseLocalStorage(writer, excludedNodes, blocksize,
+        maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
+        .getDatanodeDescriptor();
+    if (--numOfReplicas == 0) {
+      return writer;
+    }
+    chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
+        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+    return writer;
+  }
+}
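[Editor's note: a standalone sketch of the ceiling-division bound computed by getMaxNodesPerRack above; the class name and the numbers are illustrative only.]

public class MaxNodesPerRackDemo {
  static int maxNodesPerRack(int totalReplicas, int numOfRacks) {
    // ceil(totalReplicas / numOfRacks) without floating point
    return (totalReplicas - 1) / numOfRacks + 1;
  }

  public static void main(String[] args) {
    // 9 replicas over 4 racks -> at most 3 per rack, so rack counts can
    // only differ by one (e.g. 3/2/2/2).
    System.out.println(maxNodesPerRack(9, 4)); // 3
    // When totalReplicas < numOfRacks the policy short-circuits and caps
    // every rack at 1 replica instead of calling this.
    System.out.println(maxNodesPerRack(8, 4)); // 2
  }
}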
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 0dbf485..0e0fd91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -43,8 +43,15 @@ class BlocksMap {
 
     @Override
     public boolean hasNext() {
-      return blockInfo != null && nextIdx < blockInfo.getCapacity()
-          && blockInfo.getDatanode(nextIdx) != null;
+      if (blockInfo == null) {
+        return false;
+      }
+      while (nextIdx < blockInfo.getCapacity() &&
+          blockInfo.getDatanode(nextIdx) == null) {
+        // note that for striped blocks there may be null in the triplets
+        nextIdx++;
+      }
+      return nextIdx < blockInfo.getCapacity();
     }
 
     @Override
@@ -123,12 +130,16 @@ class BlocksMap {
       return;
 
     blockInfo.setBlockCollection(null);
-    for(int idx = blockInfo.numNodes()-1; idx >= 0; idx--) {
+    final int size = blockInfo instanceof BlockInfoContiguous ?
+        blockInfo.numNodes() : blockInfo.getCapacity();
+    for(int idx = size - 1; idx >= 0; idx--) {
       DatanodeDescriptor dn = blockInfo.getDatanode(idx);
-      dn.removeBlock(blockInfo); // remove from the list and wipe the location
+      if (dn != null) {
+        dn.removeBlock(blockInfo); // remove from the list and wipe the location
+      }
     }
   }
-
+
   /** Returns the block object it it exists in the map. */
   BlockInfo getStoredBlock(Block b) {
     return blocks.get(b);
@@ -190,8 +201,8 @@ class BlocksMap {
 
     // remove block from the data-node list and the node from the block info
     boolean removed = node.removeBlock(info);
 
-    if (info.getDatanode(0) == null // no datanodes left
-        && info.isDeleted()) { // does not belong to a file
+    if (info.hasNoStorage() // no datanodes left
+        && info.isDeleted()) { // does not belong to a file
       blocks.remove(b); // remove block from the map
     }
     return removed;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 7e12a99..3cf9db6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -31,8 +31,8 @@ import java.util.Queue;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import com.google.common.collect.ImmutableList;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -41,13 +41,16 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.Time;
@@ -220,9 +223,12 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /** A queue of blocks to be replicated by this datanode */
   private final BlockQueue<BlockTargetPair> replicateBlocks =
       new BlockQueue<>();
+  /** A queue of blocks to be erasure coded by this datanode */
+  private final BlockQueue<BlockECRecoveryInfo> erasurecodeBlocks =
+      new BlockQueue<>();
   /** A queue of blocks to be recovered by this datanode */
-  private final BlockQueue<BlockInfoContiguousUnderConstruction> recoverBlocks =
-      new BlockQueue<BlockInfoContiguousUnderConstruction>();
+  private final BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
+      new BlockQueue<>();
   /** A set of blocks to be invalidated by this datanode */
   private final LightWeightHashSet<Block> invalidateBlocks =
       new LightWeightHashSet<>();
@@ -280,7 +286,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
-  DatanodeStorageInfo[] getStorageInfos() {
+  @VisibleForTesting
+  public DatanodeStorageInfo[] getStorageInfos() {
     synchronized (storageMap) {
       final Collection<DatanodeStorageInfo> storages = storageMap.values();
       return storages.toArray(new DatanodeStorageInfo[storages.size()]);
@@ -377,6 +384,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
     this.invalidateBlocks.clear();
     this.recoverBlocks.clear();
     this.replicateBlocks.clear();
+    this.erasurecodeBlocks.clear();
   }
   // pendingCached, cached, and pendingUncached are protected by the
   // FSN lock.
@@ -577,6 +585,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
   Iterator<BlockInfo> getBlockIterator() {
     return new BlockIterator(getStorageInfos());
   }
+
   Iterator<BlockInfo> getBlockIterator(final String storageID) {
     return new BlockIterator(getStorageInfo(storageID));
   }
@@ -598,9 +607,23 @@ public class DatanodeDescriptor extends DatanodeInfo {
   }
 
   /**
+   * Store block erasure coding work.
+   */
+  void addBlockToBeErasureCoded(ExtendedBlock block,
+      DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets,
+      short[] liveBlockIndices, ECSchema ecSchema, int cellSize) {
+    assert (block != null && sources != null && sources.length > 0);
+    BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
+        liveBlockIndices, ecSchema, cellSize);
+    erasurecodeBlocks.offer(task);
+    BlockManager.LOG.debug("Adding block recovery task " + task + "to "
+        + getName() + ", current queue size is " + erasurecodeBlocks.size());
+  }
+
+  /**
    * Store block recovery work.
    */
-  void addBlockToBeRecovered(BlockInfoContiguousUnderConstruction block) {
+  void addBlockToBeRecovered(BlockInfoUnderConstruction block) {
     if(recoverBlocks.contains(block)) {
       // this prevents adding the same block twice to the recovery queue
       BlockManager.LOG.info(block + " is already in the recovery queue");
@@ -629,6 +652,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
   }
 
   /**
+   * The number of work items that are pending to be replicated
+   */
+  @VisibleForTesting
+  public int getNumberOfBlocksToBeErasureCoded() {
+    return erasurecodeBlocks.size();
+  }
+
+  /**
    * The number of block invalidation items that are pending to
    * be sent to the datanode
    */
@@ -642,11 +673,15 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return replicateBlocks.poll(maxTransfers);
   }
 
-  public BlockInfoContiguousUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) {
-    List<BlockInfoContiguousUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
+  public List<BlockECRecoveryInfo> getErasureCodeCommand(int maxTransfers) {
+    return erasurecodeBlocks.poll(maxTransfers);
+  }
+
+  public BlockInfoUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) {
+    List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
     if(blocks == null)
       return null;
-    return blocks.toArray(new BlockInfoContiguousUnderConstruction[blocks.size()]);
+    return blocks.toArray(new BlockInfoUnderConstruction[blocks.size()]);
   }
 
   /**
@@ -660,6 +695,13 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
+  @VisibleForTesting
+  public boolean containsInvalidateBlock(Block block) {
+    synchronized (invalidateBlocks) {
+      return invalidateBlocks.contains(block);
+    }
+  }
+
   /**
    * @return Approximate number of blocks currently scheduled to be written
    */
@@ -842,6 +884,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
     if (repl > 0) {
       sb.append(" ").append(repl).append(" blocks to be replicated;");
     }
+    int ec = erasurecodeBlocks.size();
+    if(ec > 0) {
+      sb.append(" ").append(ec).append(" blocks to be erasure coded;");
+    }
     int inval = invalidateBlocks.size();
     if (inval > 0) {
       sb.append(" ").append(inval).append(" blocks to be invalidated;");
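[Editor's note: the new erasurecodeBlocks field reuses the descriptor's BlockQueue pattern: the NameNode offers BlockECRecoveryInfo tasks and the heartbeat handler polls up to maxTransfers of them. A simplified stand-in for that queue, assuming the same offer/poll semantics; this is not the real Hadoop BlockQueue class.]

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

class BlockQueueSketch<E> {
  private final Queue<E> queue = new LinkedList<>();

  synchronized boolean offer(E task) {
    return queue.offer(task);
  }

  /** Poll up to numTasks entries, as getErasureCodeCommand(maxTransfers) does. */
  synchronized List<E> poll(int numTasks) {
    if (queue.isEmpty()) {
      return null; // no pending work: heartbeat response carries no command
    }
    List<E> results = new ArrayList<>();
    for (; !queue.isEmpty() && numTasks > 0; numTasks--) {
      results.add(queue.poll());
    }
    return results;
  }
}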
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 3397bbb..dbd07d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.protocol.*;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
@@ -1380,29 +1381,29 @@ public class DatanodeManager {
     }
 
     //check lease recovery
-    BlockInfoContiguousUnderConstruction[] blocks = nodeinfo
+    BlockInfoUnderConstruction[] blocks = nodeinfo
         .getLeaseRecoveryCommand(Integer.MAX_VALUE);
     if (blocks != null) {
       BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
           blocks.length);
-      for (BlockInfoContiguousUnderConstruction b : blocks) {
+      for (BlockInfoUnderConstruction b : blocks) {
         final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
         // Skip stale nodes during recovery - not heart beated for some time (30s by default).
         final List<DatanodeStorageInfo> recoveryLocations =
             new ArrayList<>(storages.length);
-        for (int i = 0; i < storages.length; i++) {
-          if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
-            recoveryLocations.add(storages[i]);
+        for (DatanodeStorageInfo storage : storages) {
+          if (!storage.getDatanodeDescriptor().isStale(staleInterval)) {
+            recoveryLocations.add(storage);
           }
         }
         // If we are performing a truncate recovery than set recovery fields
         // to old block.
         boolean truncateRecovery = b.getTruncateBlock() != null;
         boolean copyOnTruncateRecovery = truncateRecovery &&
-            b.getTruncateBlock().getBlockId() != b.getBlockId();
+            b.getTruncateBlock().getBlockId() != b.toBlock().getBlockId();
         ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
             new ExtendedBlock(blockPoolId, b.getTruncateBlock()) :
-            new ExtendedBlock(blockPoolId, b);
+            new ExtendedBlock(blockPoolId, b.toBlock());
         // If we only get 1 replica after eliminating stale nodes, then choose all
         // replicas for recovery and let the primary data node handle failures.
         DatanodeInfo[] recoveryInfos;
@@ -1419,7 +1420,7 @@ public class DatanodeManager {
           recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
         }
         if(truncateRecovery) {
-          Block recoveryBlock = (copyOnTruncateRecovery) ? b :
+          Block recoveryBlock = (copyOnTruncateRecovery) ? b.toBlock() :
               b.getTruncateBlock();
           brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
               recoveryBlock));
@@ -1439,6 +1440,13 @@ public class DatanodeManager {
       cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
           pendingList));
     }
+    // checking pending erasure coding tasks
+    List<BlockECRecoveryInfo> pendingECList =
+        nodeinfo.getErasureCodeCommand(maxTransfers);
+    if (pendingECList != null) {
+      cmds.add(new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
+          pendingECList));
+    }
     //check block invalidation
     Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
     if (blks != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 216d6d2..bdf9f9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -24,6 +24,7 @@ import java.util.List;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
@@ -209,6 +210,7 @@ public class DatanodeStorageInfo {
     return getState() == State.FAILED && numBlocks != 0;
   }
 
+  @VisibleForTesting
   public String getStorageID() {
     return storageID;
   }
@@ -233,7 +235,7 @@ public class DatanodeStorageInfo {
     return blockPoolUsed;
   }
 
-  public AddBlockResult addBlock(BlockInfo b) {
+  public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
     // First check whether the block belongs to a different storage
     // on the same DN.
     AddBlockResult result = AddBlockResult.ADDED;
@@ -252,10 +254,18 @@ public class DatanodeStorageInfo {
     }
 
     // add to the head of the data-node list
-    b.addStorage(this);
+    b.addStorage(this, reportedBlock);
+    insertToList(b);
+    return result;
+  }
+
+  AddBlockResult addBlock(BlockInfo b) {
+    return addBlock(b, b);
+  }
+
+  public void insertToList(BlockInfo b) {
     blockList = b.listInsert(blockList, this);
     numBlocks++;
-    return result;
   }
 
   public boolean removeBlock(BlockInfo b) {
@@ -274,7 +284,6 @@ public class DatanodeStorageInfo {
 
   Iterator<BlockInfo> getBlockIterator() {
     return new BlockIterator(blockList);
-
   }
 
   /**
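[Editor's note: the overload delegation introduced above is easy to miss in diff form: the one-argument addBlock now forwards to the two-argument form, passing the block itself as the "reported block". A stand-in sketch with simplified types (BlockSketch, BlockInfoSketch, StorageSketch are hypothetical, not Hadoop classes); the patch presumably needs the reported block so striped blocks can derive the replica's index within the group.]

class BlockSketch {}

class BlockInfoSketch extends BlockSketch {
  void addStorage(StorageSketch s, BlockSketch reportedBlock) { /* ... */ }
}

class StorageSketch {
  enum AddBlockResult { ADDED, REPLACED, ALREADY_EXIST }

  AddBlockResult addBlock(BlockInfoSketch b, BlockSketch reportedBlock) {
    // ... duplicate-storage check elided ...
    b.addStorage(this, reportedBlock); // striped case: reportedBlock differs
    return AddBlockResult.ADDED;
  }

  AddBlockResult addBlock(BlockInfoSketch b) {
    return addBlock(b, b); // contiguous case: the block reports itself
  }
}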
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index 797d031..5e3cac2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -36,7 +36,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.util.CyclicIteration;
@@ -234,14 +233,14 @@ public class DecommissionManager {
   }
 
   /**
-   * Checks whether a block is sufficiently replicated for decommissioning.
-   * Full-strength replication is not always necessary, hence "sufficient".
+   * Checks whether a block is sufficiently replicated/stored for
+   * decommissioning. For replicated blocks or striped blocks, full-strength
+   * replication or storage is not always necessary, hence "sufficient".
    * @return true if sufficient, else false.
    */
-  private boolean isSufficientlyReplicated(BlockInfo block,
-      BlockCollection bc,
+  private boolean isSufficient(BlockInfo block, BlockCollection bc,
       NumberReplicas numberReplicas) {
-    final int numExpected = bc.getPreferredBlockReplication();
+    final int numExpected = blockManager.getExpectedReplicaNum(bc, block);
    final int numLive = numberReplicas.liveReplicas();
    if (!blockManager.isNeededReplication(block, numExpected, numLive)) {
      // Block doesn't need replication. Skip.
@@ -255,18 +254,19 @@ public class DecommissionManager {
     if (numExpected > numLive) {
       if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
         // Can decom a UC block as long as there will still be minReplicas
-        if (numLive >= blockManager.minReplication) {
+        if (blockManager.hasMinStorage(block, numLive)) {
           LOG.trace("UC block {} sufficiently-replicated since numLive ({}) "
-              + ">= minR ({})", block, numLive, blockManager.minReplication);
+              + ">= minR ({})", block, numLive,
+              blockManager.getMinStorageNum(block));
           return true;
         } else {
           LOG.trace("UC block {} insufficiently-replicated since numLive "
               + "({}) < minR ({})", block, numLive,
-              blockManager.minReplication);
+              blockManager.getMinStorageNum(block));
         }
       } else {
         // Can decom a non-UC as long as the default replication is met
-        if (numLive >= blockManager.defaultReplication) {
+        if (numLive >= blockManager.getDefaultStorageNum(block)) {
           return true;
         }
       }
@@ -274,11 +274,11 @@ public class DecommissionManager {
     return false;
   }
 
-  private static void logBlockReplicationInfo(Block block, BlockCollection bc,
+  private void logBlockReplicationInfo(BlockInfo block, BlockCollection bc,
       DatanodeDescriptor srcNode, NumberReplicas num,
       Iterable<DatanodeStorageInfo> storages) {
     int curReplicas = num.liveReplicas();
-    int curExpectedReplicas = bc.getPreferredBlockReplication();
+    int curExpectedReplicas = blockManager.getExpectedReplicaNum(bc, block);
     StringBuilder nodeList = new StringBuilder();
     for (DatanodeStorageInfo storage : storages) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
@@ -407,14 +407,14 @@ public class DecommissionManager {
           // that are insufficiently replicated for further tracking
           LOG.debug("Newly-added node {}, doing full scan to find " +
               "insufficiently-replicated blocks.", dn);
-          blocks = handleInsufficientlyReplicated(dn);
+          blocks = handleInsufficientlyStored(dn);
           decomNodeBlocks.put(dn, blocks);
           fullScan = true;
         } else {
           // This is a known datanode, check if its # of insufficiently
           // replicated blocks has dropped to zero and if it can be decommed
           LOG.debug("Processing decommission-in-progress node {}", dn);
-          pruneSufficientlyReplicated(dn, blocks);
+          pruneReliableBlocks(dn, blocks);
         }
         if (blocks.size() == 0) {
           if (!fullScan) {
@@ -426,7 +426,7 @@ public class DecommissionManager {
             // marking the datanode as decommissioned
             LOG.debug("Node {} has finished replicating current set of "
                 + "blocks, checking with the full block map.", dn);
-            blocks = handleInsufficientlyReplicated(dn);
+            blocks = handleInsufficientlyStored(dn);
             decomNodeBlocks.put(dn, blocks);
           }
           // If the full scan is clean AND the node liveness is okay,
@@ -467,25 +467,23 @@ public class DecommissionManager {
     }
 
     /**
-     * Removes sufficiently replicated blocks from the block list of a
-     * datanode.
+     * Removes reliable blocks from the block list of a datanode.
      */
-    private void pruneSufficientlyReplicated(final DatanodeDescriptor datanode,
+    private void pruneReliableBlocks(final DatanodeDescriptor datanode,
         AbstractList<BlockInfo> blocks) {
       processBlocksForDecomInternal(datanode, blocks.iterator(), null, true);
     }
 
     /**
-     * Returns a list of blocks on a datanode that are insufficiently
-     * replicated, i.e. are under-replicated enough to prevent decommission.
+     * Returns a list of blocks on a datanode that are insufficiently replicated
+     * or require recovery, i.e. requiring recovery and should prevent
+     * decommission.
      * <p/>
-     * As part of this, it also schedules replication work for
-     * any under-replicated blocks.
+     * As part of this, it also schedules replication/recovery work.
      *
-     * @param datanode
-     * @return List of insufficiently replicated blocks
+     * @return List of blocks requiring recovery
      */
-    private AbstractList<BlockInfo> handleInsufficientlyReplicated(
+    private AbstractList<BlockInfo> handleInsufficientlyStored(
        final DatanodeDescriptor datanode) {
      AbstractList<BlockInfo> insufficient = new ChunkedArrayList<>();
      processBlocksForDecomInternal(datanode, datanode.getBlockIterator(),
@@ -496,24 +494,22 @@ public class DecommissionManager {
     /**
      * Used while checking if decommission-in-progress datanodes can be marked
      * as decommissioned. Combines shared logic of
-     * pruneSufficientlyReplicated and handleInsufficientlyReplicated.
+     * pruneReliableBlocks and handleInsufficientlyStored.
      *
      * @param datanode                    Datanode
      * @param it                          Iterator over the blocks on the
      *                                    datanode
-     * @param insufficientlyReplicated    Return parameter. If it's not null,
+     * @param insufficientList            Return parameter. If it's not null,
      *                                    will contain the insufficiently
      *                                    replicated-blocks from the list.
-     * @param pruneSufficientlyReplicated whether to remove sufficiently
-     *                                    replicated blocks from the iterator
-     * @return true if there are under-replicated blocks in the provided block
-     *         iterator, else false.
+     * @param pruneReliableBlocks         whether to remove blocks reliable
+     *                                    enough from the iterator
      */
     private void processBlocksForDecomInternal(
         final DatanodeDescriptor datanode,
         final Iterator<BlockInfo> it,
-        final List<BlockInfo> insufficientlyReplicated,
-        boolean pruneSufficientlyReplicated) {
+        final List<BlockInfo> insufficientList,
+        boolean pruneReliableBlocks) {
       boolean firstReplicationLog = true;
       int underReplicatedBlocks = 0;
       int decommissionOnlyReplicas = 0;
@@ -528,7 +524,7 @@ public class DecommissionManager {
           it.remove();
           continue;
         }
-        BlockCollection bc = blockManager.blocksMap.getBlockCollection(block);
+        BlockCollection bc = blockManager.getBlockCollection(block);
         if (bc == null) {
           // Orphan block, will be invalidated eventually. Skip.
           continue;
@@ -536,35 +532,34 @@ public class DecommissionManager {
 
         final NumberReplicas num = blockManager.countNodes(block);
         final int liveReplicas = num.liveReplicas();
-        final int curReplicas = liveReplicas;
 
         // Schedule under-replicated blocks for replication if not already
         // pending
         if (blockManager.isNeededReplication(block,
-            bc.getPreferredBlockReplication(), liveReplicas)) {
+            blockManager.getExpectedReplicaNum(bc, block), liveReplicas)) {
           if (!blockManager.neededReplications.contains(block) &&
               blockManager.pendingReplications.getNumReplicas(block) == 0 &&
               namesystem.isPopulatingReplQueues()) {
             // Process these blocks only when active NN is out of safe mode.
             blockManager.neededReplications.add(block,
-                curReplicas,
+                liveReplicas,
                 num.decommissionedAndDecommissioning(),
-                bc.getPreferredBlockReplication());
+                blockManager.getExpectedReplicaNum(bc, block));
           }
         }
 
         // Even if the block is under-replicated,
-        // it doesn't block decommission if it's sufficiently replicated
-        if (isSufficientlyReplicated(block, bc, num)) {
-          if (pruneSufficientlyReplicated) {
+        // it doesn't block decommission if it's sufficiently replicated
+        if (isSufficient(block, bc, num)) {
+          if (pruneReliableBlocks) {
             it.remove();
           }
           continue;
         }
 
         // We've found an insufficiently replicated block.
-        if (insufficientlyReplicated != null) {
-          insufficientlyReplicated.add(block);
+        if (insufficientList != null) {
+          insufficientList.add(block);
         }
         // Log if this is our first time through
         if (firstReplicationLog) {
@@ -577,7 +572,7 @@ public class DecommissionManager {
         if (bc.isUnderConstruction()) {
           underReplicatedInOpenFiles++;
         }
-        if ((curReplicas == 0) && (num.decommissionedAndDecommissioning() > 0)) {
+        if ((liveReplicas == 0) && (num.decommissionedAndDecommissioning() > 0)) {
           decommissionOnlyReplicas++;
         }
       }
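[Editor's note: processBlocksForDecomInternal runs in two modes that the renames above make explicit; a generic sketch of that dual-mode scan follows. All names here are illustrative, and isSufficient is abstracted to a predicate.]

import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

class DecomScanSketch<B> {
  void scan(Iterator<B> it, Predicate<B> isSufficient,
      List<B> insufficientList, boolean pruneReliableBlocks) {
    while (it.hasNext()) {
      B block = it.next();
      if (isSufficient.test(block)) {
        if (pruneReliableBlocks) {
          it.remove(); // reliable enough: stop tracking it
        }
        continue;
      }
      if (insufficientList != null) {
        insufficientList.add(block); // still blocks decommission
      }
    }
  }
}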
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicaUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicaUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicaUnderConstruction.java
new file mode 100644
index 0000000..f4600cb7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicaUnderConstruction.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+
+/**
+ * ReplicaUnderConstruction contains information about replicas (or blocks
+ * belonging to a block group) while they are under construction.
+ *
+ * The GS, the length and the state of the replica is as reported by the
+ * datanode.
+ *
+ * It is not guaranteed, but expected, that datanodes actually have
+ * corresponding replicas.
+ */
+class ReplicaUnderConstruction extends Block {
+  private final DatanodeStorageInfo expectedLocation;
+  private HdfsServerConstants.ReplicaState state;
+  private boolean chosenAsPrimary;
+
+  ReplicaUnderConstruction(Block block,
+      DatanodeStorageInfo target,
+      HdfsServerConstants.ReplicaState state) {
+    super(block);
+    this.expectedLocation = target;
+    this.state = state;
+    this.chosenAsPrimary = false;
+  }
+
+  /**
+   * Expected block replica location as assigned when the block was allocated.
+   * This defines the pipeline order.
+   * It is not guaranteed, but expected, that the data-node actually has
+   * the replica.
+   */
+  DatanodeStorageInfo getExpectedStorageLocation() {
+    return expectedLocation;
+  }
+
+  /**
+   * Get replica state as reported by the data-node.
+   */
+  HdfsServerConstants.ReplicaState getState() {
+    return state;
+  }
+
+  /**
+   * Whether the replica was chosen for recovery.
+   */
+  boolean getChosenAsPrimary() {
+    return chosenAsPrimary;
+  }
+
+  /**
+   * Set replica state.
+   */
+  void setState(HdfsServerConstants.ReplicaState s) {
+    state = s;
+  }
+
+  /**
+   * Set whether this replica was chosen for recovery.
+   */
+  void setChosenAsPrimary(boolean chosenAsPrimary) {
+    this.chosenAsPrimary = chosenAsPrimary;
+  }
+
+  /**
+   * Is data-node the replica belongs to alive.
+   */
+  boolean isAlive() {
+    return expectedLocation.getDatanodeDescriptor().isAlive;
+  }
+
+  @Override // Block
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  @Override // Block
+  public boolean equals(Object obj) {
+    // Sufficient to rely on super's implementation
+    return (this == obj) || super.equals(obj);
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder(50);
+    appendStringTo(b);
+    return b.toString();
+  }
+
+  @Override
+  public void appendStringTo(StringBuilder sb) {
+    sb.append("ReplicaUC[")
+      .append(expectedLocation)
+      .append("|")
+      .append(state)
+      .append("]");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java
new file mode 100644
index 0000000..479ee4c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.util.SequentialNumber;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BLOCK_GROUP_INDEX_MASK;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_BLOCKS_IN_GROUP;
+
+/**
+ * Generate the next valid block group ID by incrementing the maximum block
+ * group ID allocated so far, with the first 2^10 block group IDs reserved.
+ * HDFS-EC introduces a hierarchical protocol to name blocks and groups:
+ * Contiguous: {reserved block IDs | flag | block ID}
+ * Striped: {reserved block IDs | flag | block group ID | index in group}
+ *
+ * Following n bits of reserved block IDs, The (n+1)th bit in an ID
+ * distinguishes contiguous (0) and striped (1) blocks. For a striped block,
+ * bits (n+2) to (64-m) represent the ID of its block group, while the last m
+ * bits represent its index of the group. The value m is determined by the
+ * maximum number of blocks in a group (MAX_BLOCKS_IN_GROUP).
+ *
+ * Note that the {@link #nextValue()} methods requires external lock to
+ * guarantee IDs have no conflicts.
+ */
+@InterfaceAudience.Private
+public class SequentialBlockGroupIdGenerator extends SequentialNumber {
+
+  private final BlockManager blockManager;
+
+  SequentialBlockGroupIdGenerator(BlockManager blockManagerRef) {
+    super(Long.MIN_VALUE);
+    this.blockManager = blockManagerRef;
+  }
+
+  @Override // NumberGenerator
+  public long nextValue() {
+    skipTo((getCurrentValue() & ~BLOCK_GROUP_INDEX_MASK) + MAX_BLOCKS_IN_GROUP);
+    // Make sure there's no conflict with existing random block IDs
+    final Block b = new Block(getCurrentValue());
+    while (hasValidBlockInRange(b)) {
+      skipTo(getCurrentValue() + MAX_BLOCKS_IN_GROUP);
+      b.setBlockId(getCurrentValue());
+    }
+    if (b.getBlockId() >= 0) {
+      throw new IllegalStateException("All negative block group IDs are used, "
+          + "growing into positive IDs, "
+          + "which might conflict with non-erasure coded blocks.");
+    }
+    return getCurrentValue();
+  }
+
+  /**
+   * @param b A block object whose id is set to the starting point for check
+   * @return true if any ID in the range
+   *      {id, id+HdfsConstants.MAX_BLOCKS_IN_GROUP} is pointed-to by a file
+   */
+  private boolean hasValidBlockInRange(Block b) {
+    final long id = b.getBlockId();
+    for (int i = 0; i < MAX_BLOCKS_IN_GROUP; i++) {
+      b.setBlockId(id + i);
+      if (blockManager.getBlockCollection(b) != null) {
+        return true;
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java
index eef8857..6074784 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.util.SequentialNumber;
 
 /**
@@ -54,6 +53,11 @@ public class SequentialBlockIdGenerator extends SequentialNumber {
     while(isValidBlock(b)) {
       b.setBlockId(super.nextValue());
     }
+    if (b.getBlockId() < 0) {
+      throw new IllegalStateException("All positive block IDs are used, "
+          + "wrapping to negative IDs, "
+          + "which might conflict with erasure coded block groups.");
+    }
     return b.getBlockId();
   }
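[Editor's note: a worked instance of the striped ID layout described in the javadoc above, using the values this patch adds to HdfsServerConstants (BLOCK_GROUP_INDEX_MASK = 15, MAX_BLOCKS_IN_GROUP = 16): the low 4 bits are the block's index in its group, the remaining bits identify the group. The sample ID is arbitrary; constants are inlined so the demo runs standalone.]

public class BlockGroupIdDemo {
  static final long BLOCK_GROUP_INDEX_MASK = 15;
  static final long MAX_BLOCKS_IN_GROUP = 16;

  public static void main(String[] args) {
    long blockId = -1099511627769L;                   // some striped (negative) block ID
    long groupId = blockId & ~BLOCK_GROUP_INDEX_MASK; // zero the low 4 bits
    long indexInGroup = blockId & BLOCK_GROUP_INDEX_MASK;
    // Prints: group=-1099511627776 index=7
    System.out.println("group=" + groupId + " index=" + indexInGroup);
    // nextValue() above advances by MAX_BLOCKS_IN_GROUP, so consecutive
    // groups can never share any of their 16 member IDs.
  }
}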
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index ebc15b8..7e8f479 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
  *
  * <p/>
  * The policy for choosing which priority to give added blocks
- * is implemented in {@link #getPriority(int, int, int)}.
+ * is implemented in {@link #getPriority(BlockInfo, int, int, int)}.
  * </p>
  * <p>The queue order is as follows:</p>
  * <ol>
@@ -145,14 +145,28 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
   * @param expectedReplicas expected number of replicas of the block
   * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
   */
-  private int getPriority(int curReplicas,
+  private int getPriority(BlockInfo block,
+                          int curReplicas,
                           int decommissionedReplicas,
                           int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
     if (curReplicas >= expectedReplicas) {
       // Block has enough copies, but not enough racks
       return QUEUE_REPLICAS_BADLY_DISTRIBUTED;
-    } else if (curReplicas == 0) {
+    }
+    if (block.isStriped()) {
+      BlockInfoStriped sblk = (BlockInfoStriped) block;
+      return getPriorityStriped(curReplicas, decommissionedReplicas,
+          sblk.getRealDataBlockNum(), sblk.getParityBlockNum());
+    } else {
+      return getPriorityContiguous(curReplicas, decommissionedReplicas,
+          expectedReplicas);
+    }
+  }
+
+  private int getPriorityContiguous(int curReplicas, int decommissionedReplicas,
+      int expectedReplicas) {
+    if (curReplicas == 0) {
       // If there are zero non-decommissioned replicas but there are
       // some decommissioned replicas, then assign them highest priority
       if (decommissionedReplicas > 0) {
@@ -161,7 +175,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
       //all we have are corrupt blocks
       return QUEUE_WITH_CORRUPT_BLOCKS;
     } else if (curReplicas == 1) {
-      //only on replica -risk of loss
+      // only one replica, highest risk of loss
       // highest priority
       return QUEUE_HIGHEST_PRIORITY;
     } else if ((curReplicas * 3) < expectedReplicas) {
@@ -174,6 +188,27 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
     }
   }
 
+  private int getPriorityStriped(int curReplicas, int decommissionedReplicas,
+      short dataBlkNum, short parityBlkNum) {
+    if (curReplicas < dataBlkNum) {
+      // There are some replicas on decommissioned nodes so it's not corrupted
+      if (curReplicas + decommissionedReplicas >= dataBlkNum) {
+        return QUEUE_HIGHEST_PRIORITY;
+      }
+      return QUEUE_WITH_CORRUPT_BLOCKS;
+    } else if (curReplicas == dataBlkNum) {
+      // highest risk of loss, highest priority
+      return QUEUE_HIGHEST_PRIORITY;
+    } else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) {
+      // can only afford one replica loss
+      // this is considered very under-replicated
+      return QUEUE_VERY_UNDER_REPLICATED;
+    } else {
+      // add to the normal queue for under replicated blocks
+      return QUEUE_UNDER_REPLICATED;
+    }
+  }
+
   /** add a block to a under replication queue according to its priority
    * @param block a under replication block
    * @param curReplicas current number of replicas of the block
@@ -186,7 +221,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
                            int decomissionedReplicas,
                            int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
-    int priLevel = getPriority(curReplicas, decomissionedReplicas,
+    int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
                                expectedReplicas);
     if(priorityQueues.get(priLevel).add(block)) {
       if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
@@ -209,7 +244,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
                            int oldReplicas,
                            int decommissionedReplicas,
                            int oldExpectedReplicas) {
-    int priLevel = getPriority(oldReplicas,
+    int priLevel = getPriority(block, oldReplicas,
                                decommissionedReplicas,
                                oldExpectedReplicas);
     boolean removedBlock = remove(block, priLevel);
@@ -283,9 +318,9 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
                            int curReplicasDelta, int expectedReplicasDelta) {
     int oldReplicas = curReplicas-curReplicasDelta;
     int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
-    int curPri = getPriority(curReplicas, decommissionedReplicas,
+    int curPri = getPriority(block, curReplicas, decommissionedReplicas,
         curExpectedReplicas);
-    int oldPri = getPriority(oldReplicas, decommissionedReplicas,
+    int oldPri = getPriority(block, oldReplicas, decommissionedReplicas,
         oldExpectedReplicas);
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +
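[Editor's note: a worked instance of getPriorityStriped above for an assumed RS-6-3 stripe (dataBlkNum = 6, parityBlkNum = 3), where "replicas" means live internal blocks of the group. The queue names are returned as strings so the demo runs without the real class; it mirrors only the striped branch, since getPriority handles the curReplicas >= expectedReplicas case before dispatching here.]

public class StripedPriorityDemo {
  static String priority(int live, int decommissioned,
      short dataBlkNum, short parityBlkNum) {
    if (live < dataBlkNum) {
      return (live + decommissioned >= dataBlkNum)
          ? "QUEUE_HIGHEST_PRIORITY"     // readable only via decommissioning nodes
          : "QUEUE_WITH_CORRUPT_BLOCKS"; // fewer than dataBlkNum: unreadable
    } else if (live == dataBlkNum) {
      return "QUEUE_HIGHEST_PRIORITY";   // zero margin: any further loss is fatal
    } else if ((live - dataBlkNum) * 3 < parityBlkNum + 1) {
      return "QUEUE_VERY_UNDER_REPLICATED"; // margin of only one internal block
    } else {
      return "QUEUE_UNDER_REPLICATED";
    }
  }

  public static void main(String[] args) {
    System.out.println(priority(6, 0, (short) 6, (short) 3)); // QUEUE_HIGHEST_PRIORITY
    System.out.println(priority(7, 0, (short) 6, (short) 3)); // QUEUE_VERY_UNDER_REPLICATED
    System.out.println(priority(8, 0, (short) 6, (short) 3)); // QUEUE_UNDER_REPLICATED
  }
}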
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
index 11194dc..eb45aa2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
@@ -393,4 +393,9 @@ public interface HdfsServerConstants {
       "raw.hdfs.crypto.file.encryption.info";
   String SECURITY_XATTR_UNREADABLE_BY_SUPERUSER =
       "security.hdfs.unreadable.by.superuser";
+  String XATTR_ERASURECODING_ZONE =
+      "raw.hdfs.erasurecoding.zone";
+
+  long BLOCK_GROUP_INDEX_MASK = 15;
+  byte MAX_BLOCKS_IN_GROUP = 16;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 92323f1..d77b36d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -32,11 +32,13 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.protocol.*;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -722,6 +724,11 @@ class BPOfferService {
         dxcs.balanceThrottler.setBandwidth(bandwidth);
       }
       break;
+    case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY:
+      LOG.info("DatanodeCommand action: DNA_ERASURE_CODING_RECOVERY");
+      Collection<BlockECRecoveryInfo> ecTasks = ((BlockECRecoveryCommand) cmd).getECTasks();
+      dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }
@@ -751,6 +758,7 @@ class BPOfferService {
     case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
     case DatanodeProtocol.DNA_CACHE:
     case DatanodeProtocol.DNA_UNCACHE:
+    case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY:
       LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
       break;
     default:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index abc9390..85f194a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -245,6 +245,33 @@ public class DNConf {
   }
 
   /**
+   * Returns true if connect to datanode via hostname
+   *
+   * @return boolean true if connect to datanode via hostname
+   */
+  public boolean getConnectToDnViaHostname() {
+    return connectToDnViaHostname;
+  }
+
+  /**
+   * Returns socket timeout
+   *
+   * @return int socket timeout
+   */
+  public int getSocketTimeout() {
+    return socketTimeout;
+  }
+
+  /**
+   * Returns socket write timeout
+   *
+   * @return int socket write timeout
+   */
+  public int getSocketWriteTimeout() {
+    return socketWriteTimeout;
+  }
+
+  /**
    * Returns the SaslPropertiesResolver configured for use with
    * DataTransferProtocol, or null if not configured.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index e265dad..1b695e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -87,6 +87,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.ObjectName;
 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -151,6 +152,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -358,6 +360,8 @@ public class DataNode extends ReconfigurableBase
   private String dnUserName = null;
 
   private SpanReceiverHost spanReceiverHost;
+
+  private ErasureCodingWorker ecWorker;
   private static final int NUM_CORES = Runtime.getRuntime()
       .availableProcessors();
   private static final double CONGESTION_RATIO = 1.5;
@@ -1159,6 +1163,8 @@ public class DataNode extends ReconfigurableBase
     saslClient = new SaslDataTransferClient(dnConf.conf,
         dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
     saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
+    // Initialize ErasureCoding worker
+    ecWorker = new ErasureCodingWorker(conf, this);
   }
 
   /**
@@ -1223,6 +1229,10 @@ public class DataNode extends ReconfigurableBase
     return UUID.randomUUID().toString();
   }
 
+  public SaslDataTransferClient getSaslClient() {
+    return saslClient;
+  }
+
   /**
    * Verify that the DatanodeUuid has been initialized. If this is a new
    * datanode then we generate a new Datanode Uuid and persist it to disk.
@@ -1485,7 +1495,7 @@ public class DataNode extends ReconfigurableBase
   /**
    * Creates either NIO or regular depending on socketWriteTimeout.
    */
-  protected Socket newSocket() throws IOException {
+  public Socket newSocket() throws IOException {
     return (dnConf.socketWriteTimeout > 0) ?
            SocketChannel.open().socket() : new Socket();
   }
@@ -1894,6 +1904,21 @@ public class DataNode extends ReconfigurableBase
   int getXmitsInProgress() {
     return xmitsInProgress.get();
   }
+
+  /**
+   * Increments the xmitsInProgress count. xmitsInProgress count represents the
+   * number of data replication/reconstruction tasks running currently.
+   */
+  public void incrementXmitsInProgress() {
+    xmitsInProgress.getAndIncrement();
+  }
+
+  /**
+   * Decrements the xmitsInProgress count
+   */
+  public void decrementXmitsInProgress() {
+    xmitsInProgress.getAndDecrement();
+  }
 
   private void reportBadBlock(final BPOfferService bpos,
       final ExtendedBlock block, final String msg) {
@@ -2113,7 +2138,7 @@ public class DataNode extends ReconfigurableBase
      */
     @Override
     public void run() {
-      xmitsInProgress.getAndIncrement();
+      incrementXmitsInProgress();
       Socket sock = null;
       DataOutputStream out = null;
       DataInputStream in = null;
@@ -2133,11 +2158,8 @@ public class DataNode extends ReconfigurableBase
         //
         // Header info
         //
-        Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
-        if (isBlockTokenEnabled) {
-          accessToken = blockPoolTokenSecretManager.generateToken(b,
-              EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
-        }
+        Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
+            EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
 
         long writeTimeout = dnConf.socketWriteTimeout +
                             HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
@@ -2195,7 +2217,7 @@ public class DataNode extends ReconfigurableBase
         // check if there are any disk problem
         checkDiskErrorAsync();
       } finally {
-        xmitsInProgress.getAndDecrement();
+        decrementXmitsInProgress();
         IOUtils.closeStream(blockSender);
         IOUtils.closeStream(out);
         IOUtils.closeStream(in);
@@ -2204,6 +2226,19 @@ public class DataNode extends ReconfigurableBase
     }
   }
 
+  /***
+   * Use BlockTokenSecretManager to generate block token for current user.
+   */
+  public Token<BlockTokenIdentifier> getBlockAccessToken(ExtendedBlock b,
+      EnumSet<AccessMode> mode) throws IOException {
+    Token<BlockTokenIdentifier> accessToken =
+        BlockTokenSecretManager.DUMMY_TOKEN;
+    if (isBlockTokenEnabled) {
+      accessToken = blockPoolTokenSecretManager.generateToken(b, mode);
+    }
+    return accessToken;
+  }
+
   /**
    * Returns a new DataEncryptionKeyFactory that generates a key from the
    * BlockPoolTokenSecretManager, using the block pool ID of the given block.
@@ -2211,7 +2246,7 @@ public class DataNode extends ReconfigurableBase
    * @param block for which the factory needs to create a key
    * @return DataEncryptionKeyFactory for block's block pool ID
    */
-  DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
+  public DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
       final ExtendedBlock block) {
     return new DataEncryptionKeyFactory() {
       @Override
@@ -3259,4 +3294,9 @@ public class DataNode extends ReconfigurableBase
     checkSuperuserPrivilege();
     spanReceiverHost.removeSpanReceiver(id);
   }
+
+  public ErasureCodingWorker getErasureCodingWorker(){
+    return ecWorker;
+
+  }
 }
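[Editor's note: the increment/decrement helpers above formalize a counter discipline: every replication or reconstruction task brackets its work so getXmitsInProgress() stays accurate even when the transfer fails. A self-contained sketch of that pattern; the class and runTask method are illustrative, not the real DataNode API.]

import java.util.concurrent.atomic.AtomicInteger;

class XmitsSketch {
  private final AtomicInteger xmitsInProgress = new AtomicInteger();

  public void incrementXmitsInProgress() { xmitsInProgress.getAndIncrement(); }
  public void decrementXmitsInProgress() { xmitsInProgress.getAndDecrement(); }
  public int getXmitsInProgress() { return xmitsInProgress.get(); }

  void runTask(Runnable work) {
    incrementXmitsInProgress();
    try {
      work.run(); // block transfer or erasure-coded reconstruction
    } finally {
      decrementXmitsInProgress(); // never leak the count on failure
    }
  }
}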