This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 71dcb8f  HDFS-14637. Namenode may not replicate blocks to meet the 
policy after enabling upgradeDomain. Contributed by Stephen O'Donnell.
71dcb8f is described below

commit 71dcb8f6d405b0b8025d41b41e03418c565f7ed1
Author: Stephen O'Donnell <sodonn...@cloudera.com>
AuthorDate: Thu Oct 3 22:12:27 2019 -0700

    HDFS-14637. Namenode may not replicate blocks to meet the policy after 
enabling upgradeDomain. Contributed by Stephen O'Donnell.
    
    Reviewed-by: Ayush Saxena <ayushsax...@apache.org>
    Signed-off-by: Wei-Chiu Chuang <weic...@apache.org>
    (cherry picked from commit c99a12167ff9566012ef32104a3964887d62c899)
    
     Conflicts:
        
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
        
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/BlockPlacementPolicyAlwaysSatisfied.java
    
    (cherry picked from commit 966193153f9cd75e009c8db3502e1b3ba2cdfa25)
    
     Conflicts:
        
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
---
 .../hdfs/server/blockmanagement/BlockManager.java  |  69 +++++++++-----
 .../blockmanagement/BlockPlacementStatus.java      |   8 ++
 .../BlockPlacementStatusDefault.java               |   8 ++
 .../BlockPlacementStatusWithNodeGroup.java         |  11 +++
 .../BlockPlacementStatusWithUpgradeDomain.java     |  22 ++++-
 .../java/org/apache/hadoop/hdfs/DFSTestUtil.java   |  17 +++-
 .../blockmanagement/BlockManagerTestUtil.java      |  30 +++++-
 .../TestBlockPlacementStatusDefault.java           |  57 +++++++++++
 .../TestBlockPlacementStatusWithUpgradeDomain.java |  59 +++++++++++-
 .../TestBlocksWithNotEnoughRacks.java              | 105 +++++++++++++++++++++
 10 files changed, 357 insertions(+), 29 deletions(-)

diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index e9c11b9..4e450e2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1999,6 +1999,7 @@ public class BlockManager implements BlockStatsMXBean {
         (pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
   }
 
+  @VisibleForTesting
   BlockReconstructionWork scheduleReconstruction(BlockInfo block,
       int priority) {
     // skip abandoned block or block reopened for append
@@ -2043,7 +2044,9 @@ public class BlockManager implements BlockStatsMXBean {
       additionalReplRequired = requiredRedundancy - numReplicas.liveReplicas()
           - pendingNum;
     } else {
-      additionalReplRequired = 1; // Needed on a new rack
+      // Violates placement policy. Needed on a new rack or domain etc.
+      BlockPlacementStatus placementStatus = getBlockPlacementStatus(block);
+      additionalReplRequired = placementStatus.getAdditionalReplicasRequired();
     }
 
     final BlockCollection bc = getBlockCollection(block);
@@ -2076,20 +2079,6 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
-  private boolean isInNewRack(DatanodeDescriptor[] srcs,
-      DatanodeDescriptor target) {
-    LOG.debug("check if target {} increases racks, srcs={}", target,
-        Arrays.asList(srcs));
-    for (DatanodeDescriptor src : srcs) {
-      if (!src.isDecommissionInProgress() &&
-          src.getNetworkLocation().equals(target.getNetworkLocation())) {
-        LOG.debug("the target {} is in the same rack with src {}", target, 
src);
-        return false;
-      }
-    }
-    return true;
-  }
-
   private boolean validateReconstructionWork(BlockReconstructionWork rw) {
     BlockInfo block = rw.getBlock();
     int priority = rw.getPriority();
@@ -2115,10 +2104,16 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     DatanodeStorageInfo[] targets = rw.getTargets();
+    BlockPlacementStatus placementStatus = getBlockPlacementStatus(block);
     if ((numReplicas.liveReplicas() >= requiredRedundancy) &&
-        (!isPlacementPolicySatisfied(block)) ) {
-      if (!isInNewRack(rw.getSrcNodes(), targets[0].getDatanodeDescriptor())) {
-        // No use continuing, unless a new rack in this case
+        (!placementStatus.isPlacementPolicySatisfied())) {
+      BlockPlacementStatus newPlacementStatus =
+          getBlockPlacementStatus(block, targets);
+      if (!newPlacementStatus.isPlacementPolicySatisfied() &&
+          (newPlacementStatus.getAdditionalReplicasRequired() >=
+              placementStatus.getAdditionalReplicasRequired())) {
+        // If the new targets neither meet the placement policy nor at least
+        // reduce the number of replicas needed, then no use continuing.
         return false;
       }
       // mark that the reconstruction work is to replicate internal block to a
@@ -4512,7 +4507,25 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   boolean isPlacementPolicySatisfied(BlockInfo storedBlock) {
+    return getBlockPlacementStatus(storedBlock, null)
+        .isPlacementPolicySatisfied();
+  }
+
+  BlockPlacementStatus getBlockPlacementStatus(BlockInfo storedBlock) {
+    return getBlockPlacementStatus(storedBlock, null);
+  }
+
+  BlockPlacementStatus getBlockPlacementStatus(BlockInfo storedBlock,
+      DatanodeStorageInfo[] additionalStorage) {
     List<DatanodeDescriptor> liveNodes = new ArrayList<>();
+    if (additionalStorage != null) {
+      // additionalStorage holds potential new targets for the block. If any
+      // are passed, include them when checking the placement policy to see if
+      // the policy is met, when it may not have been met without these nodes.
+      for (DatanodeStorageInfo s : additionalStorage) {
+        liveNodes.add(getDatanodeDescriptorFromStorage(s));
+      }
+    }
     Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
         .getNodes(storedBlock);
     for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
@@ -4520,7 +4533,22 @@ public class BlockManager implements BlockStatsMXBean {
           && storage.getState() == State.NORMAL) {
         // assume the policy is satisfied for blocks on PROVIDED storage
         // as long as the storage is in normal state.
-        return true;
+        return new BlockPlacementStatus() {
+          @Override
+          public boolean isPlacementPolicySatisfied() {
+            return true;
+          }
+
+          @Override
+          public String getErrorDescription() {
+            return null;
+          }
+
+          @Override
+          public int getAdditionalReplicasRequired() {
+            return 0;
+          }
+        };
       }
       final DatanodeDescriptor cur = getDatanodeDescriptorFromStorage(storage);
       // Nodes under maintenance should be counted as valid replicas from
@@ -4536,8 +4564,7 @@ public class BlockManager implements BlockStatsMXBean {
         .getPolicy(blockType);
     int numReplicas = blockType == STRIPED ? ((BlockInfoStriped) storedBlock)
         .getRealTotalBlockNum() : storedBlock.getReplication();
-    return placementPolicy.verifyBlockPlacement(locs, numReplicas)
-        .isPlacementPolicySatisfied();
+    return placementPolicy.verifyBlockPlacement(locs, numReplicas);
   }
 
   boolean isNeededReconstructionForMaintenance(BlockInfo storedBlock,
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatus.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatus.java
index e2ac54a..a227666 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatus.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatus.java
@@ -39,4 +39,12 @@ public interface BlockPlacementStatus {
    */
   public String getErrorDescription();
 
+  /**
+   * Return the number of additional replicas needed to ensure the block
+   * placement policy is satisfied.
+   * @return The number of new replicas needed to satisfy the placement policy
+   * or zero if no extra are needed
+   */
+  int getAdditionalReplicasRequired();
+
 }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
index 75bb65d..7612142 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
@@ -45,4 +45,12 @@ public class BlockPlacementStatusDefault implements 
BlockPlacementStatus {
         " more rack(s). Total number of racks in the cluster: " + totalRacks;
   }
 
+  @Override
+  public int getAdditionalReplicasRequired() {
+    if (isPlacementPolicySatisfied()) {
+      return 0;
+    } else {
+      return requiredRacks - currentRacks;
+    }
+  }
 }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java
index b98b3da..ac5a5b5 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithNodeGroup.java
@@ -78,4 +78,15 @@ public class BlockPlacementStatusWithNodeGroup implements 
BlockPlacementStatus {
     }
     return errorDescription.toString();
   }
+
+  @Override
+  public int getAdditionalReplicasRequired() {
+    if (isPlacementPolicySatisfied()) {
+      return 0;
+    } else {
+      int parent = parentBlockPlacementStatus.getAdditionalReplicasRequired();
+      int child = requiredNodeGroups - currentNodeGroups.size();
+      return Math.max(parent, child);
+    }
+  }
 }
\ No newline at end of file
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java
index 4b3c3cc..b839ced 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java
@@ -85,4 +85,24 @@ public class BlockPlacementStatusWithUpgradeDomain implements
     }
     return errorDescription.toString();
   }
-}
\ No newline at end of file
+
+  @Override
+  public int getAdditionalReplicasRequired() {
+    if (isPlacementPolicySatisfied()) {
+      return 0;
+    } else {
+      // It is possible for a block to have the correct number of upgrade
+      // domains, but only a single rack, or be on multiple racks, but only in
+      // one upgrade domain.
+      int parent = parentBlockPlacementStatus.getAdditionalReplicasRequired();
+      int child;
+
+      if (numberOfReplicas <= upgradeDomainFactor) {
+        child = numberOfReplicas - upgradeDomains.size();
+      } else {
+        child = upgradeDomainFactor - upgradeDomains.size();
+      }
+      return Math.max(parent, child);
+    }
+  }
+}
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 37996ae..3d0d882 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -526,17 +526,24 @@ public class DFSTestUtil {
     }
   }
 
