Repository: hadoop
Updated Branches:
  refs/heads/branch-2 5fb8ac18d -> 63bdbb779


HDFS-8884. Fail-fast check in BlockPlacementPolicyDefault#chooseTarget. (yliu)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/63bdbb77
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/63bdbb77
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/63bdbb77

Branch: refs/heads/branch-2
Commit: 63bdbb7793ea3ee70271984b2970ce4d28b6fd4b
Parents: 5fb8ac1
Author: yliu <y...@apache.org>
Authored: Thu Aug 20 20:08:20 2015 +0800
Committer: yliu <y...@apache.org>
Committed: Thu Aug 20 20:08:20 2015 +0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../BlockPlacementPolicyDefault.java            | 176 ++++++++++++-------
 .../BlockPlacementPolicyWithNodeGroup.java      |  35 +---
 .../TestDefaultBlockPlacementPolicy.java        |  49 +++++-
 4 files changed, 161 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
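
In short: the node-level checks (decommission status, staleness, xceiver
load, and chosen-nodes-per-rack count) move out of the per-storage
isGoodTarget() into a new isGoodDatanode(), so chooseRandom() can reject an
unsuitable datanode once instead of re-running the same checks for every
storage it hosts. A condensed sketch of the new control flow (names are
taken from the diff below; surrounding declarations are omitted, so this is
illustrative rather than compilable on its own):

    // Fail fast: vet the datanode once ...
    if (!isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad,
        results, avoidStaleNodes)) {
      badTarget = true;
      continue;  // skip every storage on this node
    }
    // ... then only storage-level checks (type, state, free space)
    // run inside the per-storage loop.
    for (DatanodeStorageInfo storage : DFSUtil.shuffle(
        chosenNode.getStorageInfos())) {
      if (addIfIsGoodTarget(storage, excludedNodes, blocksize,
          results, type) >= 0) {
        numOfReplicas--;  // storage accepted as a target
      }
    }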


http://git-wip-us.apache.org/repos/asf/hadoop/blob/63bdbb77/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index ca51779..2cf2082 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -474,6 +474,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8917. Cleanup BlockInfoUnderConstruction from comments and tests.
     (Zhe Zhang via jing9)
 
+    HDFS-8884. Fail-fast check in BlockPlacementPolicyDefault#chooseTarget.
+    (yliu)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63bdbb77/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 9023e0a..3aea5c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -437,17 +437,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     return writer;
   }
-  
-  /**
-   * Choose <i>localMachine</i> as the target.
-   * if <i>localMachine</i> is not available, 
-   * choose a node on the same rack
-   * @return the chosen storage
-   */
+
   protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
-      EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
+      EnumMap<StorageType, Integer> storageTypes)
       throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null) {
@@ -458,7 +452,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         && clusterMap.contains(localMachine)) {
       DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
       // otherwise try local machine first
-      if (excludedNodes.add(localMachine)) { // was not in the excluded list
+      if (excludedNodes.add(localMachine) // was not in the excluded list
+          && isGoodDatanode(localDatanode, maxNodesPerRack, false,
+              results, avoidStaleNodes)) {
         for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
             .entrySet().iterator(); iter.hasNext(); ) {
           Map.Entry<StorageType, Integer> entry = iter.next();
@@ -466,7 +462,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
               localDatanode.getStorageInfos())) {
             StorageType type = entry.getKey();
             if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
-                maxNodesPerRack, false, results, avoidStaleNodes, type) >= 0) {
+                results, type) >= 0) {
               int num = entry.getValue();
               if (num == 1) {
                 iter.remove();
@@ -479,6 +475,26 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         }
       } 
     }
+    return null;
+  }
+
+  /**
+   * Choose <i>localMachine</i> as the target.
+   * if <i>localMachine</i> is not available,
+   * choose a node on the same rack
+   * @return the chosen storage
+   */
+  protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
+      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+      EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
+      throws NotEnoughReplicasException {
+    DatanodeStorageInfo localStorage = chooseLocalStorage(localMachine,
+        excludedNodes, blocksize, maxNodesPerRack, results,
+        avoidStaleNodes, storageTypes);
+    if (localStorage != null) {
+      return localStorage;
+    }
 
     if (!fallbackToLocalRack) {
       return null;
@@ -653,6 +669,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
          builder.append("\nNode ").append(NodeBase.getPath(chosenNode)).append(" [");
         }
         numOfAvailableNodes--;
+        if (!isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad,
+            results, avoidStaleNodes)) {
+          if (LOG.isDebugEnabled()) {
+            builder.append("\n]");
+          }
+          badTarget = true;
+          continue;
+        }
 
         final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
             chosenNode.getStorageInfos());
@@ -664,8 +688,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
           for (i = 0; i < storages.length; i++) {
             StorageType type = entry.getKey();
             final int newExcludedNodes = addIfIsGoodTarget(storages[i],
-                excludedNodes, blocksize, maxNodesPerRack, considerLoad, results,
-                avoidStaleNodes, type);
+                excludedNodes, blocksize, results, type);
             if (newExcludedNodes >= 0) {
               numOfReplicas--;
               if (firstChosen == null) {
@@ -725,13 +748,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   int addIfIsGoodTarget(DatanodeStorageInfo storage,
       Set<Node> excludedNodes,
       long blockSize,
-      int maxNodesPerRack,
-      boolean considerLoad,
-      List<DatanodeStorageInfo> results,                           
-      boolean avoidStaleNodes,
+      List<DatanodeStorageInfo> results,
       StorageType storageType) {
-    if (isGoodTarget(storage, blockSize, maxNodesPerRack, considerLoad,
-        results, avoidStaleNodes, storageType)) {
+    if (isGoodTarget(storage, blockSize, results, storageType)) {
       results.add(storage);
       // add node and related nodes to excludedNode
      return addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
@@ -749,75 +768,52 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
   }
 
+  private static void logNodeIsNotChosen(DatanodeDescriptor node,
+      String reason) {
+    if (LOG.isDebugEnabled()) {
+      // build the error message for later use.
+      debugLoggingBuilder.get()
+          .append("\n  Datanode ").append(node)
+          .append(" is not chosen since ").append(reason).append(".");
+    }
+  }
+
   /**
-   * Determine if a storage is a good target. 
-   * 
-   * @param storage The target storage
-   * @param blockSize Size of block
-   * @param maxTargetPerRack Maximum number of targets per rack. The value of 
-   *                       this parameter depends on the number of racks in 
+   * Determine if a datanode is good for placing a block.
+   *
+   * @param node The target datanode
+   * @param maxTargetPerRack Maximum number of targets per rack. The value of
+   *                       this parameter depends on the number of racks in
    *                       the cluster and total number of replicas for a block
    * @param considerLoad whether or not to consider load of the target node
-   * @param results A list containing currently chosen nodes. Used to check if 
+   * @param results A list containing currently chosen nodes. Used to check if
   *                too many nodes have been chosen in the target rack.
    * @param avoidStaleNodes Whether or not to avoid choosing stale nodes
-   * @return Return true if <i>node</i> has enough space, 
-   *         does not have too much load, 
-   *         and the rack does not have too many nodes.
+   * @return Return true if the datanode is a good candidate, otherwise false
    */
-  private boolean isGoodTarget(DatanodeStorageInfo storage,
-                               long blockSize, int maxTargetPerRack,
-                               boolean considerLoad,
-                               List<DatanodeStorageInfo> results,
-                               boolean avoidStaleNodes,
-                               StorageType requiredStorageType) {
-    if (storage.getStorageType() != requiredStorageType) {
-      logNodeIsNotChosen(storage, "storage types do not match,"
-          + " where the required storage type is " + requiredStorageType);
-      return false;
-    }
-    if (storage.getState() == State.READ_ONLY_SHARED) {
-      logNodeIsNotChosen(storage, "storage is read-only");
-      return false;
-    }
-
-    if (storage.getState() == State.FAILED) {
-      logNodeIsNotChosen(storage, "storage has failed");
-      return false;
-    }
-
-    DatanodeDescriptor node = storage.getDatanodeDescriptor();
+  boolean isGoodDatanode(DatanodeDescriptor node,
+                         int maxTargetPerRack, boolean considerLoad,
+                         List<DatanodeStorageInfo> results,
+                         boolean avoidStaleNodes) {
     // check if the node is (being) decommissioned
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      logNodeIsNotChosen(storage, "the node is (being) decommissioned ");
+      logNodeIsNotChosen(node, "the node is (being) decommissioned ");
       return false;
     }
 
     if (avoidStaleNodes) {
       if (node.isStale(this.staleInterval)) {
-        logNodeIsNotChosen(storage, "the node is stale ");
+        logNodeIsNotChosen(node, "the node is stale ");
         return false;
       }
     }
-    
-    final long requiredSize = blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
-    final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType());
-    final long remaining = node.getRemaining(storage.getStorageType());
-    if (requiredSize > remaining - scheduledSize) {
-      logNodeIsNotChosen(storage, "the node does not have enough "
-          + storage.getStorageType() + " space"
-          + " (required=" + requiredSize
-          + ", scheduled=" + scheduledSize
-          + ", remaining=" + remaining + ")");
-      return false;
-    }
 
     // check the communication traffic of the target machine
     if (considerLoad) {
       final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
       final int nodeLoad = node.getXceiverCount();
       if (nodeLoad > maxLoad) {
-        logNodeIsNotChosen(storage, "the node is too busy (load: " + nodeLoad
+        logNodeIsNotChosen(node, "the node is too busy (load: " + nodeLoad
             + " > " + maxLoad + ") ");
         return false;
       }
@@ -832,10 +828,56 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         counter++;
       }
     }
-    if (counter>maxTargetPerRack) {
-      logNodeIsNotChosen(storage, "the rack has too many chosen nodes ");
+    if (counter > maxTargetPerRack) {
+      logNodeIsNotChosen(node, "the rack has too many chosen nodes ");
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Determine if a storage is a good target.
+   *
+   * @param storage The target storage
+   * @param blockSize Size of block
+   * @param results A list containing currently chosen nodes. Used to check if
+   *                too many nodes have been chosen in the target rack.
+   * @return Return true if <i>node</i> has enough space.
+   */
+  private boolean isGoodTarget(DatanodeStorageInfo storage,
+                               long blockSize,
+                               List<DatanodeStorageInfo> results,
+                               StorageType requiredStorageType) {
+    if (storage.getStorageType() != requiredStorageType) {
+      logNodeIsNotChosen(storage, "storage types do not match,"
+          + " where the required storage type is " + requiredStorageType);
+      return false;
+    }
+    if (storage.getState() == State.READ_ONLY_SHARED) {
+      logNodeIsNotChosen(storage, "storage is read-only");
+      return false;
+    }
+
+    if (storage.getState() == State.FAILED) {
+      logNodeIsNotChosen(storage, "storage has failed");
+      return false;
+    }
+
+    DatanodeDescriptor node = storage.getDatanodeDescriptor();
+
+    final long requiredSize = blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
+    final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType());
+    final long remaining = node.getRemaining(storage.getStorageType());
+    if (requiredSize > remaining - scheduledSize) {
+      logNodeIsNotChosen(storage, "the node does not have enough "
+          + storage.getStorageType() + " space"
+          + " (required=" + requiredSize
+          + ", scheduled=" + scheduledSize
+          + ", remaining=" + remaining + ")");
       return false;
     }
+
     return true;
   }
 

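Two refactorings land in this file: the local-node selection loop is
extracted into a new chooseLocalStorage() overload without the local-rack
fallback (so subclasses can reuse it; see the node-group policy below), and
the old isGoodTarget() is split in two. Roughly (a summary of the hunks
above, not the verbatim method bodies):

    // Node-level, now checked once per datanode by chooseRandom():
    // decommissioning, staleness, xceiver load, chosen-per-rack count.
    boolean nodeOk = isGoodDatanode(node, maxNodesPerRack, considerLoad,
        results, avoidStaleNodes);

    // Storage-level, still checked per storage: required storage type,
    // READ_ONLY_SHARED/FAILED state, remaining vs. scheduled space.
    int newlyExcluded = addIfIsGoodTarget(storage, excludedNodes,
        blockSize, results, storageType);
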
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63bdbb77/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
index 3a68348..8ec1778 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
@@ -20,9 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
-
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
@@ -66,34 +64,11 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
       EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
       throws NotEnoughReplicasException {
-    // if no local machine, randomly choose one node
-    if (localMachine == null)
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
-
-    // otherwise try local machine first
-    if (localMachine instanceof DatanodeDescriptor) {
-      DatanodeDescriptor localDataNode = (DatanodeDescriptor)localMachine;
-      if (excludedNodes.add(localMachine)) { // was not in the excluded list
-        for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
-            .entrySet().iterator(); iter.hasNext(); ) {
-          Map.Entry<StorageType, Integer> entry = iter.next();
-          for (DatanodeStorageInfo localStorage : DFSUtil.shuffle(
-              localDataNode.getStorageInfos())) {
-            StorageType type = entry.getKey();
-            if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
-                maxNodesPerRack, false, results, avoidStaleNodes, type) >= 0) {
-              int num = entry.getValue();
-              if (num == 1) {
-                iter.remove();
-              } else {
-                entry.setValue(num - 1);
-              }
-              return localStorage;
-            }
-          }
-        }
-      }
+    DatanodeStorageInfo localStorage = chooseLocalStorage(localMachine,
+        excludedNodes, blocksize, maxNodesPerRack, results,
+        avoidStaleNodes, storageTypes);
+    if (localStorage != null) {
+      return localStorage;
     }
 
     // try a node on local node group

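The subclass previously duplicated the entire local-storage loop; it now
delegates to the new base-class overload and keeps only its node-group
fallback. Condensed from the hunk above:

    DatanodeStorageInfo localStorage = chooseLocalStorage(localMachine,
        excludedNodes, blocksize, maxNodesPerRack, results,
        avoidStaleNodes, storageTypes);  // base class, no rack fallback
    if (localStorage != null) {
      return localStorage;  // local node accepted
    }
    // otherwise fall through to the node-group fallback kept below
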
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63bdbb77/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
index 38daddc..5709cee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
@@ -29,8 +29,11 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.net.StaticMapping;
 import org.junit.After;
@@ -81,7 +84,37 @@ public class TestDefaultBlockPlacementPolicy {
     // Map client to RACK2
     String clientRack = "/RACK2";
     StaticMapping.addNodeToRack(clientMachine, clientRack);
-    testPlacement(clientMachine, clientRack);
+    testPlacement(clientMachine, clientRack, true);
+  }
+
+  /**
+   * Verify local node selection
+   */
+  @Test
+  public void testLocalStoragePlacement() throws Exception {
+    String clientMachine = "/host3";
+    testPlacement(clientMachine, "/RACK3", true);
+  }
+
+  /**
+   * Verify that decommissioned nodes are not selected.
+   */
+  @Test
+  public void testPlacementWithLocalRackNodesDecommissioned() throws Exception {
+    String clientMachine = "client.foo.com";
+    // Map client to RACK3
+    String clientRack = "/RACK3";
+    StaticMapping.addNodeToRack(clientMachine, clientRack);
+    final DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
+    DatanodeDescriptor dnd3 = dnm.getDatanode(
+        cluster.getDataNodes().get(3).getDatanodeId());
+    assertEquals(dnd3.getNetworkLocation(), clientRack);
+    dnm.getDecomManager().startDecommission(dnd3);
+    try {
+      testPlacement(clientMachine, clientRack, false);
+    } finally {
+      dnm.getDecomManager().stopDecommission(dnd3);
+    }
   }
 
   /**
@@ -93,11 +126,11 @@ public class TestDefaultBlockPlacementPolicy {
     // Don't map client machine to any rack,
     // so by default it will be treated as /default-rack
     // in that case a random node should be selected as first node.
-    testPlacement(clientMachine, null);
+    testPlacement(clientMachine, null, true);
   }
 
   private void testPlacement(String clientMachine,
-      String clientRack) throws IOException {
+      String clientRack, boolean hasBlockReplicaOnRack) throws IOException {
     // write 5 files and check whether all times block placed
     for (int i = 0; i < 5; i++) {
       String src = "/test-" + i;
@@ -111,8 +144,14 @@ public class TestDefaultBlockPlacementPolicy {
       assertEquals("Block should be allocated sufficient locations",
           REPLICATION_FACTOR, locatedBlock.getLocations().length);
       if (clientRack != null) {
-        assertEquals("First datanode should be rack local", clientRack,
-            locatedBlock.getLocations()[0].getNetworkLocation());
+        if (hasBlockReplicaOnRack) {
+          assertEquals("First datanode should be rack local", clientRack,
+              locatedBlock.getLocations()[0].getNetworkLocation());
+        } else {
+          for (DatanodeInfo dni : locatedBlock.getLocations()) {
+            assertNotEquals(clientRack, dni.getNetworkLocation());
+          }
+        }
       }
       nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(),
           src, clientMachine);

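The new negative test decommissions the datanode on the client's rack
(/RACK3) and then verifies that no returned location lands on that rack,
exercising the isGoodDatanode() decommission check end to end. The key
assertion, from the hunk above:

    // With the local-rack node decommissioning, no replica may be
    // placed on the client's rack.
    for (DatanodeInfo dni : locatedBlock.getLocations()) {
      assertNotEquals(clientRack, dni.getNetworkLocation());
    }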