HDFS-11551. Handle SlowDiskReport from DataNode at the NameNode. Contributed by Hanisha Koneru.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/28cdc5a8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/28cdc5a8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/28cdc5a8 Branch: refs/heads/HDFS-7240 Commit: 28cdc5a8dc37ade1f45bda3aede589ee8593945e Parents: bf3fb58 Author: Hanisha Koneru <[email protected]> Authored: Thu Mar 30 22:41:26 2017 -0700 Committer: Arpit Agarwal <[email protected]> Committed: Thu Mar 30 22:41:26 2017 -0700 ---------------------------------------------------------------------- .../hdfs/server/protocol/SlowDiskReports.java | 28 +- .../server/blockmanagement/DatanodeManager.java | 34 +- .../server/blockmanagement/SlowDiskTracker.java | 291 ++++++++++++ .../datanode/metrics/DataNodeDiskMetrics.java | 35 +- .../blockmanagement/TestSlowDiskTracker.java | 448 +++++++++++++++++++ 5 files changed, 812 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/28cdc5a8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowDiskReports.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowDiskReports.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowDiskReports.java index ef4d09e..8095c2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowDiskReports.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowDiskReports.java @@ -48,7 +48,7 @@ public final class SlowDiskReports { private final Map<String, Map<DiskOp, Double>> slowDisks; /** - * An object representing a SlowPeerReports with no entries. Should + * An object representing a SlowDiskReports with no entries. Should * be used instead of null or creating new objects when there are * no slow peers to report. */ @@ -119,8 +119,28 @@ public final class SlowDiskReports { * Lists the types of operations on which disk latencies are measured. 
*/ public enum DiskOp { - METADATA, - READ, - WRITE + METADATA("MetadataOp"), + READ("ReadIO"), + WRITE("WriteIO"); + + private final String value; + + DiskOp(final String v) { + this.value = v; + } + + @Override + public String toString() { + return value; + } + + public static DiskOp fromValue(final String value) { + for (DiskOp as : DiskOp.values()) { + if (as.value.equals(value)) { + return as; + } + } + return null; + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/28cdc5a8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index e22b7af..18135a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList; +import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; @@ -180,9 +181,15 @@ public class DatanodeManager { * True if we should process latency metrics from downstream peers. */ private final boolean dataNodePeerStatsEnabled; + /** + * True if we should process latency metrics from individual DN disks. + */ + private final boolean dataNodeDiskStatsEnabled; @Nullable private final SlowPeerTracker slowPeerTracker; + @Nullable + private final SlowDiskTracker slowDiskTracker; /** * The minimum time between resending caching directives to Datanodes, @@ -208,9 +215,16 @@ public class DatanodeManager { this.dataNodePeerStatsEnabled = conf.getBoolean( DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT); + this.dataNodeDiskStatsEnabled = Util.isDiskStatsEnabled(conf.getDouble( + DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_FRACTION_KEY, + DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_FRACTION_DEFAULT)); + final Timer timer = new Timer(); this.slowPeerTracker = dataNodePeerStatsEnabled ? - new SlowPeerTracker(conf, new Timer()) : null; + new SlowPeerTracker(conf, timer) : null; + + this.slowDiskTracker = dataNodeDiskStatsEnabled ? 
+ new SlowDiskTracker(conf, timer) : null; this.defaultXferPort = NetUtils.createSocketAddr( conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, @@ -1664,6 +1678,16 @@ public class DatanodeManager { } } + if (slowDiskTracker != null) { + if (!slowDisks.getSlowDisks().isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("DataNode " + nodeReg + " reported slow disks: " + + slowDisks.getSlowDisks()); + } + slowDiskTracker.addSlowDiskReport(nodeReg.getIpcAddr(false), slowDisks); + } + } + if (!cmds.isEmpty()) { return cmds.toArray(new DatanodeCommand[cmds.size()]); } @@ -1875,5 +1899,13 @@ public class DatanodeManager { public String getSlowPeersReport() { return slowPeerTracker != null ? slowPeerTracker.getJson() : null; } + + /** + * Use only for testing. + */ + @VisibleForTesting + public SlowDiskTracker getSlowDiskTracker() { + return slowDiskTracker; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/28cdc5a8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowDiskTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowDiskTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowDiskTracker.java new file mode 100644 index 0000000..25920a2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowDiskTracker.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.blockmanagement; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.primitives.Doubles; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp; +import org.apache.hadoop.util.Timer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This class aggregates information from {@link SlowDiskReports} received via + * heartbeats. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class SlowDiskTracker { + public static final Logger LOG = + LoggerFactory.getLogger(SlowDiskTracker.class); + + /** + * Time duration after which a report is considered stale. This is + * set to DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY * 3 i.e. + * maintained for at least two successive reports. + */ + private long reportValidityMs; + + /** + * Timer object for querying the current time. Separated out for + * unit testing. + */ + private final Timer timer; + + /** + * Number of disks to include in JSON report per operation. We will return + * disks with the highest latency. + */ + private static final int MAX_DISKS_TO_REPORT = 5; + private static final String DATANODE_DISK_SEPARATOR = ":"; + private final long reportGenerationIntervalMs; + + private volatile long lastUpdateTime; + private AtomicBoolean isUpdateInProgress = new AtomicBoolean(false); + + /** + * Information about disks that have been reported as being slow. + * It is a map of (Slow Disk ID) -> (DiskLatency). The DiskLatency contains + * the disk ID, the latencies reported and the timestamp when the report + * was received. + */ + private final Map<String, DiskLatency> diskIDLatencyMap; + + /** + * Map of slow disk -> diskOperations it has been reported slow in.
+ */ + private volatile ArrayList<DiskLatency> slowDisksReport = + Lists.newArrayList(); + private volatile ArrayList<DiskLatency> oldSlowDisksCheck; + + public SlowDiskTracker(Configuration conf, Timer timer) { + this.timer = timer; + this.lastUpdateTime = timer.monotonicNow(); + this.diskIDLatencyMap = new ConcurrentHashMap<>(); + this.reportGenerationIntervalMs = conf.getTimeDuration( + DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, + DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + this.reportValidityMs = reportGenerationIntervalMs * 3; + } + + @VisibleForTesting + public static String getSlowDiskIDForReport(String datanodeID, + String slowDisk) { + return datanodeID + DATANODE_DISK_SEPARATOR + slowDisk; + } + + public void addSlowDiskReport(String dataNodeID, + SlowDiskReports dnSlowDiskReport) { + Map<String, Map<DiskOp, Double>> slowDisks = + dnSlowDiskReport.getSlowDisks(); + + long now = timer.monotonicNow(); + + for (Map.Entry<String, Map<DiskOp, Double>> slowDiskEntry : + slowDisks.entrySet()) { + + String diskID = getSlowDiskIDForReport(dataNodeID, + slowDiskEntry.getKey()); + + Map<DiskOp, Double> latencies = slowDiskEntry.getValue(); + + DiskLatency diskLatency = new DiskLatency(diskID, latencies, now); + diskIDLatencyMap.put(diskID, diskLatency); + } + + checkAndUpdateReportIfNecessary(); + } + + private void checkAndUpdateReportIfNecessary() { + // Check if it is time for update + long now = timer.monotonicNow(); + if (now - lastUpdateTime > reportGenerationIntervalMs) { + updateSlowDiskReportAsync(now); + } + } + + @VisibleForTesting + public void updateSlowDiskReportAsync(long now) { + if (isUpdateInProgress.compareAndSet(false, true)) { + lastUpdateTime = now; + new Thread(new Runnable() { + @Override + public void run() { + slowDisksReport = getSlowDisks(diskIDLatencyMap, + MAX_DISKS_TO_REPORT, now); + + cleanUpOldReports(now); + + isUpdateInProgress.set(false); + } + }).start(); + } + } + + /** + * This structure is a thin wrapper over disk latencies. + */ + public static class DiskLatency { + @JsonProperty("SlowDiskID") + final private String slowDiskID; + @JsonProperty("Latencies") + final private Map<DiskOp, Double> latencyMap; + @JsonIgnore + private long timestamp; + + /** + * Constructor needed by Jackson for Object mapping. + */ + public DiskLatency( + @JsonProperty("SlowDiskID") String slowDiskID, + @JsonProperty("Latencies") Map<DiskOp, Double> latencyMap) { + this.slowDiskID = slowDiskID; + this.latencyMap = latencyMap; + } + + public DiskLatency(String slowDiskID, Map<DiskOp, Double> latencyMap, + long timestamp) { + this.slowDiskID = slowDiskID; + this.latencyMap = latencyMap; + this.timestamp = timestamp; + } + + String getSlowDiskID() { + return this.slowDiskID; + } + + double getMaxLatency() { + double maxLatency = 0; + for (double latency : latencyMap.values()) { + if (latency > maxLatency) { + maxLatency = latency; + } + } + return maxLatency; + } + + Double getLatency(DiskOp op) { + return this.latencyMap.get(op); + } + } + + /** + * Retrieve a list of the top slow disks, i.e. disks with the highest max latencies. + * @param numDisks number of disks to return. This is to limit the size of + * the generated JSON.
+ */ + private ArrayList<DiskLatency> getSlowDisks( + Map<String, DiskLatency> reports, int numDisks, long now) { + if (reports.isEmpty()) { + return new ArrayList(ImmutableList.of()); + } + + final PriorityQueue<DiskLatency> topNReports = new PriorityQueue<>( + reports.size(), + new Comparator<DiskLatency>() { + @Override + public int compare(DiskLatency o1, DiskLatency o2) { + return Doubles.compare( + o1.getMaxLatency(), o2.getMaxLatency()); + } + }); + + ArrayList<DiskLatency> oldSlowDiskIDs = Lists.newArrayList(); + + for (Map.Entry<String, DiskLatency> entry : reports.entrySet()) { + DiskLatency diskLatency = entry.getValue(); + if (now - diskLatency.timestamp < reportValidityMs) { + if (topNReports.size() < numDisks) { + topNReports.add(diskLatency); + } else if (topNReports.peek().getMaxLatency() < + diskLatency.getMaxLatency()) { + topNReports.poll(); + topNReports.add(diskLatency); + } + } else { + oldSlowDiskIDs.add(diskLatency); + } + } + + oldSlowDisksCheck = oldSlowDiskIDs; + + return Lists.newArrayList(topNReports); + } + + /** + * Retrieve all valid reports as a JSON string. + * @return serialized representation of valid reports. null if + * serialization failed. + */ + public String getSlowDiskReportAsJsonString() { + ObjectMapper objectMapper = new ObjectMapper(); + try { + return objectMapper.writeValueAsString(slowDisksReport); + } catch (JsonProcessingException e) { + // Failed to serialize. Don't log the exception call stack. + LOG.debug("Failed to serialize statistics" + e); + return null; + } + } + + private void cleanUpOldReports(long now) { + if (oldSlowDisksCheck != null) { + for (DiskLatency oldDiskLatency : oldSlowDisksCheck) { + diskIDLatencyMap.remove(oldDiskLatency.getSlowDiskID(), oldDiskLatency); + } + } + // Replace oldSlowDiskIDsCheck with an empty ArrayList + oldSlowDisksCheck = null; + } + + @VisibleForTesting + ArrayList<DiskLatency> getSlowDisksReport() { + return this.slowDisksReport; + } + + @VisibleForTesting + long getReportValidityMs() { + return reportValidityMs; + } + + @VisibleForTesting + void setReportValidityMs(long reportValidityMs) { + this.reportValidityMs = reportValidityMs; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/28cdc5a8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java index 2602b01..f2954e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.metrics; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.server.datanode.DataNode; @@ -33,9 +32,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.HashMap; import java.util.Iterator; import 
java.util.Map; -import java.util.Set; /** * This class detects and maintains DataNode disk outliers and their @@ -122,43 +121,41 @@ public class DataNodeDiskMetrics { private void detectAndUpdateDiskOutliers(Map<String, Double> metadataOpStats, Map<String, Double> readIoStats, Map<String, Double> writeIoStats) { - Set<String> diskOutliersSet = Sets.newHashSet(); + Map<String, Map<DiskOp, Double>> diskStats = Maps.newHashMap(); // Get MetadataOp Outliers Map<String, Double> metadataOpOutliers = slowDiskDetector .getOutliers(metadataOpStats); - if (!metadataOpOutliers.isEmpty()) { - diskOutliersSet.addAll(metadataOpOutliers.keySet()); + for (Map.Entry<String, Double> entry : metadataOpOutliers.entrySet()) { + addDiskStat(diskStats, entry.getKey(), DiskOp.METADATA, entry.getValue()); } // Get ReadIo Outliers Map<String, Double> readIoOutliers = slowDiskDetector .getOutliers(readIoStats); - if (!readIoOutliers.isEmpty()) { - diskOutliersSet.addAll(readIoOutliers.keySet()); + for (Map.Entry<String, Double> entry : readIoOutliers.entrySet()) { + addDiskStat(diskStats, entry.getKey(), DiskOp.READ, entry.getValue()); } // Get WriteIo Outliers Map<String, Double> writeIoOutliers = slowDiskDetector .getOutliers(writeIoStats); - if (!readIoOutliers.isEmpty()) { - diskOutliersSet.addAll(writeIoOutliers.keySet()); - } - - Map<String, Map<DiskOp, Double>> diskStats = - Maps.newHashMap(); - for (String disk : diskOutliersSet) { - Map<DiskOp, Double> diskStat = Maps.newHashMap(); - diskStat.put(DiskOp.METADATA, metadataOpStats.get(disk)); - diskStat.put(DiskOp.READ, readIoStats.get(disk)); - diskStat.put(DiskOp.WRITE, writeIoStats.get(disk)); - diskStats.put(disk, diskStat); + for (Map.Entry<String, Double> entry : writeIoOutliers.entrySet()) { + addDiskStat(diskStats, entry.getKey(), DiskOp.WRITE, entry.getValue()); } diskOutliersStats = diskStats; LOG.debug("Updated disk outliers."); } + private void addDiskStat(Map<String, Map<DiskOp, Double>> diskStats, + String disk, DiskOp diskOp, double latency) { + if (!diskStats.containsKey(disk)) { + diskStats.put(disk, new HashMap<>()); + } + diskStats.get(disk).put(diskOp, latency); + } + public Map<String, Map<DiskOp, Double>> getDiskOutliersStats() { return diskOutliersStats; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/28cdc5a8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowDiskTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowDiskTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowDiskTracker.java new file mode 100644 index 0000000..e96b96a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowDiskTracker.java @@ -0,0 +1,448 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.blockmanagement; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import static org.apache.hadoop.hdfs.DFSConfigKeys + .DFS_DATANODE_FILEIO_PROFILING_SAMPLING_FRACTION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys + .DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp; +import org.apache.hadoop.hdfs.server.blockmanagement.SlowDiskTracker + .DiskLatency; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.FakeTimer; + +import com.google.common.base.Supplier; +import com.google.common.collect.Maps; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Tests for {@link SlowDiskTracker}. + */ +public class TestSlowDiskTracker { + public static final Logger LOG = LoggerFactory.getLogger( + TestSlowDiskTracker.class); + + /** + * Set a timeout for every test case. 
+ */ + @Rule + public Timeout testTimeout = new Timeout(300_000); + + private static Configuration conf; + private SlowDiskTracker tracker; + private FakeTimer timer; + private long reportValidityMs; + private static final long OUTLIERS_REPORT_INTERVAL = 1000; + + static { + conf = new HdfsConfiguration(); + conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1L); + conf.setDouble(DFS_DATANODE_FILEIO_PROFILING_SAMPLING_FRACTION_KEY, 1.0); + conf.setTimeDuration(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, + OUTLIERS_REPORT_INTERVAL, TimeUnit.MILLISECONDS); + } + @Before + public void setup() { + timer = new FakeTimer(); + tracker = new SlowDiskTracker(conf, timer); + reportValidityMs = tracker.getReportValidityMs(); + } + + @Test + public void testDataNodeHeartbeatSlowDiskReport() throws Exception { + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) + .build(); + try { + DataNode dn1 = cluster.getDataNodes().get(0); + DataNode dn2 = cluster.getDataNodes().get(1); + NameNode nn = cluster.getNameNode(0); + + DatanodeManager datanodeManager = nn.getNamesystem().getBlockManager() + .getDatanodeManager(); + SlowDiskTracker slowDiskTracker = datanodeManager.getSlowDiskTracker(); + slowDiskTracker.setReportValidityMs(OUTLIERS_REPORT_INTERVAL * 100); + + dn1.getDiskMetrics().addSlowDiskForTesting("disk1", ImmutableMap.of( + DiskOp.WRITE, 1.3)); + dn1.getDiskMetrics().addSlowDiskForTesting("disk2", ImmutableMap.of( + DiskOp.READ, 1.6, DiskOp.WRITE, 1.1)); + dn2.getDiskMetrics().addSlowDiskForTesting("disk1", ImmutableMap.of( + DiskOp.METADATA, 0.8)); + dn2.getDiskMetrics().addSlowDiskForTesting("disk2", ImmutableMap.of( + DiskOp.WRITE, 1.3)); + + String dn1ID = dn1.getDatanodeId().getIpcAddr(false); + String dn2ID = dn2.getDatanodeId().getIpcAddr(false); + + // Advance the timer and wait for NN to receive reports from DataNodes. + Thread.sleep(OUTLIERS_REPORT_INTERVAL); + + // Wait for NN to receive reports from all DNs + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return (slowDiskTracker.getSlowDisksReport().size() == 4); + } + }, 1000, 100000); + + Map<String, DiskLatency> slowDisksReport = getSlowDisksReportForTesting( + slowDiskTracker); + + assertThat(slowDisksReport.size(), is(4)); + assertTrue(Math.abs(slowDisksReport.get(dn1ID + ":disk1") + .getLatency(DiskOp.WRITE) - 1.3) < 0.0000001); + assertTrue(Math.abs(slowDisksReport.get(dn1ID + ":disk2") + .getLatency(DiskOp.READ) - 1.6) < 0.0000001); + assertTrue(Math.abs(slowDisksReport.get(dn1ID + ":disk2") + .getLatency(DiskOp.WRITE) - 1.1) < 0.0000001); + assertTrue(Math.abs(slowDisksReport.get(dn2ID + ":disk1") + .getLatency(DiskOp.METADATA) - 0.8) < 0.0000001); + assertTrue(Math.abs(slowDisksReport.get(dn2ID + ":disk2") + .getLatency(DiskOp.WRITE) - 1.3) < 0.0000001); + + // Test the slow disk report JSON string + ArrayList<DiskLatency> jsonReport = getAndDeserializeJson( + slowDiskTracker.getSlowDiskReportAsJsonString()); + + assertThat(jsonReport.size(), is(4)); + assertTrue(isDiskInReports(jsonReport, dn1ID, "disk1", DiskOp.WRITE, 1.3)); + assertTrue(isDiskInReports(jsonReport, dn1ID, "disk2", DiskOp.READ, 1.6)); + assertTrue(isDiskInReports(jsonReport, dn1ID, "disk2", DiskOp.WRITE, 1.1)); + assertTrue(isDiskInReports(jsonReport, dn2ID, "disk1", DiskOp.METADATA, + 0.8)); + assertTrue(isDiskInReports(jsonReport, dn2ID, "disk2", DiskOp.WRITE, 1.3)); + } finally { + cluster.shutdown(); + } + } + + /** + * Edge case, there are no reports to retrieve. 
+ */ + @Test + public void testEmptyReports() { + tracker.updateSlowDiskReportAsync(timer.monotonicNow()); + assertTrue(getSlowDisksReportForTesting(tracker).isEmpty()); + } + + @Test + public void testReportsAreRetrieved() throws Exception { + addSlowDiskForTesting("dn1", "disk1", + ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8)); + addSlowDiskForTesting("dn1", "disk2", + ImmutableMap.of(DiskOp.READ, 1.3)); + addSlowDiskForTesting("dn2", "disk2", + ImmutableMap.of(DiskOp.READ, 1.1)); + + tracker.updateSlowDiskReportAsync(timer.monotonicNow()); + + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return !tracker.getSlowDisksReport().isEmpty(); + } + }, 500, 5000); + + Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker); + + assertThat(reports.size(), is(3)); + assertTrue(Math.abs(reports.get("dn1:disk1") + .getLatency(DiskOp.METADATA) - 1.1) < 0.0000001); + assertTrue(Math.abs(reports.get("dn1:disk1") + .getLatency(DiskOp.READ) - 1.8) < 0.0000001); + assertTrue(Math.abs(reports.get("dn1:disk2") + .getLatency(DiskOp.READ) - 1.3) < 0.0000001); + assertTrue(Math.abs(reports.get("dn2:disk2") + .getLatency(DiskOp.READ) - 1.1) < 0.0000001); + } + + /** + * Test that when all reports are expired, we get back nothing. + */ + @Test + public void testAllReportsAreExpired() throws Exception { + addSlowDiskForTesting("dn1", "disk1", + ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8)); + addSlowDiskForTesting("dn1", "disk2", + ImmutableMap.of(DiskOp.READ, 1.3)); + addSlowDiskForTesting("dn2", "disk2", + ImmutableMap.of(DiskOp.WRITE, 1.1)); + + // No reports should expire after 1ms. + timer.advance(1); + tracker.updateSlowDiskReportAsync(timer.monotonicNow()); + + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return !tracker.getSlowDisksReport().isEmpty(); + } + }, 500, 5000); + + Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker); + + assertThat(reports.size(), is(3)); + assertTrue(Math.abs(reports.get("dn1:disk1") + .getLatency(DiskOp.METADATA) - 1.1) < 0.0000001); + assertTrue(Math.abs(reports.get("dn1:disk1") + .getLatency(DiskOp.READ) - 1.8) < 0.0000001); + assertTrue(Math.abs(reports.get("dn1:disk2") + .getLatency(DiskOp.READ) - 1.3) < 0.0000001); + assertTrue(Math.abs(reports.get("dn2:disk2") + .getLatency(DiskOp.WRITE) - 1.1) < 0.0000001); + + // All reports should expire after REPORT_VALIDITY_MS. + timer.advance(reportValidityMs); + tracker.updateSlowDiskReportAsync(timer.monotonicNow()); + + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return tracker.getSlowDisksReport().isEmpty(); + } + }, 500, 3000); + + reports = getSlowDisksReportForTesting(tracker); + + assertThat(reports.size(), is(0)); + } + + /** + * Test the case when a subset of reports has expired. + * Ensure that we only get back non-expired reports. 
+ */ + @Test + public void testSomeReportsAreExpired() throws Exception { + addSlowDiskForTesting("dn1", "disk1", + ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8)); + addSlowDiskForTesting("dn1", "disk2", + ImmutableMap.of(DiskOp.READ, 1.3)); + timer.advance(reportValidityMs); + addSlowDiskForTesting("dn2", "disk2", + ImmutableMap.of(DiskOp.WRITE, 1.1)); + + tracker.updateSlowDiskReportAsync(timer.monotonicNow()); + + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return !tracker.getSlowDisksReport().isEmpty(); + } + }, 500, 5000); + + Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker); + + assertThat(reports.size(), is(1)); + assertTrue(Math.abs(reports.get("dn2:disk2") + .getLatency(DiskOp.WRITE) - 1.1) < 0.0000001); + } + + /** + * Test the case when an expired report is replaced by a valid one. + */ + @Test + public void testReplacement() throws Exception { + addSlowDiskForTesting("dn1", "disk1", + ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8)); + timer.advance(reportValidityMs); + addSlowDiskForTesting("dn1", "disk1", + ImmutableMap.of(DiskOp.READ, 1.4)); + + tracker.updateSlowDiskReportAsync(timer.monotonicNow()); + + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return !tracker.getSlowDisksReport().isEmpty(); + } + }, 500, 5000); + + Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker); + + assertThat(reports.size(), is(1)); + assertTrue(reports.get("dn1:disk1").getLatency(DiskOp.METADATA) == null); + assertTrue(Math.abs(reports.get("dn1:disk1") + .getLatency(DiskOp.READ) - 1.4) < 0.0000001); + } + + @Test + public void testGetJson() throws Exception { + addSlowDiskForTesting("dn1", "disk1", + ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8)); + addSlowDiskForTesting("dn1", "disk2", + ImmutableMap.of(DiskOp.READ, 1.3)); + addSlowDiskForTesting("dn2", "disk2", + ImmutableMap.of(DiskOp.WRITE, 1.1)); + addSlowDiskForTesting("dn3", "disk1", + ImmutableMap.of(DiskOp.WRITE, 1.1)); + + tracker.updateSlowDiskReportAsync(timer.monotonicNow()); + + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return tracker.getSlowDiskReportAsJsonString() != null; + } + }, 500, 5000); + + ArrayList<DiskLatency> jsonReport = getAndDeserializeJson( + tracker.getSlowDiskReportAsJsonString()); + + // And ensure its contents are what we expect. 
+ assertThat(jsonReport.size(), is(4)); + assertTrue(isDiskInReports(jsonReport, "dn1", "disk1", DiskOp.METADATA, + 1.1)); + assertTrue(isDiskInReports(jsonReport, "dn1", "disk1", DiskOp.READ, 1.8)); + assertTrue(isDiskInReports(jsonReport, "dn1", "disk2", DiskOp.READ, 1.3)); + assertTrue(isDiskInReports(jsonReport, "dn2", "disk2", DiskOp.WRITE, 1.1)); + assertTrue(isDiskInReports(jsonReport, "dn3", "disk1", DiskOp.WRITE, 1.1)); + } + + @Test + public void testGetJsonSizeIsLimited() throws Exception { + addSlowDiskForTesting("dn1", "disk1", + ImmutableMap.of(DiskOp.READ, 1.1)); + addSlowDiskForTesting("dn1", "disk2", + ImmutableMap.of(DiskOp.READ, 1.2)); + addSlowDiskForTesting("dn1", "disk3", + ImmutableMap.of(DiskOp.READ, 1.3)); + addSlowDiskForTesting("dn2", "disk1", + ImmutableMap.of(DiskOp.READ, 1.4)); + addSlowDiskForTesting("dn2", "disk2", + ImmutableMap.of(DiskOp.READ, 1.5)); + addSlowDiskForTesting("dn3", "disk1", + ImmutableMap.of(DiskOp.WRITE, 1.6)); + addSlowDiskForTesting("dn3", "disk2", + ImmutableMap.of(DiskOp.READ, 1.7)); + addSlowDiskForTesting("dn3", "disk3", + ImmutableMap.of(DiskOp.READ, 1.2)); + + tracker.updateSlowDiskReportAsync(timer.monotonicNow()); + + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return tracker.getSlowDiskReportAsJsonString() != null; + } + }, 500, 5000); + + ArrayList<DiskLatency> jsonReport = getAndDeserializeJson( + tracker.getSlowDiskReportAsJsonString()); + + // Ensure that only the top 5 highest latencies are in the report. + assertThat(jsonReport.size(), is(5)); + assertTrue(isDiskInReports(jsonReport, "dn3", "disk2", DiskOp.READ, 1.7)); + assertTrue(isDiskInReports(jsonReport, "dn3", "disk1", DiskOp.WRITE, 1.6)); + assertTrue(isDiskInReports(jsonReport, "dn2", "disk2", DiskOp.READ, 1.5)); + assertTrue(isDiskInReports(jsonReport, "dn2", "disk1", DiskOp.READ, 1.4)); + assertTrue(isDiskInReports(jsonReport, "dn1", "disk3", DiskOp.READ, 1.3)); + + // Remaining disks should not be in the report.
+ assertFalse(isDiskInReports(jsonReport, "dn1", "disk1", DiskOp.READ, 1.1)); + assertFalse(isDiskInReports(jsonReport, "dn1", "disk2", DiskOp.READ, 1.2)); + assertFalse(isDiskInReports(jsonReport, "dn3", "disk3", DiskOp.READ, 1.2)); + } + + @Test + public void testEmptyReport() throws Exception { + addSlowDiskForTesting("dn1", "disk1", + ImmutableMap.of(DiskOp.READ, 1.1)); + timer.advance(reportValidityMs); + + tracker.updateSlowDiskReportAsync(timer.monotonicNow()); + + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return tracker.getSlowDiskReportAsJsonString() != null; + } + }, 500, 5000); + + ArrayList<DiskLatency> jsonReport = getAndDeserializeJson( + tracker.getSlowDiskReportAsJsonString()); + + assertTrue(jsonReport.isEmpty()); + } + + private boolean isDiskInReports(ArrayList<DiskLatency> reports, + String dataNodeID, String disk, DiskOp diskOp, double latency) { + String diskID = SlowDiskTracker.getSlowDiskIDForReport(dataNodeID, disk); + for (DiskLatency diskLatency : reports) { + if (diskLatency.getSlowDiskID().equals(diskID)) { + if (diskLatency.getLatency(diskOp) == null) { + return false; + } + if (Math.abs(diskLatency.getLatency(diskOp) - latency) < 0.0000001) { + return true; + } + } + } + return false; + } + + private ArrayList<DiskLatency> getAndDeserializeJson( + final String json) throws IOException { + return (new ObjectMapper()).readValue(json, + new TypeReference<ArrayList<DiskLatency>>() {}); + } + + private void addSlowDiskForTesting(String dnID, String disk, + Map<DiskOp, Double> latencies) { + Map<String, Map<DiskOp, Double>> slowDisk = Maps.newHashMap(); + slowDisk.put(disk, latencies); + SlowDiskReports slowDiskReport = SlowDiskReports.create(slowDisk); + tracker.addSlowDiskReport(dnID, slowDiskReport); + } + + Map<String, DiskLatency> getSlowDisksReportForTesting( + SlowDiskTracker slowDiskTracker) { + Map<String, DiskLatency> slowDisksMap = Maps.newHashMap(); + for (DiskLatency diskLatency : slowDiskTracker.getSlowDisksReport()) { + slowDisksMap.put(diskLatency.getSlowDiskID(), diskLatency); + } + return slowDisksMap; + } +}
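
For readers trying out the new API, the following standalone sketch (not part of the commit above) shows how the SlowDiskTracker introduced by this patch can be driven directly. It uses only classes and methods that appear in the diff (SlowDiskTracker, SlowDiskReports.create, SlowDiskReports.DiskOp, FakeTimer); the example class name and the DataNode address "dn1.example.com" are illustrative placeholders. Because updateSlowDiskReportAsync() regenerates the report on a background thread, the sketch simply sleeps before reading the JSON, much as the unit tests above wait with GenericTestUtils.waitFor.

    import com.google.common.collect.ImmutableMap;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.server.blockmanagement.SlowDiskTracker;
    import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
    import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp;
    import org.apache.hadoop.util.FakeTimer;

    import java.util.HashMap;
    import java.util.Map;

    public class SlowDiskTrackerExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        FakeTimer timer = new FakeTimer();
        SlowDiskTracker tracker = new SlowDiskTracker(conf, timer);

        // Build the per-disk latency map that a DataNode heartbeat would carry.
        Map<String, Map<DiskOp, Double>> slowDisks = new HashMap<>();
        slowDisks.put("disk1", ImmutableMap.of(DiskOp.WRITE, 1.5));
        SlowDiskReports report = SlowDiskReports.create(slowDisks);

        // Record the report against a (hypothetical) DataNode address.
        tracker.addSlowDiskReport("dn1.example.com", report);

        // Regenerate the aggregated report; this runs on a background thread.
        tracker.updateSlowDiskReportAsync(timer.monotonicNow());
        Thread.sleep(1000);

        // Prints a JSON array of the slowest disks, including the one added above.
        System.out.println(tracker.getSlowDiskReportAsJsonString());
      }
    }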