+  public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock 
b,
+      int racks, int replicas, int neededReplicas)
+      throws TimeoutException, InterruptedException {
+    waitForReplication(cluster, b, racks, replicas, neededReplicas, 0);
+  }
+
   /*
    * Wait up to 20s for the given block to be replicated across
    * the requested number of racks, with the requested number of
    * replicas, and the requested number of replicas still needed.
    */
   public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock 
b,
-      int racks, int replicas, int neededReplicas)
+      int racks, int replicas, int neededReplicas, int neededDomains)
       throws TimeoutException, InterruptedException {
     int curRacks = 0;
     int curReplicas = 0;
     int curNeededReplicas = 0;
+    int curDomains = 0;
     int count = 0;
     final int ATTEMPTS = 20;
 
@@ -547,17 +554,21 @@ public class DFSTestUtil {
       curRacks = r[0];
       curReplicas = r[1];
       curNeededReplicas = r[2];
+      curDomains = r[3];
       count++;
     } while ((curRacks != racks ||
               curReplicas != replicas ||
-              curNeededReplicas != neededReplicas) && count < ATTEMPTS);
+        curNeededReplicas != neededReplicas ||
+        (neededDomains != 0 && curDomains != neededDomains))
+        && count < ATTEMPTS);
 
     if (count == ATTEMPTS) {
       throw new TimeoutException("Timed out waiting for replication."
           + " Needed replicas = "+neededReplicas
           + " Cur needed replicas = "+curNeededReplicas
           + " Replicas = "+replicas+" Cur replicas = "+curReplicas
-          + " Racks = "+racks+" Cur racks = "+curRacks);
+          + " Racks = "+racks+" Cur racks = "+curRacks
+          + " Domains = "+neededDomains+" Cur domains = "+curDomains);
     }
   }
 
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index dfb40a6..fff909f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -81,7 +81,8 @@ public class BlockManagerTestUtil {
 
   /**
    * @return a tuple of the replica state (number racks, number live
-   * replicas, and number needed replicas) for the given block.
+   * replicas, number needed replicas and number of UpgradeDomains) for the
+   * given block.
    */
   public static int[] getReplicaInfo(final FSNamesystem namesystem, final 
Block b) {
     final BlockManager bm = namesystem.getBlockManager();
@@ -90,7 +91,8 @@ public class BlockManagerTestUtil {
       final BlockInfo storedBlock = bm.getStoredBlock(b);
       return new int[]{getNumberOfRacks(bm, b),
           bm.countNodes(storedBlock).liveReplicas(),
-          bm.neededReconstruction.contains(storedBlock) ? 1 : 0};
+          bm.neededReconstruction.contains(storedBlock) ? 1 : 0,
+          getNumberOfDomains(bm, b)};
     } finally {
       namesystem.readUnlock();
     }
@@ -121,6 +123,30 @@ public class BlockManagerTestUtil {
   }
 
   /**
+   * @return the number of UpgradeDomains over which a given block is 
replicated.
+   * Decommissioning/decommissioned nodes are not counted; corrupt replicas
+   * are also ignored.
+   */
+  private static int getNumberOfDomains(final BlockManager blockManager,
+                                        final Block b) {
+    final Set<String> domSet = new HashSet<String>(0);
+    final Collection<DatanodeDescriptor> corruptNodes =
+        getCorruptReplicas(blockManager).getNodes(b);
+    for(DatanodeStorageInfo storage : blockManager.blocksMap.getStorages(b)) {
+      final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
+      if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
+        if ((corruptNodes == null) || !corruptNodes.contains(cur)) {
+          String domain = cur.getUpgradeDomain();
+          if (domain != null && !domSet.contains(domain)) {
+            domSet.add(domain);
+          }
+        }
+      }
+    }
+    return domSet.size();
+  }
+
+  /**
    * @return redundancy monitor thread instance from block manager.
    */
   public static Daemon getRedundancyThread(final BlockManager blockManager) {
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusDefault.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusDefault.java
new file mode 100644
index 0000000..6b07334
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusDefault.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import org.junit.Test;
+
+/**
+ * Unit tests to validate the BlockPlacementStatusDefault policy, focusing on
+ * the getAdditionalReplicasRequired method.
+ */
+public class TestBlockPlacementStatusDefault {
+
+  @Test
+  public void testIsPolicySatisfiedCorrectly() {
+    // 2 current racks and 2 expected
+    BlockPlacementStatusDefault bps =
+        new BlockPlacementStatusDefault(2, 2, 5);
+    assertTrue(bps.isPlacementPolicySatisfied());
+    assertEquals(0, bps.getAdditionalReplicasRequired());
+
+    // 1 current rack and 2 expected
+    bps =
+        new BlockPlacementStatusDefault(1, 2, 5);
+    assertFalse(bps.isPlacementPolicySatisfied());
+    assertEquals(1, bps.getAdditionalReplicasRequired());
+
+    // 3 current racks and 2 expected
+    bps =
+        new BlockPlacementStatusDefault(3, 2, 5);
+    assertTrue(bps.isPlacementPolicySatisfied());
+    assertEquals(0, bps.getAdditionalReplicasRequired());
+
+    // 1 current rack and 2 expected, but only 1 rack on the cluster
+    bps =
+        new BlockPlacementStatusDefault(1, 2, 1);
+    assertTrue(bps.isPlacementPolicySatisfied());
+    assertEquals(0, bps.getAdditionalReplicasRequired());
+  }
+}
\ No newline at end of file
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusWithUpgradeDomain.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusWithUpgradeDomain.java
index bfff932..1e0fb76 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusWithUpgradeDomain.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusWithUpgradeDomain.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -49,11 +50,13 @@ public class TestBlockPlacementStatusWithUpgradeDomain {
   @Test
   public void testIsPolicySatisfiedParentFalse() {
     when(bpsd.isPlacementPolicySatisfied()).thenReturn(false);
+    when(bpsd.getAdditionalReplicasRequired()).thenReturn(1);
     BlockPlacementStatusWithUpgradeDomain bps =
         new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3);
 
     // Parent policy is not satisfied but upgrade domain policy is
     assertFalse(bps.isPlacementPolicySatisfied());
+    assertEquals(1, bps.getAdditionalReplicasRequired());
   }
 
   @Test
@@ -63,21 +66,73 @@ public class TestBlockPlacementStatusWithUpgradeDomain {
     // Number of domains, replicas and upgradeDomainFactor is equal and parent
     // policy is satisfied
     assertTrue(bps.isPlacementPolicySatisfied());
+    assertEquals(0, bps.getAdditionalReplicasRequired());
   }
 
   @Test
-  public void testIsPolicySatisifedSmallDomains() {
+  public void testIsPolicySatisfiedSmallDomains() {
     // Number of domains is less than replicas but equal to factor
     BlockPlacementStatusWithUpgradeDomain bps =
         new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 3);
     assertTrue(bps.isPlacementPolicySatisfied());
+    assertEquals(0, bps.getAdditionalReplicasRequired());
 
     // Same as above but replicas is greater than factor
     bps = new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 
2);
     assertTrue(bps.isPlacementPolicySatisfied());
+    assertEquals(0, bps.getAdditionalReplicasRequired());
 
     // Number of domains is less than replicas and factor
     bps = new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 
4);
     assertFalse(bps.isPlacementPolicySatisfied());
