Repository: hadoop Updated Branches: refs/heads/branch-2 8027c3e8b -> b4078e1d0
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index 63f11b7..3280563 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntryScope; import org.apache.hadoop.fs.permission.AclEntryType; @@ -81,6 +82,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.token.Token; @@ -165,7 +167,7 @@ public class TestPBHelper { DatanodeID dn2 = PBHelperClient.convert(dnProto); compare(dn, dn2); } - + void compare(DatanodeID dn, DatanodeID dn2) { assertEquals(dn.getIpAddr(), dn2.getIpAddr()); assertEquals(dn.getHostName(), dn2.getHostName()); @@ -253,7 +255,7 @@ public class TestPBHelper { ExportedBlockKeys expKeys1 = PBHelper.convert(expKeysProto); compare(expKeys, expKeys1); } - + void compare(ExportedBlockKeys expKeys, ExportedBlockKeys expKeys1) { BlockKey[] allKeys = expKeys.getAllKeys(); BlockKey[] 
allKeys1 = expKeys1.getAllKeys(); @@ -282,12 +284,12 @@ public class TestPBHelper { s1.getMostRecentCheckpointTxId()); assertEquals(s.getNamespaceID(), s1.getNamespaceID()); } - + private static void compare(RemoteEditLog l1, RemoteEditLog l2) { assertEquals(l1.getEndTxId(), l2.getEndTxId()); assertEquals(l1.getStartTxId(), l2.getStartTxId()); } - + @Test public void testConvertRemoteEditLog() { RemoteEditLog l = new RemoteEditLog(1, 100); @@ -295,7 +297,7 @@ public class TestPBHelper { RemoteEditLog l1 = PBHelper.convert(lProto); compare(l, l1); } - + @Test public void testConvertRemoteEditLogManifest() { List<RemoteEditLog> logs = new ArrayList<RemoteEditLog>(); @@ -304,7 +306,7 @@ public class TestPBHelper { RemoteEditLogManifest m = new RemoteEditLogManifest(logs); RemoteEditLogManifestProto mProto = PBHelper.convert(m); RemoteEditLogManifest m1 = PBHelper.convert(mProto); - + List<RemoteEditLog> logs1 = m1.getLogs(); assertEquals(logs.size(), logs1.size()); for (int i = 0; i < logs.size(); i++) { @@ -314,15 +316,15 @@ public class TestPBHelper { public ExtendedBlock getExtendedBlock() { return getExtendedBlock(1); } - + public ExtendedBlock getExtendedBlock(long blkid) { return new ExtendedBlock("bpid", blkid, 100, 2); } - + private void compare(DatanodeInfo dn1, DatanodeInfo dn2) { assertEquals(dn1.getAdminState(), dn2.getAdminState()); assertEquals(dn1.getBlockPoolUsed(), dn2.getBlockPoolUsed()); - assertEquals(dn1.getBlockPoolUsedPercent(), + assertEquals(dn1.getBlockPoolUsedPercent(), dn2.getBlockPoolUsedPercent(), DELTA); assertEquals(dn1.getCapacity(), dn2.getCapacity()); assertEquals(dn1.getDatanodeReport(), dn2.getDatanodeReport()); @@ -336,20 +338,20 @@ public class TestPBHelper { assertEquals(dn1.getLevel(), dn2.getLevel()); assertEquals(dn1.getNetworkLocation(), dn2.getNetworkLocation()); } - + @Test public void testConvertExtendedBlock() { ExtendedBlock b = getExtendedBlock(); ExtendedBlockProto bProto = PBHelperClient.convert(b); ExtendedBlock b1 
= PBHelperClient.convert(bProto); assertEquals(b, b1); - + b.setBlockId(-1); bProto = PBHelperClient.convert(b); b1 = PBHelperClient.convert(bProto); assertEquals(b, b1); } - + @Test public void testConvertRecoveringBlock() { DatanodeInfo di1 = DFSTestUtil.getLocalDatanodeInfo(); @@ -365,7 +367,7 @@ public class TestPBHelper { compare(dnInfo[0], dnInfo1[0]); } } - + @Test public void testConvertBlockRecoveryCommand() { DatanodeInfo di1 = DFSTestUtil.getLocalDatanodeInfo(); @@ -376,14 +378,14 @@ public class TestPBHelper { new RecoveringBlock(getExtendedBlock(1), dnInfo, 3), new RecoveringBlock(getExtendedBlock(2), dnInfo, 3) ); - + BlockRecoveryCommand cmd = new BlockRecoveryCommand(blks); BlockRecoveryCommandProto proto = PBHelper.convert(cmd); assertEquals(1, proto.getBlocks(0).getBlock().getB().getBlockId()); assertEquals(2, proto.getBlocks(1).getBlock().getB().getBlockId()); - + BlockRecoveryCommand cmd2 = PBHelper.convert(proto); - + List<RecoveringBlock> cmd2Blks = Lists.newArrayList( cmd2.getRecoveringBlocks()); assertEquals(blks.get(0).getBlock(), cmd2Blks.get(0).getBlock()); @@ -391,8 +393,8 @@ public class TestPBHelper { assertEquals(Joiner.on(",").join(blks), Joiner.on(",").join(cmd2Blks)); assertEquals(cmd.toString(), cmd2.toString()); } - - + + @Test public void testConvertText() { Text t = new Text("abc".getBytes()); @@ -400,7 +402,7 @@ public class TestPBHelper { Text t1 = new Text(s); assertEquals(t, t1); } - + @Test public void testConvertBlockToken() { Token<BlockTokenIdentifier> token = new Token<BlockTokenIdentifier>( @@ -410,7 +412,7 @@ public class TestPBHelper { Token<BlockTokenIdentifier> token2 = PBHelperClient.convert(tokenProto); compare(token, token2); } - + @Test public void testConvertNamespaceInfo() { NamespaceInfo info = new NamespaceInfo(37, "clusterID", "bpID", 2300); @@ -455,7 +457,7 @@ public class TestPBHelper { AdminStates.DECOMMISSION_INPROGRESS), DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h2", AdminStates.DECOMMISSIONED), 
- DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3", + DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3", AdminStates.NORMAL), DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h4", AdminStates.NORMAL), @@ -523,7 +525,7 @@ public class TestPBHelper { compare(lbl.get(i), lbl2.get(2)); } } - + @Test public void testConvertLocatedBlockArray() { LocatedBlock [] lbl = new LocatedBlock[3]; @@ -563,7 +565,7 @@ public class TestPBHelper { DatanodeStorage dns2 = PBHelperClient.convert(proto); compare(dns1, dns2); } - + @Test public void testConvertBlockCommand() { Block[] blocks = new Block[] { new Block(21), new Block(22) }; @@ -596,7 +598,7 @@ public class TestPBHelper { } } } - + @Test public void testChecksumTypeProto() { assertEquals(DataChecksum.Type.NULL, @@ -678,4 +680,24 @@ public class TestPBHelper { DatanodeInfo dnInfos3 = PBHelperClient.convert(b.build()); assertEquals(dnInfos0.getNonDfsUsed(), dnInfos3.getNonDfsUsed()); } + + @Test + public void testSlowPeerInfoPBHelper() { + // Test with a map that has a few slow peer entries. + final SlowPeerReports slowPeers = SlowPeerReports.create( + ImmutableMap.of("peer1", 0.0, "peer2", 1.0, "peer3", 2.0)); + SlowPeerReports slowPeersConverted1 = PBHelper.convertSlowPeerInfo( + PBHelper.convertSlowPeerInfo(slowPeers)); + assertTrue( + "Expected map:" + slowPeers + ", got map:" + + slowPeersConverted1.getSlowPeers(), + slowPeersConverted1.equals(slowPeers)); + + // Test with an empty map. 
+ SlowPeerReports slowPeersConverted2 = PBHelper.convertSlowPeerInfo( + PBHelper.convertSlowPeerInfo(SlowPeerReports.EMPTY_REPORT)); + assertTrue( + "Expected empty map:" + ", got map:" + slowPeersConverted2, + slowPeersConverted2.equals(SlowPeerReports.EMPTY_REPORT)); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java index ab607ea..f12f6f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java @@ -42,13 +42,23 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; import org.mockito.Mockito; /** * Test if FSNamesystem handles heartbeat right */ public class TestHeartbeatHandling { + + + /** + * Set a timeout for every test case. 
+ */ + @Rule + public Timeout testTimeout = new Timeout(300_000); + /** * Test if * {@link FSNamesystem#handleHeartbeat} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java index 5214af3..fbfccbe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import com.google.common.base.Supplier; import java.util.ArrayList; import java.util.Collection; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeRef import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Level; @@ -111,7 +113,7 @@ public class TestNameNodePrunesMissingStorages { // Stop the DataNode and send fake heartbeat with missing storage. 
cluster.stopDataNode(0); cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0, - 0, null, true); + 0, null, true, SlowPeerReports.EMPTY_REPORT); // Check that the missing storage was pruned. assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java new file mode 100644 index 0000000..15eb3a5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.blockmanagement; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker.ReportForJson; +import org.apache.hadoop.util.FakeTimer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Set; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + + +/** + * Tests for {@link SlowPeerTracker}. + */ +public class TestSlowPeerTracker { + public static final Logger LOG = LoggerFactory.getLogger( + TestSlowPeerTracker.class); + + /** + * Set a timeout for every test case. + */ + @Rule + public Timeout testTimeout = new Timeout(300_000); + + private Configuration conf; + private SlowPeerTracker tracker; + private FakeTimer timer; + private long reportValidityMs; + + @Before + public void setup() { + conf = new HdfsConfiguration(); + timer = new FakeTimer(); + tracker = new SlowPeerTracker(conf, timer); + reportValidityMs = tracker.getReportValidityMs(); + } + + /** + * Edge case, there are no reports to retrieve. 
+ */ + @Test + public void testEmptyReports() { + assertTrue(tracker.getReportsForAllDataNodes().isEmpty()); + assertTrue(tracker.getReportsForNode("noSuchNode").isEmpty()); + } + + @Test + public void testReportsAreRetrieved() { + tracker.addReport("node2", "node1"); + tracker.addReport("node3", "node1"); + tracker.addReport("node3", "node2"); + + assertThat(tracker.getReportsForAllDataNodes().size(), is(2)); + assertThat(tracker.getReportsForNode("node2").size(), is(1)); + assertThat(tracker.getReportsForNode("node3").size(), is(2)); + assertThat(tracker.getReportsForNode("node1").size(), is(0)); + } + + /** + * Test that when all reports are expired, we get back nothing. + */ + @Test + public void testAllReportsAreExpired() { + tracker.addReport("node2", "node1"); + tracker.addReport("node3", "node2"); + tracker.addReport("node1", "node3"); + + // No reports should expire after 1ms. + timer.advance(1); + assertThat(tracker.getReportsForAllDataNodes().size(), is(3)); + + // All reports should expire after REPORT_VALIDITY_MS. + timer.advance(reportValidityMs); + assertTrue(tracker.getReportsForAllDataNodes().isEmpty()); + assertTrue(tracker.getReportsForNode("node1").isEmpty()); + assertTrue(tracker.getReportsForNode("node2").isEmpty()); + assertTrue(tracker.getReportsForNode("node3").isEmpty()); + } + + /** + * Test the case when a subset of reports has expired. + * Ensure that we only get back non-expired reports. + */ + @Test + public void testSomeReportsAreExpired() { + tracker.addReport("node3", "node1"); + tracker.addReport("node3", "node2"); + timer.advance(reportValidityMs); + tracker.addReport("node3", "node4"); + assertThat(tracker.getReportsForAllDataNodes().size(), is(1)); + assertThat(tracker.getReportsForNode("node3").size(), is(1)); + assertTrue(tracker.getReportsForNode("node3").contains("node4")); + } + + /** + * Test the case when an expired report is replaced by a valid one. 
+ */ + @Test + public void testReplacement() { + tracker.addReport("node2", "node1"); + timer.advance(reportValidityMs); // Expire the report. + assertThat(tracker.getReportsForAllDataNodes().size(), is(0)); + + // This should replace the expired report with a newer valid one. + tracker.addReport("node2", "node1"); + assertThat(tracker.getReportsForAllDataNodes().size(), is(1)); + assertThat(tracker.getReportsForNode("node2").size(), is(1)); + } + + @Test + public void testGetJson() throws IOException { + tracker.addReport("node1", "node2"); + tracker.addReport("node2", "node3"); + tracker.addReport("node2", "node1"); + tracker.addReport("node4", "node1"); + + final Set<ReportForJson> reports = getAndDeserializeJson(); + + // And ensure its contents are what we expect. + assertThat(reports.size(), is(3)); + assertTrue(isNodeInReports(reports, "node1")); + assertTrue(isNodeInReports(reports, "node2")); + assertTrue(isNodeInReports(reports, "node4")); + + assertFalse(isNodeInReports(reports, "node3")); + } + + @Test + public void testGetJsonSizeIsLimited() throws IOException { + tracker.addReport("node1", "node2"); + tracker.addReport("node1", "node3"); + tracker.addReport("node2", "node3"); + tracker.addReport("node2", "node4"); + tracker.addReport("node3", "node4"); + tracker.addReport("node3", "node5"); + tracker.addReport("node4", "node6"); + tracker.addReport("node5", "node6"); + tracker.addReport("node5", "node7"); + tracker.addReport("node6", "node7"); + tracker.addReport("node6", "node8"); + + final Set<ReportForJson> reports = getAndDeserializeJson(); + + // Ensure that node4 is not in the list since it was + // tagged by just one peer and we already have 5 other nodes. + assertFalse(isNodeInReports(reports, "node4")); + + // Remaining nodes should be in the list. 
+ assertTrue(isNodeInReports(reports, "node1")); + assertTrue(isNodeInReports(reports, "node2")); + assertTrue(isNodeInReports(reports, "node3")); + assertTrue(isNodeInReports(reports, "node5")); + assertTrue(isNodeInReports(reports, "node6")); + } + + @Test + public void testLowRankedElementsIgnored() throws IOException { + // Insert 5 nodes with 2 peer reports each. + for (int i = 0; i < 5; ++i) { + tracker.addReport("node" + i, "reporter1"); + tracker.addReport("node" + i, "reporter2"); + } + + // Insert 10 nodes with 1 peer report each. + for (int i = 10; i < 20; ++i) { + tracker.addReport("node" + i, "reporter1"); + } + + final Set<ReportForJson> reports = getAndDeserializeJson(); + + // Ensure that only the first 5 nodes with two reports each were + // included in the JSON. + for (int i = 0; i < 5; ++i) { + assertTrue(isNodeInReports(reports, "node" + i)); + } + } + + private boolean isNodeInReports( + Set<ReportForJson> reports, String node) { + for (ReportForJson report : reports) { + if (report.getSlowNode().equalsIgnoreCase(node)) { + return true; + } + } + return false; + } + + private Set<ReportForJson> getAndDeserializeJson() + throws IOException { + final String json = tracker.getJson(); + LOG.info("Got JSON: {}", json); + return (new ObjectMapper()).readValue( + json, new TypeReference<Set<ReportForJson>>() {}); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java index de856e6..cf43fd0 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.junit.Assert; @@ -136,7 +137,8 @@ public class InternalDataNodeTestUtils { Mockito.any(StorageReport[].class), Mockito.anyLong(), Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class), - Mockito.anyBoolean())).thenReturn( + Mockito.anyBoolean(), + Mockito.any(SlowPeerReports.class))).thenReturn( new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat( HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current() .nextLong() | 1L)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index b7b8966..c6b38ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -56,6 +56,7 @@ import 
org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -119,7 +120,7 @@ public class TestBPOfferService { Mockito.doReturn(conf).when(mockDn).getConf(); Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf(); Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")) - .when(mockDn).getMetrics(); + .when(mockDn).getMetrics(); // Set up a simulated dataset with our fake BP mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf)); @@ -152,7 +153,8 @@ public class TestBPOfferService { Mockito.anyInt(), Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class), - Mockito.anyBoolean()); + Mockito.anyBoolean(), + Mockito.any(SlowPeerReports.class)); mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0); datanodeCommands[nnIdx] = new DatanodeCommand[0]; return mock; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 346250b..fecddc5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder; @@ -83,6 +84,7 @@ import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.test.GenericTestUtils; @@ -186,7 +188,8 @@ public class TestBlockRecovery { Mockito.anyInt(), Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class), - Mockito.anyBoolean())) + Mockito.anyBoolean(), + Mockito.any(SlowPeerReports.class))) .thenReturn(new HeartbeatResponse( new DatanodeCommand[0], new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1), @@ -252,15 +255,15 @@ public class TestBlockRecovery { } /** Sync two replicas */ - private void testSyncReplicas(ReplicaRecoveryInfo replica1, + private void testSyncReplicas(ReplicaRecoveryInfo replica1, ReplicaRecoveryInfo replica2, InterDatanodeProtocol dn1, InterDatanodeProtocol dn2, long expectLen) throws IOException { - + DatanodeInfo[] locs = new DatanodeInfo[]{ mock(DatanodeInfo.class), mock(DatanodeInfo.class)}; - RecoveringBlock rBlock = new RecoveringBlock(block, + RecoveringBlock rBlock = new RecoveringBlock(block, locs, RECOVERY_ID); ArrayList<BlockRecord> syncList = new 
ArrayList<BlockRecord>(2); BlockRecord record1 = new BlockRecord( @@ -269,7 +272,7 @@ public class TestBlockRecovery { DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn2, replica2); syncList.add(record1); syncList.add(record2); - + when(dn1.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(), anyLong(), anyLong())).thenReturn("storage1"); when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(), @@ -279,7 +282,7 @@ public class TestBlockRecovery { recoveryWorker.new RecoveryTaskContiguous(rBlock); RecoveryTaskContiguous.syncBlock(syncList); } - + /** * BlockRecovery_02.8. * Two replicas are in Finalized state @@ -290,9 +293,9 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); - ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN1, GEN_STAMP-2, ReplicaState.FINALIZED); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); @@ -305,9 +308,9 @@ public class TestBlockRecovery { REPLICA_LEN1); // two finalized replicas have different length - replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + replica1 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); - replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + replica2 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN2, GEN_STAMP-2, ReplicaState.FINALIZED); try { @@ -318,10 +321,10 @@ public class TestBlockRecovery { "Inconsistent size of finalized replicas. ")); } } - + /** * BlockRecovery_02.9. - * One replica is Finalized and another is RBW. + * One replica is Finalized and another is RBW. 
* @throws IOException in case of an error */ @Test(timeout=60000) @@ -329,11 +332,11 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - + // rbw and finalized replicas have the same length - ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); - ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RBW); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); @@ -344,11 +347,11 @@ public class TestBlockRecovery { REPLICA_LEN1); verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); - + // rbw replica has a different length from the finalized one - replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + replica1 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); - replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + replica2 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW); dn1 = mock(InterDatanodeProtocol.class); @@ -359,10 +362,10 @@ public class TestBlockRecovery { verify(dn2, never()).updateReplicaUnderRecovery( block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); } - + /** * BlockRecovery_02.10. - * One replica is Finalized and another is RWR. + * One replica is Finalized and another is RWR. 
* @throws IOException in case of an error */ @Test(timeout=60000) @@ -370,11 +373,11 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - + // rbw and finalized replicas have the same length - ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); - ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); @@ -385,11 +388,11 @@ public class TestBlockRecovery { REPLICA_LEN1); verify(dn2, never()).updateReplicaUnderRecovery( block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); - + // rbw replica has a different length from the finalized one - replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + replica1 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); - replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + replica2 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW); dn1 = mock(InterDatanodeProtocol.class); @@ -401,7 +404,7 @@ public class TestBlockRecovery { verify(dn2, never()).updateReplicaUnderRecovery( block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); } - + /** * BlockRecovery_02.11. * Two replicas are RBW. 
@@ -412,9 +415,9 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW); - ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); @@ -425,10 +428,10 @@ public class TestBlockRecovery { verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); } - + /** * BlockRecovery_02.12. - * One replica is RBW and another is RWR. + * One replica is RBW and another is RWR. * @throws IOException in case of an error */ @Test(timeout=60000) @@ -436,9 +439,9 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW); - ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); @@ -450,9 +453,9 @@ public class TestBlockRecovery { verify(dn2, never()).updateReplicaUnderRecovery( block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); } - + /** - * BlockRecovery_02.13. + * BlockRecovery_02.13. * Two replicas are RWR. 
* @throws IOException in case of an error */ @@ -461,9 +464,9 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RWR); - ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RWR); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); @@ -471,10 +474,10 @@ public class TestBlockRecovery { long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2); testSyncReplicas(replica1, replica2, dn1, dn2, minLen); - + verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); - } + } private Collection<RecoveringBlock> initRecoveringBlocks() throws IOException { Collection<RecoveringBlock> blocks = new ArrayList<RecoveringBlock>(1); @@ -661,10 +664,10 @@ public class TestBlockRecovery { streams.close(); } } - + /** * Test to verify the race between finalizeBlock and Lease recovery - * + * * @throws Exception */ @Test(timeout = 20000) @@ -682,11 +685,11 @@ public class TestBlockRecovery { FSDataOutputStream out = fs.create(path); out.writeBytes("data"); out.hsync(); - + List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs.open(path)); final LocatedBlock block = blocks.get(0); final DataNode dataNode = cluster.getDataNodes().get(0); - + final AtomicBoolean recoveryInitResult = new AtomicBoolean(true); Thread recoveryThread = new Thread() { @Override @@ -716,7 +719,7 @@ public class TestBlockRecovery { } Assert.assertTrue("Recovery should be initiated successfully", recoveryInitResult.get()); - + dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock() .getGenerationStamp() + 1, block.getBlock().getBlockId(), 
block.getBlockSize()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java index 76885e4..6435d4d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java @@ -20,10 +20,15 @@ package org.apache.hadoop.hdfs.server.datanode; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + import static java.lang.Math.abs; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertFalse; @@ -31,11 +36,6 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; -import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler; - -import java.util.Arrays; -import java.util.List; -import java.util.Random; /** @@ -51,6 +51,7 @@ public class TestBpServiceActorScheduler { private static final long HEARTBEAT_INTERVAL_MS = 5000; // 5 seconds private static final long LIFELINE_INTERVAL_MS = 3 * HEARTBEAT_INTERVAL_MS; private static final long BLOCK_REPORT_INTERVAL_MS = 10000; // 10 seconds + private static final long SLOW_PEER_REPORT_INTERVAL_MS = 10000; // 10 
seconds private final Random random = new Random(System.nanoTime()); @Test @@ -180,13 +181,28 @@ public class TestBpServiceActorScheduler { } } + @Test + public void testSlowPeerReportScheduling() { + for (final long now : getTimestamps()) { + Scheduler scheduler = makeMockScheduler(now); + assertTrue(scheduler.isSlowPeersReportDue(now)); + scheduler.scheduleNextSlowPeerReport(); + assertFalse(scheduler.isSlowPeersReportDue(now)); + assertFalse(scheduler.isSlowPeersReportDue(now + 1)); + assertTrue(scheduler.isSlowPeersReportDue( + now + SLOW_PEER_REPORT_INTERVAL_MS)); + } + } + private Scheduler makeMockScheduler(long now) { LOG.info("Using now = " + now); - Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS, - LIFELINE_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS)); + Scheduler mockScheduler = spy(new Scheduler( + HEARTBEAT_INTERVAL_MS, LIFELINE_INTERVAL_MS, + BLOCK_REPORT_INTERVAL_MS, SLOW_PEER_REPORT_INTERVAL_MS)); doReturn(now).when(mockScheduler).monotonicNow(); mockScheduler.nextBlockReportTime = now; mockScheduler.nextHeartbeatTime = now; + mockScheduler.nextSlowPeersReportTime = now; return mockScheduler; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java index df2fe5a..8a9f0b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.test.GenericTestUtils; @@ -167,7 +168,8 @@ public class TestDataNodeLifeline { anyInt(), anyInt(), any(VolumeFailureSummary.class), - anyBoolean()); + anyBoolean(), + any(SlowPeerReports.class)); // Intercept lifeline to trigger latch count-down on each call. doAnswer(new LatchCountingAnswer<Void>(lifelinesSent)) @@ -230,7 +232,8 @@ public class TestDataNodeLifeline { anyInt(), anyInt(), any(VolumeFailureSummary.class), - anyBoolean()); + anyBoolean(), + any(SlowPeerReports.class)); // While waiting on the latch for the expected number of heartbeat messages, // poll DataNode tracking information. 
We expect that the DataNode always http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java index 5af54a4..b18ff2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -41,9 +42,10 @@ public class TestDataNodePeerMetrics { final int numOpsPerIteration = 1000; final Configuration conf = new HdfsConfiguration(); - conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY, - windowSize); - conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY, + conf.setTimeDuration( + DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY, + windowSize, TimeUnit.SECONDS); + conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY, numWindows); conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java index d447a76..c94f74e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; @@ -218,7 +219,8 @@ public class TestDatanodeProtocolRetryPolicy { Mockito.anyInt(), Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class), - Mockito.anyBoolean()); + Mockito.anyBoolean(), + Mockito.any(SlowPeerReports.class)); dn = new DataNode(conf, locations, null, null) { @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index 6557055..eb015c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.io.IOUtils; @@ -172,7 +173,7 @@ public class TestFsDatasetCache { (DatanodeRegistration) any(), (StorageReport[]) any(), anyLong(), anyLong(), anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(), - anyBoolean()); + anyBoolean(), any(SlowPeerReports.class)); } private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java index d8418d4..2b793e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import 
org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.junit.After; @@ -106,7 +107,8 @@ public class TestStorageReport { any(DatanodeRegistration.class), captor.capture(), anyLong(), anyLong(), anyInt(), anyInt(), anyInt(), - Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean()); + Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), + Mockito.any(SlowPeerReports.class)); StorageReport[] reports = captor.getValue(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java new file mode 100644 index 0000000..34e15e5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.metrics; + +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Random; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + + +/** + * Test that the {@link DataNodePeerMetrics} class is able to detect + * outliers i.e. slow nodes via the metrics it maintains. + */ +public class TestDataNodeOutlierDetectionViaMetrics { + public static final Logger LOG = + LoggerFactory.getLogger(TestDataNodeOutlierDetectionViaMetrics.class); + + /** + * Set a timeout for every test case. + */ + @Rule + public Timeout testTimeout = new Timeout(300_000); + + // A few constants to keep the test run time short. + private static final int WINDOW_INTERVAL_SECONDS = 3; + private static final int ROLLING_AVERAGE_WINDOWS = 10; + private static final int SLOW_NODE_LATENCY_MS = 20_000; + private static final int FAST_NODE_MAX_LATENCY_MS = 5; + + private Random random = new Random(System.currentTimeMillis()); + + @Before + public void setup() { + GenericTestUtils.setLogLevel(DataNodePeerMetrics.LOG, Level.ALL); + GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL); + } + + /** + * Test that a very slow peer is detected as an outlier. 
+ */ + @Test + public void testOutlierIsDetected() throws Exception { + final String slowNodeName = "SlowNode"; + + DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics( + "PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS, + ROLLING_AVERAGE_WINDOWS); + + injectFastNodesSamples(peerMetrics); + injectSlowNodeSamples(peerMetrics, slowNodeName); + + // Trigger a snapshot. + peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson(); + + final Map<String, Double> outliers = peerMetrics.getOutliers(); + LOG.info("Got back outlier nodes: {}", outliers); + assertThat(outliers.size(), is(1)); + assertTrue(outliers.containsKey(slowNodeName)); + } + + /** + * Test that when there are no outliers, we get back nothing. + */ + @Test + public void testWithNoOutliers() throws Exception { + DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics( + "PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS, + ROLLING_AVERAGE_WINDOWS); + + injectFastNodesSamples(peerMetrics); + + // Trigger a snapshot. + peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson(); + + // Ensure that we get back no outliers. + assertTrue(peerMetrics.getOutliers().isEmpty()); + } + + /** + * Inject fake stats for MIN_OUTLIER_DETECTION_PEERS fast nodes. + * + * @param peerMetrics the metrics object that receives the fake samples. + */ + public void injectFastNodesSamples(DataNodePeerMetrics peerMetrics) { + for (int nodeIndex = 0; + nodeIndex < SlowNodeDetector.getMinOutlierDetectionPeers(); + ++nodeIndex) { + final String nodeName = "FastNode-" + nodeIndex; + LOG.info("Generating stats for node {}", nodeName); + for (int i = 0; + i < 2 * DataNodePeerMetrics.MIN_OUTLIER_DETECTION_SAMPLES; + ++i) { + peerMetrics.addSendPacketDownstream( + nodeName, random.nextInt(FAST_NODE_MAX_LATENCY_MS)); + } + } + } + + /** + * Inject fake stats for one extremely slow node. + */ + public void injectSlowNodeSamples( + DataNodePeerMetrics peerMetrics, String slowNodeName) + throws InterruptedException { + + // And the one slow node. 
+ for (int i = 0; + i < 2 * DataNodePeerMetrics.MIN_OUTLIER_DETECTION_SAMPLES; + ++i) { + peerMetrics.addSendPacketDownstream( + slowNodeName, SLOW_NODE_LATENCY_MS); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java new file mode 100644 index 0000000..6107d63 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.datanode.metrics; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertTrue; + +/** + * Unit tests for {@link SlowNodeDetector}. + */ +public class TestSlowNodeDetector { + public static final Logger LOG = + LoggerFactory.getLogger(TestSlowNodeDetector.class); + + /** + * Set a timeout for every test case. + */ + @Rule + public Timeout testTimeout = new Timeout(300_000); + + private final static double LOW_THRESHOLD = 1000; + private final static long MIN_OUTLIER_DETECTION_PEERS = 3; + + // Randomly generated test cases for median and MAD. The first entry + // in each pair is the expected median and the second entry is the + // expected Median Absolute Deviation. The small sets of size 1 and 2 + // exist to test the edge cases; however, in practice the MAD of a very + // small set is not useful. + private Map<List<Double>, Pair<Double, Double>> medianTestMatrix = + new ImmutableMap.Builder<List<Double>, Pair<Double, Double>>() + // Single element. + .put(new ImmutableList.Builder<Double>() + .add(9.6502431302).build(), + Pair.of(9.6502431302, 0.0)) + + // Two elements. + .put(new ImmutableList.Builder<Double>() + .add(1.72168104625) + .add(11.7872544459).build(), + Pair.of(6.75446774606, 7.4616095611)) + + // The remaining lists were randomly generated with sizes 3-10. 
+ .put(new ImmutableList.Builder<Double>() + .add(76.2635686249) + .add(27.0652018553) + .add(1.3868476443) + .add(49.7194624164) + .add(47.385680883) + .add(57.8721199173).build(), + Pair.of(48.5525716497, 22.837202532)) + + .put(new ImmutableList.Builder<Double>() + .add(86.0573389581) + .add(93.2399572424) + .add(64.9545429122) + .add(35.8509730085) + .add(1.6534313654).build(), + Pair.of(64.9545429122, 41.9360180373)) + + .put(new ImmutableList.Builder<Double>() + .add(5.00127007366) + .add(37.9790589127) + .add(67.5784746266).build(), + Pair.of(37.9790589127, 43.8841594039)) + + .put(new ImmutableList.Builder<Double>() + .add(1.43442932944) + .add(70.6769829947) + .add(37.47579656) + .add(51.1126141394) + .add(72.2465914419) + .add(32.2930549225) + .add(39.677459781).build(), + Pair.of(39.677459781, 16.9537852208)) + + .put(new ImmutableList.Builder<Double>() + .add(26.7913745214) + .add(68.9833706658) + .add(29.3882180746) + .add(68.3455244453) + .add(74.9277265022) + .add(12.1469972942) + .add(72.5395402683) + .add(7.87917492506) + .add(33.3253447774) + .add(72.2753759125).build(), + Pair.of(50.8354346113, 31.9881230079)) + + .put(new ImmutableList.Builder<Double>() + .add(38.6482290705) + .add(88.0690746319) + .add(50.6673611649) + .add(64.5329814115) + .add(25.2580979294) + .add(59.6709630711) + .add(71.5406993741) + .add(81.3073035091) + .add(20.5549547284).build(), + Pair.of(59.6709630711, 31.1683520683)) + + .put(new ImmutableList.Builder<Double>() + .add(87.352734249) + .add(65.4760359094) + .add(28.9206803169) + .add(36.5908574008) + .add(87.7407653175) + .add(99.3704511335) + .add(41.3227434076) + .add(46.2713494909) + .add(3.49940920921).build(), + Pair.of(46.2713494909, 28.4729106898)) + + .put(new ImmutableList.Builder<Double>() + .add(95.3251533286) + .add(27.2777870437) + .add(43.73477168).build(), + Pair.of(43.73477168, 24.3991619317)) + + .build(); + + // A test matrix that maps inputs to the expected output list of + // slow nodes i.e. 
outliers. + private Map<Map<String, Double>, Set<String>> outlierTestMatrix = + new ImmutableMap.Builder<Map<String, Double>, Set<String>>() + // The number of samples is too low, so nothing should be returned + // even though n2 is above the low threshold. + .put(ImmutableMap.of( + "n1", 0.0, + "n2", LOW_THRESHOLD + 1), + ImmutableSet.<String>of()) + + // A statistical outlier below the low threshold must not be + // returned. + .put(ImmutableMap.of( + "n1", 1.0, + "n2", 1.0, + "n3", LOW_THRESHOLD - 1), + ImmutableSet.<String>of()) + + // A statistical outlier above the low threshold must be returned. + .put(ImmutableMap.of( + "n1", 1.0, + "n2", 1.0, + "n3", LOW_THRESHOLD + 1), + ImmutableSet.of("n3")) + + // A statistical outlier must not be returned if it is within a + // MEDIAN_MULTIPLIER multiple of the median. + .put(ImmutableMap.of( + "n1", LOW_THRESHOLD + 0.1, + "n2", LOW_THRESHOLD + 0.1, + "n3", LOW_THRESHOLD * SlowNodeDetector.MEDIAN_MULTIPLIER - 0.1), + ImmutableSet.<String>of()) + + // A statistical outlier must be returned if it is outside a + // MEDIAN_MULTIPLIER multiple of the median. + .put(ImmutableMap.of( + "n1", LOW_THRESHOLD + 0.1, + "n2", LOW_THRESHOLD + 0.1, + "n3", (LOW_THRESHOLD + 0.1) * + SlowNodeDetector.MEDIAN_MULTIPLIER + 0.1), + ImmutableSet.of("n3")) + + // Only the statistical outliers n3 and n11 should be returned. + .put(new ImmutableMap.Builder<String, Double>() + .put("n1", 1029.4322) + .put("n2", 2647.876) + .put("n3", 9194.312) + .put("n4", 2.2) + .put("n5", 2012.92) + .put("n6", 1843.81) + .put("n7", 1201.43) + .put("n8", 6712.01) + .put("n9", 3278.554) + .put("n10", 2091.765) + .put("n11", 9194.77).build(), + ImmutableSet.of("n3", "n11")) + + // The following input set has multiple outliers. + // - The low outliers (n4, n6) should not be returned. + // - High outlier n2 is within 3 multiples of the median + // and so it should not be returned. + // - Only the high outlier n8 should be returned. 
+ .put(new ImmutableMap.Builder<String, Double>() + .put("n1", 5002.0) + .put("n2", 9001.0) + .put("n3", 5004.0) + .put("n4", 1001.0) + .put("n5", 5003.0) + .put("n6", 2001.0) + .put("n7", 5000.0) + .put("n8", 101002.0) + .put("n9", 5001.0) + .put("n10", 5002.0) + .put("n11", 5105.0) + .put("n12", 5006.0).build(), + ImmutableSet.of("n8")) + + .build(); + + + private SlowNodeDetector slowNodeDetector; + + @Before + public void setup() { + slowNodeDetector = new SlowNodeDetector((long) LOW_THRESHOLD); + SlowNodeDetector.setMinOutlierDetectionPeers(MIN_OUTLIER_DETECTION_PEERS); + GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL); + } + + @Test + public void testOutliersFromTestMatrix() { + for (Map.Entry<Map<String, Double>, Set<String>> entry : + outlierTestMatrix.entrySet()) { + + LOG.info("Verifying set {}", entry.getKey()); + final Set<String> outliers = + slowNodeDetector.getOutliers(entry.getKey()).keySet(); + assertTrue( + "Running outlier detection on " + entry.getKey() + + " was expected to yield set " + entry.getValue() + ", but " + + " we got set " + outliers, + outliers.equals(entry.getValue())); + } + } + + /** + * Unit test for {@link SlowNodeDetector#computeMedian(List)}. + */ + @Test + public void testMediansFromTestMatrix() { + for (Map.Entry<List<Double>, Pair<Double, Double>> entry : + medianTestMatrix.entrySet()) { + final List<Double> inputList = new ArrayList<>(entry.getKey()); + Collections.sort(inputList); + final Double median = SlowNodeDetector.computeMedian(inputList); + final Double expectedMedian = entry.getValue().getLeft(); + + // Ensure that the median is within 0.001% of expected. + // We need some fudge factor for floating point comparison. 
+ final Double errorPercent = + Math.abs(median - expectedMedian) * 100.0 / expectedMedian; + + assertTrue( + "Set " + inputList + "; Expected median: " + + expectedMedian + ", got: " + median, + errorPercent < 0.001); + } + } + + /** + * Unit test for {@link SlowNodeDetector#computeMad(List)}. + */ + @Test + public void testMadsFromTestMatrix() { + for (Map.Entry<List<Double>, Pair<Double, Double>> entry : + medianTestMatrix.entrySet()) { + final List<Double> inputList = new ArrayList<>(entry.getKey()); + Collections.sort(inputList); + final Double mad = SlowNodeDetector.computeMad(inputList); + final Double expectedMad = entry.getValue().getRight(); + + // Ensure that the MAD is within 0.001% of expected. + // We need some fudge factor for floating point comparison. + if (entry.getKey().size() > 1) { + final Double errorPercent = + Math.abs(mad - expectedMad) * 100.0 / expectedMad; + + assertTrue( + "Set " + entry.getKey() + "; Expected M.A.D.: " + + expectedMad + ", got: " + mad, + errorPercent < 0.001); + } else { + // For an input list of size 1, the MAD should be 0.0. + final Double epsilon = 0.000001; // Allow for some FP math error. + assertTrue( + "Set " + entry.getKey() + "; Expected M.A.D.: " + + expectedMad + ", got: " + mad, + mad < epsilon); + } + } + } + + /** + * Verify that {@link SlowNodeDetector#computeMedian(List)} throws when + * passed an empty list. + */ + @Test(expected=IllegalArgumentException.class) + public void testMedianOfEmptyList() { + SlowNodeDetector.computeMedian(Collections.<Double>emptyList()); + } + + /** + * Verify that {@link SlowNodeDetector#computeMad(List)} throws when + * passed an empty list. 
+ */ + @Test(expected=IllegalArgumentException.class) + public void testMadOfEmptyList() { + SlowNodeDetector.computeMedian(Collections.<Double>emptyList()); + } + + private static class Pair<L, R> { + private final L l; + private final R r; + + Pair(L l, R r) { + this.l = l; + this.r = r; + } + + L getLeft() { + return l; + } + + R getRight() { + return r; + } + + static <L, R> Pair of(L l, R r) { + return new Pair<>(l, r); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index a93ec39..76ccd40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -951,7 +952,8 @@ public class NNThroughputBenchmark implements Tool { StorageReport[] rep = { new StorageReport(storage, false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) }; DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, 
rep, - 0L, 0L, 0, 0, 0, null, true).getCommands(); + 0L, 0L, 0, 0, 0, null, true, + SlowPeerReports.EMPTY_REPORT).getCommands(); if(cmds != null) { for (DatanodeCommand cmd : cmds ) { if(LOG.isDebugEnabled()) { @@ -1000,7 +1002,8 @@ public class NNThroughputBenchmark implements Tool { StorageReport[] rep = { new StorageReport(storage, false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) }; DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, - rep, 0L, 0L, 0, 0, 0, null, true).getCommands(); + rep, 0L, 0L, 0, 0, 0, null, true, + SlowPeerReports.EMPTY_REPORT).getCommands(); if (cmds != null) { for (DatanodeCommand cmd : cmds) { if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index e49e62b..0810276 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.AccessControlException; @@ -117,7 +118,8 @@ public class NameNodeAdapter { DatanodeDescriptor dd, 
FSNamesystem namesystem) throws IOException { return namesystem.handleHeartbeat(nodeReg, BlockManagerTestUtil.getStorageReportsForDatanode(dd), - dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true); + dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true, + SlowPeerReports.EMPTY_REPORT); } public static boolean setReplication(final FSNamesystem ns, http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java index 9b6e874..a72b2a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -131,7 +132,8 @@ public class TestDeadDatanode { new DatanodeStorage(reg.getDatanodeUuid()), false, 0, 0, 0, 0, 0) }; DatanodeCommand[] cmd = - dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true).getCommands(); + dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true, + SlowPeerReports.EMPTY_REPORT).getCommands(); assertEquals(1, 
cmd.length); assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER .getAction()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4078e1d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java index da10878..da5ef4d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java @@ -68,6 +68,10 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase { // Purposely hidden, based on comments in DFSConfigKeys configurationPropsToSkipCompare .add(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY); + configurationPropsToSkipCompare + .add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY); + configurationPropsToSkipCompare + .add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY); // Fully deprecated properties? configurationPropsToSkipCompare --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org