Repository: hadoop
Updated Branches:
  refs/heads/branch-2 8027c3e8b -> b4078e1d0


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index 63f11b7..3280563 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.AclEntryType;
@@ -81,6 +82,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
@@ -165,7 +167,7 @@ public class TestPBHelper {
     DatanodeID dn2 = PBHelperClient.convert(dnProto);
     compare(dn, dn2);
   }
-  
+
   void compare(DatanodeID dn, DatanodeID dn2) {
     assertEquals(dn.getIpAddr(), dn2.getIpAddr());
     assertEquals(dn.getHostName(), dn2.getHostName());
@@ -253,7 +255,7 @@ public class TestPBHelper {
     ExportedBlockKeys expKeys1 = PBHelper.convert(expKeysProto);
     compare(expKeys, expKeys1);
   }
-  
+
   void compare(ExportedBlockKeys expKeys, ExportedBlockKeys expKeys1) {
     BlockKey[] allKeys = expKeys.getAllKeys();
     BlockKey[] allKeys1 = expKeys1.getAllKeys();
@@ -282,12 +284,12 @@ public class TestPBHelper {
         s1.getMostRecentCheckpointTxId());
     assertEquals(s.getNamespaceID(), s1.getNamespaceID());
   }
-  
+
   private static void compare(RemoteEditLog l1, RemoteEditLog l2) {
     assertEquals(l1.getEndTxId(), l2.getEndTxId());
     assertEquals(l1.getStartTxId(), l2.getStartTxId());
   }
-  
+
   @Test
   public void testConvertRemoteEditLog() {
     RemoteEditLog l = new RemoteEditLog(1, 100);
@@ -295,7 +297,7 @@ public class TestPBHelper {
     RemoteEditLog l1 = PBHelper.convert(lProto);
     compare(l, l1);
   }
-  
+
   @Test
   public void testConvertRemoteEditLogManifest() {
     List<RemoteEditLog> logs = new ArrayList<RemoteEditLog>();
@@ -304,7 +306,7 @@ public class TestPBHelper {
     RemoteEditLogManifest m = new RemoteEditLogManifest(logs);
     RemoteEditLogManifestProto mProto = PBHelper.convert(m);
     RemoteEditLogManifest m1 = PBHelper.convert(mProto);
-    
+
     List<RemoteEditLog> logs1 = m1.getLogs();
     assertEquals(logs.size(), logs1.size());
     for (int i = 0; i < logs.size(); i++) {
@@ -314,15 +316,15 @@ public class TestPBHelper {
   public ExtendedBlock getExtendedBlock() {
     return getExtendedBlock(1);
   }
-  
+
   public ExtendedBlock getExtendedBlock(long blkid) {
     return new ExtendedBlock("bpid", blkid, 100, 2);
   }
-  
+
   private void compare(DatanodeInfo dn1, DatanodeInfo dn2) {
       assertEquals(dn1.getAdminState(), dn2.getAdminState());
       assertEquals(dn1.getBlockPoolUsed(), dn2.getBlockPoolUsed());
-      assertEquals(dn1.getBlockPoolUsedPercent(), 
+      assertEquals(dn1.getBlockPoolUsedPercent(),
           dn2.getBlockPoolUsedPercent(), DELTA);
       assertEquals(dn1.getCapacity(), dn2.getCapacity());
       assertEquals(dn1.getDatanodeReport(), dn2.getDatanodeReport());
@@ -336,20 +338,20 @@ public class TestPBHelper {
       assertEquals(dn1.getLevel(), dn2.getLevel());
       assertEquals(dn1.getNetworkLocation(), dn2.getNetworkLocation());
   }
-  
+
   @Test
   public void testConvertExtendedBlock() {
     ExtendedBlock b = getExtendedBlock();
     ExtendedBlockProto bProto = PBHelperClient.convert(b);
     ExtendedBlock b1 = PBHelperClient.convert(bProto);
     assertEquals(b, b1);
-    
+
     b.setBlockId(-1);
     bProto = PBHelperClient.convert(b);
     b1 = PBHelperClient.convert(bProto);
     assertEquals(b, b1);
   }
-  
+
   @Test
   public void testConvertRecoveringBlock() {
     DatanodeInfo di1 = DFSTestUtil.getLocalDatanodeInfo();
@@ -365,7 +367,7 @@ public class TestPBHelper {
       compare(dnInfo[0], dnInfo1[0]);
     }
   }
-  
+
   @Test
   public void testConvertBlockRecoveryCommand() {
     DatanodeInfo di1 = DFSTestUtil.getLocalDatanodeInfo();
@@ -376,14 +378,14 @@ public class TestPBHelper {
       new RecoveringBlock(getExtendedBlock(1), dnInfo, 3),
       new RecoveringBlock(getExtendedBlock(2), dnInfo, 3)
     );
-    
+
     BlockRecoveryCommand cmd = new BlockRecoveryCommand(blks);
     BlockRecoveryCommandProto proto = PBHelper.convert(cmd);
     assertEquals(1, proto.getBlocks(0).getBlock().getB().getBlockId());
     assertEquals(2, proto.getBlocks(1).getBlock().getB().getBlockId());
-    
+
     BlockRecoveryCommand cmd2 = PBHelper.convert(proto);
-    
+
     List<RecoveringBlock> cmd2Blks = Lists.newArrayList(
         cmd2.getRecoveringBlocks());
     assertEquals(blks.get(0).getBlock(), cmd2Blks.get(0).getBlock());
@@ -391,8 +393,8 @@ public class TestPBHelper {
     assertEquals(Joiner.on(",").join(blks), Joiner.on(",").join(cmd2Blks));
     assertEquals(cmd.toString(), cmd2.toString());
   }
-  
-  
+
+
   @Test
   public void testConvertText() {
     Text t = new Text("abc".getBytes());
@@ -400,7 +402,7 @@ public class TestPBHelper {
     Text t1 = new Text(s);
     assertEquals(t, t1);
   }
-  
+
   @Test
   public void testConvertBlockToken() {
     Token<BlockTokenIdentifier> token = new Token<BlockTokenIdentifier>(
@@ -410,7 +412,7 @@ public class TestPBHelper {
     Token<BlockTokenIdentifier> token2 = PBHelperClient.convert(tokenProto);
     compare(token, token2);
   }
-  
+
   @Test
   public void testConvertNamespaceInfo() {
     NamespaceInfo info = new NamespaceInfo(37, "clusterID", "bpID", 2300);
@@ -455,7 +457,7 @@ public class TestPBHelper {
             AdminStates.DECOMMISSION_INPROGRESS),
         DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h2",
             AdminStates.DECOMMISSIONED),
-        DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3", 
+        DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3",
             AdminStates.NORMAL),
         DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h4",
             AdminStates.NORMAL),
@@ -523,7 +525,7 @@ public class TestPBHelper {
       compare(lbl.get(i), lbl2.get(2));
     }
   }
-  
+
   @Test
   public void testConvertLocatedBlockArray() {
     LocatedBlock [] lbl = new LocatedBlock[3];
@@ -563,7 +565,7 @@ public class TestPBHelper {
     DatanodeStorage dns2 = PBHelperClient.convert(proto);
     compare(dns1, dns2);
   }
-  
+
   @Test
   public void testConvertBlockCommand() {
     Block[] blocks = new Block[] { new Block(21), new Block(22) };
@@ -596,7 +598,7 @@ public class TestPBHelper {
       }
     }
   }
-  
+
   @Test
   public void testChecksumTypeProto() {
     assertEquals(DataChecksum.Type.NULL,
@@ -678,4 +680,24 @@ public class TestPBHelper {
     DatanodeInfo dnInfos3 = PBHelperClient.convert(b.build());
     assertEquals(dnInfos0.getNonDfsUsed(), dnInfos3.getNonDfsUsed());
   }
+
+  @Test
+  public void testSlowPeerInfoPBHelper() {
+    // Test with a map that has a few slow peer entries.
+    final SlowPeerReports slowPeers = SlowPeerReports.create(
+        ImmutableMap.of("peer1", 0.0, "peer2", 1.0, "peer3", 2.0));
+    SlowPeerReports slowPeersConverted1 = PBHelper.convertSlowPeerInfo(
+        PBHelper.convertSlowPeerInfo(slowPeers));
+    assertTrue(
+        "Expected map:" + slowPeers + ", got map:" +
+            slowPeersConverted1.getSlowPeers(),
+        slowPeersConverted1.equals(slowPeers));
+
+    // Test with an empty map.
+    SlowPeerReports slowPeersConverted2 = PBHelper.convertSlowPeerInfo(
+        PBHelper.convertSlowPeerInfo(SlowPeerReports.EMPTY_REPORT));
+    assertTrue(
+        "Expected empty map:" + ", got map:" + slowPeersConverted2,
+        slowPeersConverted2.equals(SlowPeerReports.EMPTY_REPORT));
+  }
 }
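
Note: SlowPeerReports wraps a map from peer node name to its reported
latency (semantics inferred from the test values above), and the test
checks the round-trip property that protobuf conversion is lossless. A
condensed sketch of the same check, with illustrative values:

    // Java object -> protobuf -> Java object must preserve the map.
    SlowPeerReports reports = SlowPeerReports.create(
        ImmutableMap.of("dn1", 1.5, "dn2", 3.0));
    assertTrue(reports.equals(
        PBHelper.convertSlowPeerInfo(PBHelper.convertSlowPeerInfo(reports))));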

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
index ab607ea..f12f6f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
@@ -42,13 +42,23 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.mockito.Mockito;
 
 /**
  * Test if FSNamesystem handles heartbeat right
  */
 public class TestHeartbeatHandling {
+
+
+  /**
+   * Set a timeout for every test case.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300_000);
+
   /**
    * Test if
    * {@link FSNamesystem#handleHeartbeat}
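
Note: the Timeout rule added above fails any test method in the class
that exceeds the bound, instead of per-test timeout attributes. A minimal
sketch of the pattern (class and method names are illustrative):

    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.Timeout;

    public class ExampleTest {
      // Applies to every @Test method in this class; 300_000 ms = 5 minutes.
      @Rule
      public Timeout testTimeout = new Timeout(300_000);

      @Test
      public void finishesQuickly() {
        // Fails automatically if it runs longer than the timeout above.
      }
    }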

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index 5214af3..fbfccbe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import com.google.common.base.Supplier;
 import java.util.ArrayList;
 import java.util.Collection;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeRef
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
@@ -111,7 +113,7 @@ public class TestNameNodePrunesMissingStorages {
       // Stop the DataNode and send fake heartbeat with missing storage.
       cluster.stopDataNode(0);
       cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
-          0, null, true);
+          0, null, true, SlowPeerReports.EMPTY_REPORT);
 
       // Check that the missing storage was pruned.
       assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));
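
Note: every sendHeartbeat call site gains a trailing SlowPeerReports
argument; callers with nothing to report pass SlowPeerReports.EMPTY_REPORT
as above. The call shape, annotated (argument names are illustrative; the
types and order follow the Mockito stubs elsewhere in this commit):

    cluster.getNameNodeRpc().sendHeartbeat(
        dnReg,                          // DatanodeRegistration
        prunedReports,                  // StorageReport[]
        0L, 0L,                         // cache capacity / cache used
        0, 0, 0,                        // xmits, xceivers, failed volumes
        null,                           // VolumeFailureSummary (none)
        true,                           // request full block report lease
        SlowPeerReports.EMPTY_REPORT);  // new: slow peer reports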

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java
new file mode 100644
index 0000000..15eb3a5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker.ReportForJson;
+import org.apache.hadoop.util.FakeTimer;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests for {@link SlowPeerTracker}.
+ */
+public class TestSlowPeerTracker {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TestSlowPeerTracker.class);
+
+  /**
+   * Set a timeout for every test case.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300_000);
+
+  private Configuration conf;
+  private SlowPeerTracker tracker;
+  private FakeTimer timer;
+  private long reportValidityMs;
+
+  @Before
+  public void setup() {
+    conf = new HdfsConfiguration();
+    timer = new FakeTimer();
+    tracker = new SlowPeerTracker(conf, timer);
+    reportValidityMs = tracker.getReportValidityMs();
+  }
+
+  /**
+   * Edge case: there are no reports to retrieve.
+   */
+  @Test
+  public void testEmptyReports() {
+    assertTrue(tracker.getReportsForAllDataNodes().isEmpty());
+    assertTrue(tracker.getReportsForNode("noSuchNode").isEmpty());
+  }
+
+  @Test
+  public void testReportsAreRetrieved() {
+    tracker.addReport("node2", "node1");
+    tracker.addReport("node3", "node1");
+    tracker.addReport("node3", "node2");
+
+    assertThat(tracker.getReportsForAllDataNodes().size(), is(2));
+    assertThat(tracker.getReportsForNode("node2").size(), is(1));
+    assertThat(tracker.getReportsForNode("node3").size(), is(2));
+    assertThat(tracker.getReportsForNode("node1").size(), is(0));
+  }
+
+  /**
+   * Test that when all reports are expired, we get back nothing.
+   */
+  @Test
+  public void testAllReportsAreExpired() {
+    tracker.addReport("node2", "node1");
+    tracker.addReport("node3", "node2");
+    tracker.addReport("node1", "node3");
+
+    // No reports should expire after 1ms.
+    timer.advance(1);
+    assertThat(tracker.getReportsForAllDataNodes().size(), is(3));
+
+    // All reports should expire after REPORT_VALIDITY_MS.
+    timer.advance(reportValidityMs);
+    assertTrue(tracker.getReportsForAllDataNodes().isEmpty());
+    assertTrue(tracker.getReportsForNode("node1").isEmpty());
+    assertTrue(tracker.getReportsForNode("node2").isEmpty());
+    assertTrue(tracker.getReportsForNode("node3").isEmpty());
+  }
+
+  /**
+   * Test the case when a subset of reports has expired.
+   * Ensure that we only get back non-expired reports.
+   */
+  @Test
+  public void testSomeReportsAreExpired() {
+    tracker.addReport("node3", "node1");
+    tracker.addReport("node3", "node2");
+    timer.advance(reportValidityMs);
+    tracker.addReport("node3", "node4");
+    assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
+    assertThat(tracker.getReportsForNode("node3").size(), is(1));
+    assertTrue(tracker.getReportsForNode("node3").contains("node4"));
+  }
+
+  /**
+   * Test the case when an expired report is replaced by a valid one.
+   */
+  @Test
+  public void testReplacement() {
+    tracker.addReport("node2", "node1");
+    timer.advance(reportValidityMs); // Expire the report.
+    assertThat(tracker.getReportsForAllDataNodes().size(), is(0));
+
+    // This should replace the expired report with a newer valid one.
+    tracker.addReport("node2", "node1");
+    assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
+    assertThat(tracker.getReportsForNode("node2").size(), is(1));
+  }
+
+  @Test
+  public void testGetJson() throws IOException {
+    tracker.addReport("node1", "node2");
+    tracker.addReport("node2", "node3");
+    tracker.addReport("node2", "node1");
+    tracker.addReport("node4", "node1");
+
+    final Set<ReportForJson> reports = getAndDeserializeJson();
+
+    // And ensure its contents are what we expect.
+    assertThat(reports.size(), is(3));
+    assertTrue(isNodeInReports(reports, "node1"));
+    assertTrue(isNodeInReports(reports, "node2"));
+    assertTrue(isNodeInReports(reports, "node4"));
+
+    assertFalse(isNodeInReports(reports, "node3"));
+  }
+
+  @Test
+  public void testGetJsonSizeIsLimited() throws IOException {
+    tracker.addReport("node1", "node2");
+    tracker.addReport("node1", "node3");
+    tracker.addReport("node2", "node3");
+    tracker.addReport("node2", "node4");
+    tracker.addReport("node3", "node4");
+    tracker.addReport("node3", "node5");
+    tracker.addReport("node4", "node6");
+    tracker.addReport("node5", "node6");
+    tracker.addReport("node5", "node7");
+    tracker.addReport("node6", "node7");
+    tracker.addReport("node6", "node8");
+
+    final Set<ReportForJson> reports = getAndDeserializeJson();
+
+    // Ensure that node4 is not in the list since it was
+    // tagged by just one peer and we already have 5 other nodes.
+    assertFalse(isNodeInReports(reports, "node4"));
+
+    // Remaining nodes should be in the list.
+    assertTrue(isNodeInReports(reports, "node1"));
+    assertTrue(isNodeInReports(reports, "node2"));
+    assertTrue(isNodeInReports(reports, "node3"));
+    assertTrue(isNodeInReports(reports, "node5"));
+    assertTrue(isNodeInReports(reports, "node6"));
+  }
+
+  @Test
+  public void testLowRankedElementsIgnored() throws IOException {
+    // Insert 5 nodes with 2 peer reports each.
+    for (int i = 0; i < 5; ++i) {
+      tracker.addReport("node" + i, "reporter1");
+      tracker.addReport("node" + i, "reporter2");
+    }
+
+    // Insert 10 nodes with 1 peer report each.
+    for (int i = 10; i < 20; ++i) {
+      tracker.addReport("node" + i, "reporter1");
+    }
+
+    final Set<ReportForJson> reports = getAndDeserializeJson();
+
+    // Ensure that only the first 5 nodes with two reports each were
+    // included in the JSON.
+    for (int i = 0; i < 5; ++i) {
+      assertTrue(isNodeInReports(reports, "node" + i));
+    }
+  }
+
+  private boolean isNodeInReports(
+      Set<ReportForJson> reports, String node) {
+    for (ReportForJson report : reports) {
+      if (report.getSlowNode().equalsIgnoreCase(node)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private Set<ReportForJson> getAndDeserializeJson()
+      throws IOException {
+    final String json = tracker.getJson();
+    LOG.info("Got JSON: {}", json);
+    return (new ObjectMapper()).readValue(
+        json, new TypeReference<Set<ReportForJson>>() {});
+  }
+}
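
Note: taken together, the cases above document the tracker's contract:
addReport(slowNode, reportingNode) records a complaint, reports age out
after getReportValidityMs(), and getJson() serializes only the
most-reported nodes. A condensed sketch of the API under a FakeTimer:

    Configuration conf = new HdfsConfiguration();
    FakeTimer timer = new FakeTimer();       // virtual clock the test drives
    SlowPeerTracker tracker = new SlowPeerTracker(conf, timer);

    tracker.addReport("slowNode", "reporter");  // "reporter" flags "slowNode"
    assertTrue(tracker.getReportsForNode("slowNode").contains("reporter"));

    timer.advance(tracker.getReportValidityMs()); // age the report out
    assertTrue(tracker.getReportsForNode("slowNode").isEmpty());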

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
index de856e6..cf43fd0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.junit.Assert;
@@ -136,7 +137,8 @@ public class InternalDataNodeTestUtils {
             Mockito.any(StorageReport[].class), Mockito.anyLong(),
             Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(),
             Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class),
-            Mockito.anyBoolean())).thenReturn(
+            Mockito.anyBoolean(),
+            Mockito.any(SlowPeerReports.class))).thenReturn(
         new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
             HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
             .nextLong() | 1L));
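
Note: this is the mechanical change repeated across the Mockito-based
tests below: once a stubbed call uses argument matchers, every argument
must be a matcher, so the new parameter is matched with
Mockito.any(SlowPeerReports.class) rather than passed a literal. The
stubbing pattern ('namenode' and 'response' are illustrative):

    Mockito.when(namenode.sendHeartbeat(
            Mockito.any(DatanodeRegistration.class),
            Mockito.any(StorageReport[].class),
            Mockito.anyLong(), Mockito.anyLong(),
            Mockito.anyInt(), Mockito.anyInt(), Mockito.anyInt(),
            Mockito.any(VolumeFailureSummary.class),
            Mockito.anyBoolean(),
            Mockito.any(SlowPeerReports.class)))  // new trailing argument
        .thenReturn(response);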

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index b7b8966..c6b38ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -119,7 +120,7 @@ public class TestBPOfferService {
     Mockito.doReturn(conf).when(mockDn).getConf();
     Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
     Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn"))
-    .when(mockDn).getMetrics();
+        .when(mockDn).getMetrics();
 
     // Set up a simulated dataset with our fake BP
     mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
@@ -152,7 +153,8 @@ public class TestBPOfferService {
           Mockito.anyInt(),
           Mockito.anyInt(),
           Mockito.any(VolumeFailureSummary.class),
-          Mockito.anyBoolean());
+          Mockito.anyBoolean(),
+          Mockito.any(SlowPeerReports.class));
     mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
     datanodeCommands[nnIdx] = new DatanodeCommand[0];
     return mock;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 346250b..fecddc5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
@@ -83,6 +84,7 @@ import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -186,7 +188,8 @@ public class TestBlockRecovery {
             Mockito.anyInt(),
             Mockito.anyInt(),
             Mockito.any(VolumeFailureSummary.class),
-            Mockito.anyBoolean()))
+            Mockito.anyBoolean(),
+            Mockito.any(SlowPeerReports.class)))
         .thenReturn(new HeartbeatResponse(
             new DatanodeCommand[0],
             new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
@@ -252,15 +255,15 @@ public class TestBlockRecovery {
   }
 
   /** Sync two replicas */
-  private void testSyncReplicas(ReplicaRecoveryInfo replica1, 
+  private void testSyncReplicas(ReplicaRecoveryInfo replica1,
       ReplicaRecoveryInfo replica2,
       InterDatanodeProtocol dn1,
       InterDatanodeProtocol dn2,
       long expectLen) throws IOException {
-    
+
     DatanodeInfo[] locs = new DatanodeInfo[]{
         mock(DatanodeInfo.class), mock(DatanodeInfo.class)};
-    RecoveringBlock rBlock = new RecoveringBlock(block, 
+    RecoveringBlock rBlock = new RecoveringBlock(block,
         locs, RECOVERY_ID);
     ArrayList<BlockRecord> syncList = new ArrayList<BlockRecord>(2);
     BlockRecord record1 = new BlockRecord(
@@ -269,7 +272,7 @@ public class TestBlockRecovery {
         DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn2, replica2);
     syncList.add(record1);
     syncList.add(record2);
-    
+
     when(dn1.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
         anyLong(), anyLong())).thenReturn("storage1");
     when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
@@ -279,7 +282,7 @@ public class TestBlockRecovery {
         recoveryWorker.new RecoveryTaskContiguous(rBlock);
     RecoveryTaskContiguous.syncBlock(syncList);
   }
-  
+
   /**
    * BlockRecovery_02.8.
    * Two replicas are in Finalized state
@@ -290,9 +293,9 @@ public class TestBlockRecovery {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
-    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
-    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN1, GEN_STAMP-2, ReplicaState.FINALIZED);
 
     InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@@ -305,9 +308,9 @@ public class TestBlockRecovery {
         REPLICA_LEN1);
 
     // two finalized replicas have different length
-    replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
-    replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN2, GEN_STAMP-2, ReplicaState.FINALIZED);
 
     try {
@@ -318,10 +321,10 @@ public class TestBlockRecovery {
           "Inconsistent size of finalized replicas. "));
     }
   }
-  
+
   /**
    * BlockRecovery_02.9.
-   * One replica is Finalized and another is RBW. 
+   * One replica is Finalized and another is RBW.
    * @throws IOException in case of an error
    */
   @Test(timeout=60000)
@@ -329,11 +332,11 @@ public class TestBlockRecovery {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
-    
+
     // rbw and finalized replicas have the same length
-    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
-    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RBW);
 
     InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@@ -344,11 +347,11 @@ public class TestBlockRecovery {
         REPLICA_LEN1);
     verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
         REPLICA_LEN1);
-    
+
     // rbw replica has a different length from the finalized one
-    replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
-    replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);
 
     dn1 = mock(InterDatanodeProtocol.class);
@@ -359,10 +362,10 @@ public class TestBlockRecovery {
     verify(dn2, never()).updateReplicaUnderRecovery(
         block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
   }
-  
+
   /**
    * BlockRecovery_02.10.
-   * One replica is Finalized and another is RWR. 
+   * One replica is Finalized and another is RWR.
    * @throws IOException in case of an error
    */
   @Test(timeout=60000)
@@ -370,11 +373,11 @@ public class TestBlockRecovery {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
-    
+
     // rbw and finalized replicas have the same length
-    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
-    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR);
 
     InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@@ -385,11 +388,11 @@ public class TestBlockRecovery {
         REPLICA_LEN1);
     verify(dn2, never()).updateReplicaUnderRecovery(
         block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
-    
+
     // rbw replica has a different length from the finalized one
-    replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
-    replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);
 
     dn1 = mock(InterDatanodeProtocol.class);
@@ -401,7 +404,7 @@ public class TestBlockRecovery {
     verify(dn2, never()).updateReplicaUnderRecovery(
         block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
   }
-  
+
   /**
    * BlockRecovery_02.11.
    * Two replicas are RBW.
@@ -412,9 +415,9 @@ public class TestBlockRecovery {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
-    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW);
-    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);
 
     InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@@ -425,10 +428,10 @@ public class TestBlockRecovery {
     verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
     verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
   }
-  
+
   /**
    * BlockRecovery_02.12.
-   * One replica is RBW and another is RWR. 
+   * One replica is RBW and another is RWR.
    * @throws IOException in case of an error
    */
   @Test(timeout=60000)
@@ -436,9 +439,9 @@ public class TestBlockRecovery {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
-    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW);
-    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR);
 
     InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@@ -450,9 +453,9 @@ public class TestBlockRecovery {
     verify(dn2, never()).updateReplicaUnderRecovery(
         block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
   }
-  
+
   /**
-   * BlockRecovery_02.13. 
+   * BlockRecovery_02.13.
    * Two replicas are RWR.
    * @throws IOException in case of an error
    */
@@ -461,9 +464,9 @@ public class TestBlockRecovery {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
-    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RWR);
-    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
         REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RWR);
 
     InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@@ -471,10 +474,10 @@ public class TestBlockRecovery {
 
     long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
     testSyncReplicas(replica1, replica2, dn1, dn2, minLen);
-    
+
     verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
     verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
-  }  
+  }
 
   private Collection<RecoveringBlock> initRecoveringBlocks() throws IOException {
     Collection<RecoveringBlock> blocks = new ArrayList<RecoveringBlock>(1);
@@ -661,10 +664,10 @@ public class TestBlockRecovery {
       streams.close();
     }
   }
-  
+
   /**
    * Test to verify the race between finalizeBlock and Lease recovery
-   * 
+   *
    * @throws Exception
    */
   @Test(timeout = 20000)
@@ -682,11 +685,11 @@ public class TestBlockRecovery {
       FSDataOutputStream out = fs.create(path);
       out.writeBytes("data");
       out.hsync();
-      
+
       List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs.open(path));
       final LocatedBlock block = blocks.get(0);
       final DataNode dataNode = cluster.getDataNodes().get(0);
-      
+
       final AtomicBoolean recoveryInitResult = new AtomicBoolean(true);
       Thread recoveryThread = new Thread() {
         @Override
@@ -716,7 +719,7 @@ public class TestBlockRecovery {
       }
       Assert.assertTrue("Recovery should be initiated successfully",
           recoveryInitResult.get());
-      
+
       dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock()
           .getGenerationStamp() + 1, block.getBlock().getBlockId(),
           block.getBlockSize());
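
Note: read together, the sync cases above pin down the recovery rules the
tests expect: replicas in the best available state win (FINALIZED beats
RBW, which beats RWR); two FINALIZED replicas must agree on length or
recovery fails; otherwise the participating replicas are synced to the
minimum participating length, e.g.:

    // Two RBW (or two RWR) replicas: keep the shorter length.
    long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
    testSyncReplicas(replica1, replica2, dn1, dn2, minLen);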

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
index 76885e4..6435d4d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
@@ -20,10 +20,15 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
 import static java.lang.Math.abs;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertFalse;
@@ -31,11 +36,6 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
-import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
 
 
 /**
@@ -51,6 +51,7 @@ public class TestBpServiceActorScheduler {
   private static final long HEARTBEAT_INTERVAL_MS = 5000;      // 5 seconds
   private static final long LIFELINE_INTERVAL_MS = 3 * HEARTBEAT_INTERVAL_MS;
   private static final long BLOCK_REPORT_INTERVAL_MS = 10000;  // 10 seconds
+  private static final long SLOW_PEER_REPORT_INTERVAL_MS = 10000;  // 10 seconds
   private final Random random = new Random(System.nanoTime());
 
   @Test
@@ -180,13 +181,28 @@ public class TestBpServiceActorScheduler {
     }
   }
 
+  @Test
+  public void testSlowPeerReportScheduling() {
+    for (final long now : getTimestamps()) {
+      Scheduler scheduler = makeMockScheduler(now);
+      assertTrue(scheduler.isSlowPeersReportDue(now));
+      scheduler.scheduleNextSlowPeerReport();
+      assertFalse(scheduler.isSlowPeersReportDue(now));
+      assertFalse(scheduler.isSlowPeersReportDue(now + 1));
+      assertTrue(scheduler.isSlowPeersReportDue(
+          now + SLOW_PEER_REPORT_INTERVAL_MS));
+    }
+  }
+
   private Scheduler makeMockScheduler(long now) {
     LOG.info("Using now = " + now);
-    Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS,
-        LIFELINE_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS));
+    Scheduler mockScheduler = spy(new Scheduler(
+        HEARTBEAT_INTERVAL_MS, LIFELINE_INTERVAL_MS,
+        BLOCK_REPORT_INTERVAL_MS, SLOW_PEER_REPORT_INTERVAL_MS));
     doReturn(now).when(mockScheduler).monotonicNow();
     mockScheduler.nextBlockReportTime = now;
     mockScheduler.nextHeartbeatTime = now;
+    mockScheduler.nextSlowPeersReportTime = now;
     return mockScheduler;
   }
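
Note: the new test fixes the scheduling contract: a slow peer report is
due immediately on startup, scheduleNextSlowPeerReport() pushes the next
due time out by one interval, and the report becomes due again once
SLOW_PEER_REPORT_INTERVAL_MS elapses. A sketch of how a heartbeat loop
might consume this (loop placement is illustrative, not the actual
BPServiceActor code):

    long now = scheduler.monotonicNow();
    if (scheduler.isSlowPeersReportDue(now)) {
      // ...attach the SlowPeerReports payload to this heartbeat...
      scheduler.scheduleNextSlowPeerReport();  // due again one interval later
    }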
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
index df2fe5a..8a9f0b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -167,7 +168,8 @@ public class TestDataNodeLifeline {
             anyInt(),
             anyInt(),
             any(VolumeFailureSummary.class),
-            anyBoolean());
+            anyBoolean(),
+            any(SlowPeerReports.class));
 
     // Intercept lifeline to trigger latch count-down on each call.
     doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@@ -230,7 +232,8 @@ public class TestDataNodeLifeline {
             anyInt(),
             anyInt(),
             any(VolumeFailureSummary.class),
-            anyBoolean());
+            anyBoolean(),
+            any(SlowPeerReports.class));
 
     // While waiting on the latch for the expected number of heartbeat messages,
     // poll DataNode tracking information.  We expect that the DataNode always

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
index 5af54a4..b18ff2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -41,9 +42,10 @@ public class TestDataNodePeerMetrics {
     final int numOpsPerIteration = 1000;
 
     final Configuration conf = new HdfsConfiguration();
-    conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
-        windowSize);
-    conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
+    conf.setTimeDuration(
+        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
+        windowSize, TimeUnit.SECONDS);
+    conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY,
         numWindows);
     conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
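
Note: the window length is now a time-duration property, so the test sets
it with an explicit unit instead of a bare int. Sketch of the standard
Configuration duration API used above (getTimeDuration shown for
symmetry):

    conf.setTimeDuration(
        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
        windowSize, TimeUnit.SECONDS);         // stored as "<windowSize>s"
    long windowMs = conf.getTimeDuration(
        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
        0, TimeUnit.MILLISECONDS);             // read back in any unit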
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index d447a76..c94f74e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@@ -218,7 +219,8 @@ public class TestDatanodeProtocolRetryPolicy {
            Mockito.anyInt(),
            Mockito.anyInt(),
            Mockito.any(VolumeFailureSummary.class),
-           Mockito.anyBoolean());
+           Mockito.anyBoolean(),
+           Mockito.any(SlowPeerReports.class));
 
     dn = new DataNode(conf, locations, null, null) {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index 6557055..eb015c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.IOUtils;
@@ -172,7 +173,7 @@ public class TestFsDatasetCache {
         (DatanodeRegistration) any(),
         (StorageReport[]) any(), anyLong(), anyLong(),
         anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
-        anyBoolean());
+        anyBoolean(), any(SlowPeerReports.class));
   }
 
   private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index d8418d4..2b793e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.junit.After;
@@ -106,7 +107,8 @@ public class TestStorageReport {
         any(DatanodeRegistration.class),
         captor.capture(),
         anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
-        Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean());
+        Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
+        Mockito.any(SlowPeerReports.class));
 
     StorageReport[] reports = captor.getValue();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java
new file mode 100644
index 0000000..34e15e5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Random;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Test that the {@link DataNodePeerMetrics} class is able to detect
+ * outliers, i.e. slow nodes, via the metrics it maintains.
+ */
+public class TestDataNodeOutlierDetectionViaMetrics {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestDataNodeOutlierDetectionViaMetrics.class);
+
+  /**
+   * Set a timeout for every test case.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300_000);
+
+  // A few constants to keep the test run time short.
+  private static final int WINDOW_INTERVAL_SECONDS = 3;
+  private static final int ROLLING_AVERAGE_WINDOWS = 10;
+  private static final int SLOW_NODE_LATENCY_MS = 20_000;
+  private static final int FAST_NODE_MAX_LATENCY_MS = 5;
+
+  private Random random = new Random(System.currentTimeMillis());
+
+  @Before
+  public void setup() {
+    GenericTestUtils.setLogLevel(DataNodePeerMetrics.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL);
+  }
+
+  /**
+   * Test that a very slow peer is detected as an outlier.
+   */
+  @Test
+  public void testOutlierIsDetected() throws Exception {
+    final String slowNodeName = "SlowNode";
+
+    DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics(
+        "PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS,
+        ROLLING_AVERAGE_WINDOWS);
+
+    injectFastNodesSamples(peerMetrics);
+    injectSlowNodeSamples(peerMetrics, slowNodeName);
+
+    // Trigger a snapshot.
+    peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+
+    final Map<String, Double> outliers = peerMetrics.getOutliers();
+    LOG.info("Got back outlier nodes: {}", outliers);
+    assertThat(outliers.size(), is(1));
+    assertTrue(outliers.containsKey(slowNodeName));
+  }
+
+  /**
+   * Test that when there are no outliers, we get back nothing.
+   */
+  @Test
+  public void testWithNoOutliers() throws Exception {
+    DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics(
+        "PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS,
+        ROLLING_AVERAGE_WINDOWS);
+
+    injectFastNodesSamples(peerMetrics);
+
+    // Trigger a snapshot.
+    peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+
+    // Ensure that no outliers are reported.
+    assertTrue(peerMetrics.getOutliers().isEmpty());
+  }
+
+  /**
+   * Inject fake stats for MIN_OUTLIER_DETECTION_PEERS fast nodes.
+   *
+   * @param peerMetrics the metrics instance to populate with fake samples.
+   */
+  public void injectFastNodesSamples(DataNodePeerMetrics peerMetrics) {
+    for (int nodeIndex = 0;
+         nodeIndex < SlowNodeDetector.getMinOutlierDetectionPeers();
+         ++nodeIndex) {
+      final String nodeName = "FastNode-" + nodeIndex;
+      LOG.info("Generating stats for node {}", nodeName);
+      for (int i = 0;
+           i < 2 * DataNodePeerMetrics.MIN_OUTLIER_DETECTION_SAMPLES;
+           ++i) {
+        peerMetrics.addSendPacketDownstream(
+            nodeName, random.nextInt(FAST_NODE_MAX_LATENCY_MS));
+      }
+    }
+  }
+
+  /**
+   * Inject fake stats for one extremely slow node.
+   */
+  public void injectSlowNodeSamples(
+      DataNodePeerMetrics peerMetrics, String slowNodeName)
+      throws InterruptedException {
+
+    // And the one slow node.
+    for (int i = 0;
+         i < 2 * DataNodePeerMetrics.MIN_OUTLIER_DETECTION_SAMPLES;
+         ++i) {
+      peerMetrics.addSendPacketDownstream(
+          slowNodeName, SLOW_NODE_LATENCY_MS);
+    }
+  }
+}
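
Note: the outlier detection exercised above, and the TestSlowNodeDetector
matrix below, are built on the median and the Median Absolute Deviation
(MAD). Working backwards from the expected values in the matrix, the MAD
appears to carry the usual 1.4826 normal-consistency scaling. A
self-contained sketch of the statistic (the detector's exact thresholding
rule is not shown here):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class MadSketch {
      // Rescales MAD to estimate the standard deviation of normal data;
      // inferred from the expected values in the test matrix below.
      private static final double MAD_SCALE = 1.4826;

      static double median(List<Double> values) {
        List<Double> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        int n = sorted.size();
        return (n % 2 == 1)
            ? sorted.get(n / 2)
            : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
      }

      // MAD = scale * median(|x_i - median(x)|): a spread estimate that
      // is not dragged around by the very outliers it helps to find.
      static double mad(List<Double> values) {
        double med = median(values);
        List<Double> deviations = new ArrayList<>();
        for (double v : values) {
          deviations.add(Math.abs(v - med));
        }
        return MAD_SCALE * median(deviations);
      }
    }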

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java
new file mode 100644
index 0000000..6107d63
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link SlowNodeDetector}.
+ */
+public class TestSlowNodeDetector {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestSlowNodeDetector.class);
+
+  /**
+   * Set a timeout for every test case.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300_000);
+
+  private static final double LOW_THRESHOLD = 1000;
+  private static final long MIN_OUTLIER_DETECTION_PEERS = 3;
+
+  // Randomly generated test cases for median and MAD. The first entry
+  // in each pair is the expected median and the second entry is the
+  // expected Median Absolute Deviation. The small sets of size 1 and 2
+  // exist to test the edge cases; however, in practice the MAD of a
+  // very small set is not useful.
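+  // Note: the expected MAD values include the ~1.4826 scaling factor
+  // that makes the MAD a consistent estimator of the standard deviation
+  // for normally distributed data. E.g. for the two-element set below,
+  // the raw median of absolute deviations is ~5.0329, and
+  // 5.0329 * 1.4826 ~= 7.4616, matching the expected value.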
+  private Map<List<Double>, Pair<Double, Double>> medianTestMatrix =
+      new ImmutableMap.Builder<List<Double>, Pair<Double, Double>>()
+          // Single element.
+          .put(new ImmutableList.Builder<Double>()
+                  .add(9.6502431302).build(),
+              Pair.of(9.6502431302, 0.0))
+
+          // Two elements.
+          .put(new ImmutableList.Builder<Double>()
+                  .add(1.72168104625)
+                  .add(11.7872544459).build(),
+              Pair.of(6.75446774606, 7.4616095611))
+
+          // The remaining lists were randomly generated with sizes 3-10.
+          .put(new ImmutableList.Builder<Double>()
+                  .add(76.2635686249)
+                  .add(27.0652018553)
+                  .add(1.3868476443)
+                  .add(49.7194624164)
+                  .add(47.385680883)
+                  .add(57.8721199173).build(),
+              Pair.of(48.5525716497, 22.837202532))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(86.0573389581)
+                  .add(93.2399572424)
+                  .add(64.9545429122)
+                  .add(35.8509730085)
+                  .add(1.6534313654).build(),
+              Pair.of(64.9545429122, 41.9360180373))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(5.00127007366)
+                  .add(37.9790589127)
+                  .add(67.5784746266).build(),
+              Pair.of(37.9790589127, 43.8841594039))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(1.43442932944)
+                  .add(70.6769829947)
+                  .add(37.47579656)
+                  .add(51.1126141394)
+                  .add(72.2465914419)
+                  .add(32.2930549225)
+                  .add(39.677459781).build(),
+              Pair.of(39.677459781, 16.9537852208))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(26.7913745214)
+                  .add(68.9833706658)
+                  .add(29.3882180746)
+                  .add(68.3455244453)
+                  .add(74.9277265022)
+                  .add(12.1469972942)
+                  .add(72.5395402683)
+                  .add(7.87917492506)
+                  .add(33.3253447774)
+                  .add(72.2753759125).build(),
+              Pair.of(50.8354346113, 31.9881230079))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(38.6482290705)
+                  .add(88.0690746319)
+                  .add(50.6673611649)
+                  .add(64.5329814115)
+                  .add(25.2580979294)
+                  .add(59.6709630711)
+                  .add(71.5406993741)
+                  .add(81.3073035091)
+                  .add(20.5549547284).build(),
+              Pair.of(59.6709630711, 31.1683520683))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(87.352734249)
+                  .add(65.4760359094)
+                  .add(28.9206803169)
+                  .add(36.5908574008)
+                  .add(87.7407653175)
+                  .add(99.3704511335)
+                  .add(41.3227434076)
+                  .add(46.2713494909)
+                  .add(3.49940920921).build(),
+              Pair.of(46.2713494909, 28.4729106898))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(95.3251533286)
+                  .add(27.2777870437)
+                  .add(43.73477168).build(),
+              Pair.of(43.73477168, 24.3991619317))
+
+          .build();
+
+  // A test matrix that maps inputs to the expected output set of
+  // slow nodes, i.e. outliers.
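+  // As the cases below exercise, a node should be flagged only when its
+  // latency is simultaneously above the low threshold, above a
+  // MEDIAN_MULTIPLIER multiple of the median, and a statistical outlier
+  // relative to the median absolute deviation.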
+  private Map<Map<String, Double>, Set<String>> outlierTestMatrix =
+      new ImmutableMap.Builder<Map<String, Double>, Set<String>>()
+          // The number of samples is too low and all samples are below
+          // the low threshold. Nothing should be returned.
+          .put(ImmutableMap.of(
+              "n1", 0.0,
+              "n2", LOW_THRESHOLD + 1),
+              ImmutableSet.<String>of())
+
+          // A statistical outlier below the low threshold must not be
+          // returned.
+          .put(ImmutableMap.of(
+              "n1", 1.0,
+              "n2", 1.0,
+              "n3", LOW_THRESHOLD - 1),
+              ImmutableSet.<String>of())
+
+          // A statistical outlier above the low threshold must be returned.
+          .put(ImmutableMap.of(
+              "n1", 1.0,
+              "n2", 1.0,
+              "n3", LOW_THRESHOLD + 1),
+              ImmutableSet.of("n3"))
+
+          // A statistical outlier must not be returned if it is within a
+          // MEDIAN_MULTIPLIER multiple of the median.
+          .put(ImmutableMap.of(
+              "n1", LOW_THRESHOLD + 0.1,
+              "n2", LOW_THRESHOLD + 0.1,
+              "n3", LOW_THRESHOLD * SlowNodeDetector.MEDIAN_MULTIPLIER - 0.1),
+              ImmutableSet.<String>of())
+
+          // A statistical outlier must be returned if it is outside a
+          // MEDIAN_MULTIPLIER multiple of the median.
+          .put(ImmutableMap.of(
+              "n1", LOW_THRESHOLD + 0.1,
+              "n2", LOW_THRESHOLD + 0.1,
+              "n3", (LOW_THRESHOLD + 0.1) *
+                  SlowNodeDetector.MEDIAN_MULTIPLIER + 0.1),
+              ImmutableSet.of("n3"))
+
+          // Only the statistical outliers n3 and n11 should be returned.
+          .put(new ImmutableMap.Builder<String, Double>()
+                  .put("n1", 1029.4322)
+                  .put("n2", 2647.876)
+                  .put("n3", 9194.312)
+                  .put("n4", 2.2)
+                  .put("n5", 2012.92)
+                  .put("n6", 1843.81)
+                  .put("n7", 1201.43)
+                  .put("n8", 6712.01)
+                  .put("n9", 3278.554)
+                  .put("n10", 2091.765)
+                  .put("n11", 9194.77).build(),
+              ImmutableSet.of("n3", "n11"))
+
+          // The following input set has multiple outliers.
+          //   - The low outliers (n4, n6) should not be returned.
+          //   - High outlier n2 is within 3 multiples of the median
+          //     and so it should not be returned.
+          //   - Only the high outlier n8 should be returned.
+          .put(new ImmutableMap.Builder<String, Double>()
+                  .put("n1", 5002.0)
+                  .put("n2", 9001.0)
+                  .put("n3", 5004.0)
+                  .put("n4", 1001.0)
+                  .put("n5", 5003.0)
+                  .put("n6", 2001.0)
+                  .put("n7", 5000.0)
+                  .put("n8", 101002.0)
+                  .put("n9", 5001.0)
+                  .put("n10", 5002.0)
+                  .put("n11", 5105.0)
+                  .put("n12", 5006.0).build(),
+              ImmutableSet.of("n8"))
+
+          .build();
+
+  private SlowNodeDetector slowNodeDetector;
+
+  @Before
+  public void setup() {
+    slowNodeDetector = new SlowNodeDetector((long) LOW_THRESHOLD);
+    SlowNodeDetector.setMinOutlierDetectionPeers(MIN_OUTLIER_DETECTION_PEERS);
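+    // Lower the minimum peer count so the small node maps in the test
+    // matrices are eligible for outlier detection.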
+    GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL);
+  }
+
+  @Test
+  public void testOutliersFromTestMatrix() {
+    for (Map.Entry<Map<String, Double>, Set<String>> entry :
+        outlierTestMatrix.entrySet()) {
+
+      LOG.info("Verifying set {}", entry.getKey());
+      final Set<String> outliers =
+          slowNodeDetector.getOutliers(entry.getKey()).keySet();
+      assertTrue(
+          "Running outlier detection on " + entry.getKey() +
+              " was expected to yield set " + entry.getValue() +
+              ", but we got set " + outliers,
+          outliers.equals(entry.getValue()));
+    }
+  }
+
+  /**
+   * Unit test for {@link SlowNodeDetector#computeMedian(List)}.
+   */
+  @Test
+  public void testMediansFromTestMatrix() {
+    for (Map.Entry<List<Double>, Pair<Double, Double>> entry :
+        medianTestMatrix.entrySet()) {
+      final List<Double> inputList = new ArrayList<>(entry.getKey());
+      Collections.sort(inputList);
+      final Double median = SlowNodeDetector.computeMedian(inputList);
+      final Double expectedMedian = entry.getValue().getLeft();
+
+      // Ensure that the median is within 0.001% of expected.
+      // We need some fudge factor for floating point comparison.
+      final Double errorPercent =
+          Math.abs(median - expectedMedian) * 100.0 / expectedMedian;
+
+      assertTrue(
+          "Set " + inputList + "; Expected median: " +
+              expectedMedian + ", got: " + median,
+          errorPercent < 0.001);
+    }
+  }
+
+  /**
+   * Unit test for {@link SlowNodeDetector#computeMad(List)}.
+   */
+  @Test
+  public void testMadsFromTestMatrix() {
+    for (Map.Entry<List<Double>, Pair<Double, Double>> entry :
+        medianTestMatrix.entrySet()) {
+      final List<Double> inputList = new ArrayList<>(entry.getKey());
+      Collections.sort(inputList);
+      final Double mad = SlowNodeDetector.computeMad(inputList);
+      final Double expectedMad = entry.getValue().getRight();
+
+      // Ensure that the MAD is within 0.001% of expected.
+      // We need some fudge factor for floating point comparison.
+      if (entry.getKey().size() > 1) {
+        final Double errorPercent =
+            Math.abs(mad - expectedMad) * 100.0 / expectedMad;
+
+        assertTrue(
+            "Set " + entry.getKey() + "; Expected M.A.D.: " +
+                expectedMad + ", got: " + mad,
+            errorPercent < 0.001);
+      } else {
+        // For an input list of size 1, the MAD should be 0.0.
+        final Double epsilon = 0.000001; // Allow for some FP math error.
+        assertTrue(
+            "Set " + entry.getKey() + "; Expected M.A.D.: " +
+                expectedMad + ", got: " + mad,
+            mad < epsilon);
+      }
+    }
+  }
+
+  /**
+   * Verify that {@link SlowNodeDetector#computeMedian(List)} throws when
+   * passed an empty list.
+   */
+  @Test(expected=IllegalArgumentException.class)
+  public void testMedianOfEmptyList() {
+    SlowNodeDetector.computeMedian(Collections.<Double>emptyList());
+  }
+
+  /**
+   * Verify that {@link SlowNodeDetector#computeMad(List)} throws when
+   * passed an empty list.
+   */
+  @Test(expected=IllegalArgumentException.class)
+  public void testMadOfEmptyList() {
+    SlowNodeDetector.computeMad(Collections.<Double>emptyList());
+  }
+
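+  /**
+   * Minimal immutable pair, kept local to the test so no external
+   * tuple library is needed.
+   */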
+  private static class Pair<L, R> {
+    private final L l;
+    private final R r;
+
+    Pair(L l, R r) {
+      this.l = l;
+      this.r = r;
+    }
+
+    L getLeft() {
+      return l;
+    }
+
+    R getRight() {
+      return r;
+    }
+
+    static <L, R> Pair<L, R> of(L l, R r) {
+      return new Pair<>(l, r);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index a93ec39..76ccd40 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -64,6 +64,7 @@ import 
org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -951,7 +952,8 @@ public class NNThroughputBenchmark implements Tool {
       StorageReport[] rep = { new StorageReport(storage, false,
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
-          0L, 0L, 0, 0, 0, null, true).getCommands();
+          0L, 0L, 0, 0, 0, null, true,
+          SlowPeerReports.EMPTY_REPORT).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -1000,7 +1002,8 @@ public class NNThroughputBenchmark implements Tool {
       StorageReport[] rep = { new StorageReport(storage,
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
-          rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
+          rep, 0L, 0L, 0, 0, 0, null, true,
+          SlowPeerReports.EMPTY_REPORT).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index e49e62b..0810276 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -42,6 +42,7 @@ import 
org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.AccessControlException;
@@ -117,7 +118,8 @@ public class NameNodeAdapter {
       DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
     return namesystem.handleHeartbeat(nodeReg,
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
-        dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true);
+        dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
+        SlowPeerReports.EMPTY_REPORT);
   }
 
   public static boolean setReplication(final FSNamesystem ns,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 9b6e874..a72b2a2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -45,6 +45,7 @@ import 
org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -131,7 +132,8 @@ public class TestDeadDatanode {
         new DatanodeStorage(reg.getDatanodeUuid()),
         false, 0, 0, 0, 0, 0) };
     DatanodeCommand[] cmd =
-        dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
+        dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
+            SlowPeerReports.EMPTY_REPORT).getCommands();
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
         .getAction());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
index da10878..da5ef4d 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
@@ -68,6 +68,10 @@ public class TestHdfsConfigFields extends 
TestConfigurationFieldsBase {
     // Purposely hidden, based on comments in DFSConfigKeys
     configurationPropsToSkipCompare
         .add(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY);
+    configurationPropsToSkipCompare
+        .add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY);
+    configurationPropsToSkipCompare
+        .add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY);
 
     // Fully deprecated properties?
     configurationPropsToSkipCompare