+    assertEquals(1, bps.getAdditionalReplicasRequired());
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testIsPolicySatisfiedSmallReplicas() {
+    // Replication factor 1 file
+    upgradeDomains.clear();
+    upgradeDomains.add("1");
+    BlockPlacementStatusWithUpgradeDomain bps =
+        new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 1, 3);
+    assertTrue(bps.isPlacementPolicySatisfied());
+    assertEquals(0, bps.getAdditionalReplicasRequired());
+
+    // Replication factor 2 file, but one domain
+    bps =
+        new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 2, 3);
+    assertFalse(bps.isPlacementPolicySatisfied());
+    assertEquals(1, bps.getAdditionalReplicasRequired());
+
+    // Replication factor 2 file, but two domains
+    upgradeDomains.add("2");
+    bps =
+        new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 2, 3);
+    assertTrue(bps.isPlacementPolicySatisfied());
+    assertEquals(0, bps.getAdditionalReplicasRequired());
+  }
+
+  @Test
+  public void testPolicyIsNotSatisfiedInsufficientDomains() {
+    // Insufficient Domains - 1 domain, replication factor 3
+    upgradeDomains.clear();
+    upgradeDomains.add("1");
+    BlockPlacementStatusWithUpgradeDomain bps =
+        new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3);
+    assertFalse(bps.isPlacementPolicySatisfied());
+    assertEquals(2, bps.getAdditionalReplicasRequired());
+
+    // One domain, replication factor 2 file
+    bps =
+        new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 2, 3);
+    assertFalse(bps.isPlacementPolicySatisfied());
+    assertEquals(1, bps.getAdditionalReplicasRequired());
+
+    // 2 domains, replication factor 3
+    upgradeDomains.add("2");
+    bps =
+        new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3);
+    assertFalse(bps.isPlacementPolicySatisfied());
+    assertEquals(1, bps.getAdditionalReplicasRequired());
+  }
+}
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
index 2bf6045..dda5fef 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
@@ -21,7 +21,9 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertArrayEquals;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,6 +45,8 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
 import org.slf4j.event.Level;
 
+import static org.junit.Assert.*;
+
 public class TestBlocksWithNotEnoughRacks {
   public static final Log LOG = 
LogFactory.getLog(TestBlocksWithNotEnoughRacks.class);
   static {
@@ -472,4 +476,105 @@ public class TestBlocksWithNotEnoughRacks {
       hostsFileWriter.cleanup();
     }
   }
+
+  @Test
+  public void testMultipleReplicasScheduledForUpgradeDomain() throws Exception 
{
+    Configuration conf = getConf();
+    final short replicationFactor = 3;
+    final Path filePath = new Path("/testFile");
+
+    conf.set("dfs.block.replicator.classname",
+        "org.apache.hadoop.hdfs.server.blockmanagement." +
+            "BlockPlacementPolicyWithUpgradeDomain");
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(6).build();
+    cluster.waitClusterUp();
+
+    List<DatanodeDescriptor> dnDescriptors = getDnDescriptors(cluster);
+
+    try {
+      // Create a file with one block with a replication factor of 3
+      // No upgrade domains are set.
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, filePath, 1L, replicationFactor, 1L);
+      ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
+
+      BlockManager bm = cluster.getNamesystem().getBlockManager();
+      BlockInfo storedBlock = bm.getStoredBlock(b.getLocalBlock());
+
+      // The block should be replicated OK - so Reconstruction Work will be 
null
+      BlockReconstructionWork work = bm.scheduleReconstruction(storedBlock, 2);
+      assertNull(work);
+      // Set the upgradeDomain to "3" for the 3 nodes hosting the block.
+      // Then alternately set the remaining 3 nodes to have an upgradeDomain
+      // of 0 or 1 giving a total of 3 upgradeDomains.
+      for (int i=0; i<storedBlock.getReplication(); i++) {
+        storedBlock.getDatanode(i).setUpgradeDomain("3");
+      }
+      int udInd = 0;
+      for (DatanodeDescriptor d : dnDescriptors) {
+        if (d.getUpgradeDomain() == null) {
+          d.setUpgradeDomain(Integer.toString(udInd % 2));
+          udInd++;
+        }
+      }
+      // Now reconWork is non-null and 2 extra targets are needed
+      work = bm.scheduleReconstruction(storedBlock, 2);
+      assertEquals(2, work.getAdditionalReplRequired());
+
+      // Add the block to the replication queue and ensure it is replicated
+      // correctly.
+      bm.neededReconstruction.add(storedBlock, 3, 0, 0, replicationFactor);
+      DFSTestUtil.waitForReplication(cluster, b, 1, replicationFactor, 0, 3);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testUnderReplicatedRespectsRacksAndUpgradeDomain()
+      throws Exception {
+    Configuration conf = getConf();
+    final short replicationFactor = 3;
+    final Path filePath = new Path("/testFile");
+
+    conf.set("dfs.block.replicator.classname",
+        "org.apache.hadoop.hdfs.server.blockmanagement." +
+        "BlockPlacementPolicyWithUpgradeDomain");
+
+    // All hosts are on two racks
+    String[] racks = {"/r1", "/r1", "/r1", "/r2", "/r2", "/r2"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(6).racks(racks).build();
+    cluster.waitClusterUp();
+    List<DatanodeDescriptor> dnDescriptors = getDnDescriptors(cluster);
+    for (int i=0; i < dnDescriptors.size(); i++) {
+      dnDescriptors.get(i).setUpgradeDomain(Integer.toString(i%3));
+    }
+    try {
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, filePath, 1L, (short)1, 1L);
+      ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
+      fs.setReplication(filePath, replicationFactor);
+      DFSTestUtil.waitForReplication(cluster, b, 2, replicationFactor, 0, 3);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private List<DatanodeDescriptor> getDnDescriptors(MiniDFSCluster cluster)
+      throws IOException {
+    List<DatanodeDescriptor> dnDesc = new ArrayList<>();
+    DatanodeManager dnManager = cluster.getNamesystem().getBlockManager()
+        .getDatanodeManager();
+    for (DataNode dn : cluster.getDataNodes()) {
+      DatanodeDescriptor d = dnManager.getDatanode(dn.getDatanodeUuid());
+      if (d == null) {
+        throw new IOException("DatanodeDescriptor not found for DN "+
+            dn.getDatanodeUuid());
+      }
+      dnDesc.add(d);
+    }
+    return dnDesc;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to