HDFS-11194. Maintain aggregated peer performance metrics on NameNode.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b57368b6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b57368b6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b57368b6

Branch: refs/heads/YARN-3926
Commit: b57368b6f893cb27d77fc9425e116f1312f4790f
Parents: 8528d85
Author: Arpit Agarwal <[email protected]>
Authored: Tue Jan 24 16:58:20 2017 -0800
Committer: Arpit Agarwal <[email protected]>
Committed: Tue Jan 24 16:58:20 2017 -0800

----------------------------------------------------------------------
 .../hadoop/metrics2/lib/RollingAverages.java    |  57 +++-
 .../metrics2/lib/TestRollingAverages.java       |  13 +-
 .../hdfs/server/protocol/SlowPeerReports.java   | 107 ++++++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  26 +-
 .../DatanodeProtocolClientSideTranslatorPB.java |   9 +-
 .../DatanodeProtocolServerSideTranslatorPB.java |   3 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |  44 +++
 .../server/blockmanagement/DatanodeManager.java |  42 ++-
 .../server/blockmanagement/SlowPeerTracker.java | 273 +++++++++++++++
 .../hdfs/server/datanode/BPServiceActor.java    |  38 ++-
 .../hdfs/server/datanode/BlockReceiver.java     |  17 +-
 .../hadoop/hdfs/server/datanode/DNConf.java     |  11 +
 .../hadoop/hdfs/server/datanode/DataNode.java   |   8 +-
 .../hdfs/server/datanode/DataXceiver.java       |   4 +-
 .../datanode/metrics/DataNodePeerMetrics.java   |  79 +++--
 .../datanode/metrics/SlowNodeDetector.java      | 194 +++++++++++
 .../hdfs/server/namenode/FSNamesystem.java      |   8 +-
 .../hadoop/hdfs/server/namenode/NameNode.java   |   6 +
 .../hdfs/server/namenode/NameNodeRpcServer.java |   9 +-
 .../server/namenode/NameNodeStatusMXBean.java   |   6 +
 .../hdfs/server/protocol/DatanodeProtocol.java  |   8 +-
 .../src/main/proto/DatanodeProtocol.proto       |  20 ++
 .../src/main/resources/hdfs-default.xml         |  18 +-
 .../hadoop/hdfs/protocolPB/TestPBHelper.java    |  22 ++
 .../blockmanagement/TestHeartbeatHandling.java  |  10 +
 .../TestNameNodePrunesMissingStorages.java      |   4 +-
 .../blockmanagement/TestSlowPeerTracker.java    | 226 +++++++++++++
 .../datanode/InternalDataNodeTestUtils.java     |   4 +-
 .../server/datanode/TestBPOfferService.java     |   6 +-
 .../hdfs/server/datanode/TestBlockRecovery.java |   4 +-
 .../datanode/TestBpServiceActorScheduler.java   |  30 +-
 .../server/datanode/TestDataNodeLifeline.java   |   7 +-
 .../datanode/TestDataNodePeerMetrics.java       |   8 +-
 .../TestDatanodeProtocolRetryPolicy.java        |   4 +-
 .../server/datanode/TestFsDatasetCache.java     |   3 +-
 .../hdfs/server/datanode/TestStorageReport.java |   4 +-
 .../TestDataNodeOutlierDetectionViaMetrics.java | 142 ++++++++
 .../datanode/metrics/TestSlowNodeDetector.java  | 335 +++++++++++++++++++
 .../server/namenode/NNThroughputBenchmark.java  |   7 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |   4 +-
 .../hdfs/server/namenode/TestDeadDatanode.java  |   4 +-
 .../hadoop/tools/TestHdfsConfigFields.java      |   4 +
 42 files changed, 1721 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
index 06ae30d..4e3b73f 100644
---
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java @@ -20,6 +20,7 @@ package org.apache.hadoop.metrics2.lib; import java.io.Closeable; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; @@ -39,6 +40,9 @@ import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import javax.annotation.Nullable; + import static org.apache.hadoop.metrics2.lib.Interns.*; /** @@ -63,7 +67,10 @@ public class RollingAverages extends MutableMetric implements Closeable { .setNameFormat("RollingAverages-%d").build()); private ScheduledFuture<?> scheduledTask = null; + + @Nullable private Map<String, MutableRate> currentSnapshot; + private final int numWindows; private final String avgInfoNameTemplate; private final String avgInfoDescTemplate; @@ -100,31 +107,31 @@ public class RollingAverages extends MutableMetric implements Closeable { /** * Constructor of {@link RollingAverages}. - * @param windowSize - * The number of seconds of each window for which sub set of samples - * are gathered to compute the rolling average, A.K.A. roll over - * interval. + * @param windowSizeMs + * The number of milliseconds of each window for which subset + * of samples are gathered to compute the rolling average, A.K.A. + * roll over interval. * @param numWindows * The number of windows maintained to compute the rolling average. * @param valueName * of the metric (e.g. "Time", "Latency") */ public RollingAverages( - final int windowSize, + final long windowSizeMs, final int numWindows, final String valueName) { String uvName = StringUtils.capitalize(valueName); String lvName = StringUtils.uncapitalize(valueName); - avgInfoNameTemplate = "%s" + "RollingAvg"+ uvName; + avgInfoNameTemplate = "[%s]" + "RollingAvg"+ uvName; avgInfoDescTemplate = "Rolling average "+ lvName +" for "+ "%s"; this.numWindows = numWindows; scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this), - windowSize, windowSize, TimeUnit.SECONDS); + windowSizeMs, windowSizeMs, TimeUnit.MILLISECONDS); } /** * Constructor of {@link RollingAverages}. - * @param windowSize + * @param windowSizeMs * The number of seconds of each window for which sub set of samples * are gathered to compute rolling average, also A.K.A roll over * interval. @@ -133,9 +140,9 @@ public class RollingAverages extends MutableMetric implements Closeable { * average of the rolling averages. */ public RollingAverages( - final int windowSize, + final long windowSizeMs, final int numWindows) { - this(windowSize, numWindows, "Time"); + this(windowSizeMs, numWindows, "Time"); } @Override @@ -213,7 +220,7 @@ public class RollingAverages extends MutableMetric implements Closeable { * Iterates over snapshot to capture all Avg metrics into rolling structure * {@link RollingAverages#averages}. */ - private void rollOverAvgs() { + private synchronized void rollOverAvgs() { if (currentSnapshot == null) { return; } @@ -248,4 +255,32 @@ public class RollingAverages extends MutableMetric implements Closeable { } scheduledTask = null; } + + /** + * Retrieve a map of metric name -> (aggregate). + * Filter out entries that don't have at least minSamples. 
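   * For illustration, a minimal usage sketch (the variable name below is
   * hypothetical; within this patch the real caller is
   * DataNodePeerMetrics#getOutliers):
   *
   *   Map<String, Double> avgLatencyMs = rollingAverages.getStats(1000);
   *   // Keys are the metric names (peer DataNode ids); values are the mean
   *   // latency over the retained windows. Entries with 1000 or fewer
   *   // samples are omitted.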
+ * + * @return a map of peer DataNode Id to the average latency to that + * node seen over the measurement period. + */ + public synchronized Map<String, Double> getStats(long minSamples) { + final Map<String, Double> stats = new HashMap<>(); + + for (final Entry<String, LinkedBlockingDeque<SumAndCount>> entry + : averages.entrySet()) { + final String name = entry.getKey(); + double totalSum = 0; + long totalCount = 0; + + for (final SumAndCount sumAndCount : entry.getValue()) { + totalCount += sumAndCount.getCount(); + totalSum += sumAndCount.getSum(); + } + + if (totalCount > minSamples) { + stats.put(name, totalSum / totalCount); + } + } + return stats; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java index 899d98c..44202e7 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java @@ -42,7 +42,8 @@ public class TestRollingAverages { public void testRollingAveragesEmptyRollover() throws Exception { final MetricsRecordBuilder rb = mockMetricsRecordBuilder(); /* 5s interval and 2 windows */ - try (final RollingAverages rollingAverages = new RollingAverages(5, 2)) { + try (final RollingAverages rollingAverages = + new RollingAverages(5000, 2)) { /* Check it initially */ rollingAverages.snapshot(rb, true); verify(rb, never()).addGauge( @@ -74,10 +75,10 @@ public class TestRollingAverages { public void testRollingAveragesRollover() throws Exception { final MetricsRecordBuilder rb = mockMetricsRecordBuilder(); final String name = "foo2"; - final int windowSize = 5; // 5s roll over interval + final int windowSizeMs = 5000; // 5s roll over interval final int numWindows = 2; final int numOpsPerIteration = 1000; - try (RollingAverages rollingAverages = new RollingAverages(windowSize, + try (RollingAverages rollingAverages = new RollingAverages(windowSizeMs, numWindows)) { /* Push values for three intervals */ @@ -92,7 +93,7 @@ public class TestRollingAverages { * Sleep until 1s after the next windowSize seconds interval, to let the * metrics roll over */ - final long sleep = (start + (windowSize * 1000 * i) + 1000) + final long sleep = (start + (windowSizeMs * i) + 1000) - Time.monotonicNow(); Thread.sleep(sleep); @@ -110,12 +111,12 @@ public class TestRollingAverages { final long rollingTotal = i > 1 ? 
2 * numOpsPerIteration : numOpsPerIteration; verify(rb).addGauge( - info("Foo2RollingAvgTime", "Rolling average time for foo2"), + info("[Foo2]RollingAvgTime", "Rolling average time for foo2"), rollingSum / rollingTotal); /* Verify the metrics were added the right number of times */ verify(rb, times(i)).addGauge( - eq(info("Foo2RollingAvgTime", "Rolling average time for foo2")), + eq(info("[Foo2]RollingAvgTime", "Rolling average time for foo2")), anyDouble()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java new file mode 100644 index 0000000..218e30d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.protocol; + +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Map; + +/** + * A class that allows a DataNode to communicate information about all + * its peer DataNodes that appear to be slow. + * + * The wire representation of this structure is a list of + * SlowPeerReportProto messages. + */ [email protected] [email protected] +public final class SlowPeerReports { + /** + * A map from the DataNode's DataNodeUUID to its aggregate latency + * as seen by the reporting node. + * + * The exact choice of the aggregate is opaque to the NameNode but it + * should be chosen consistently by all DataNodes in the cluster. + * Examples of aggregates are 90th percentile (good) and mean (not so + * good). + * + * The NameNode must not attempt to interpret the aggregate latencies + * beyond exposing them as a diagnostic. e.g. metrics. Also, comparing + * latencies across reports from different DataNodes may not be not + * meaningful and must be avoided. + */ + @Nonnull + private final Map<String, Double> slowPeers; + + /** + * An object representing a SlowPeerReports with no entries. Should + * be used instead of null or creating new objects when there are + * no slow peers to report. 
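   * For example, both SlowPeerReports.create(null) and
   * SlowPeerReports.create(Collections.emptyMap()) return EMPTY_REPORT
   * rather than allocating a new instance.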
+ */ + public static final SlowPeerReports EMPTY_REPORT = + new SlowPeerReports(ImmutableMap.of()); + + private SlowPeerReports(Map<String, Double> slowPeers) { + this.slowPeers = slowPeers; + } + + public static SlowPeerReports create( + @Nullable Map<String, Double> slowPeers) { + if (slowPeers == null || slowPeers.isEmpty()) { + return EMPTY_REPORT; + } + return new SlowPeerReports(slowPeers); + } + + public Map<String, Double> getSlowPeers() { + return slowPeers; + } + + public boolean haveSlowPeers() { + return slowPeers.size() > 0; + } + + /** + * Return true if the two objects represent the same set slow peer + * entries. Primarily for unit testing convenience. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof SlowPeerReports)) { + return false; + } + + SlowPeerReports that = (SlowPeerReports) o; + + return slowPeers.equals(that.slowPeers); + } + + @Override + public int hashCode() { + return slowPeers.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 8f60af0..3cc4b5f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -457,14 +457,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_METRICS_SESSION_ID_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY; public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals"; - public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY = - "dfs.metrics.rolling.average.window.size"; - public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT = - 3600; - public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY = - "dfs.metrics.rolling.average.window.numbers"; - public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT = - 48; + + // The following setting is not meant to be changed by administrators. + public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY = + "dfs.metrics.rolling.averages.window.length"; + public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT = + "5m"; + + // The following setting is not meant to be changed by administrators. 
+ public static final String DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY = + "dfs.metrics.rolling.average.num.windows"; + public static final int DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT = + 36; + public static final String DFS_DATANODE_PEER_STATS_ENABLED_KEY = "dfs.datanode.peer.stats.enabled"; public static final boolean DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT = false; @@ -669,6 +674,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT = "dfs.block.misreplication.processing.limit"; public static final int DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT = 10000; + public static final String DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY = + "dfs.datanode.slow.peers.report.interval"; + public static final String DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT = + "30m"; + // property for fsimage compression public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress"; public static final boolean DFS_IMAGE_COMPRESS_DEFAULT = false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 79113dd..d9e6026 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -71,6 +72,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import javax.annotation.Nonnull; + /** * This class is the client side translator to translate the requests made on * {@link DatanodeProtocol} interfaces to the RPC server implementing @@ -132,7 +135,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements StorageReport[] reports, long cacheCapacity, long cacheUsed, int xmitsInProgress, int xceiverCount, int failedVolumes, VolumeFailureSummary volumeFailureSummary, - boolean requestFullBlockReportLease) throws IOException { + boolean requestFullBlockReportLease, + @Nonnull SlowPeerReports slowPeers) throws IOException { HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder() .setRegistration(PBHelper.convert(registration)) .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount) @@ -149,6 +153,9 @@ public class DatanodeProtocolClientSideTranslatorPB implements builder.setVolumeFailureSummary(PBHelper.convertVolumeFailureSummary( volumeFailureSummary)); } + if 
(slowPeers.haveSlowPeers()) { + builder.addAllSlowPeers(PBHelper.convertSlowPeerInfo(slowPeers)); + } HeartbeatResponseProto resp; try { resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index 257adf9..b1c8e34 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java @@ -120,7 +120,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements report, request.getCacheCapacity(), request.getCacheUsed(), request.getXmitsInProgress(), request.getXceiverCount(), request.getFailedVolumes(), - volumeFailureSummary, request.getRequestFullBlockReportLease()); + volumeFailureSummary, request.getRequestFullBlockReportLease(), + PBHelper.convertSlowPeerInfo(request.getSlowPeersList())); } catch (IOException e) { throw new ServiceException(e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index d97708f..69c3c83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -20,7 +20,10 @@ package org.apache.hadoop.hdfs.protocolPB; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import com.google.protobuf.ByteString; @@ -44,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeComm import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto; import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto; @@ -107,6 +111,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import 
org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; /** @@ -829,6 +834,45 @@ public class PBHelper { return builder.build(); } + public static List<SlowPeerReportProto> convertSlowPeerInfo( + SlowPeerReports slowPeers) { + if (slowPeers.getSlowPeers().size() == 0) { + return Collections.emptyList(); + } + + List<SlowPeerReportProto> slowPeerInfoProtos = + new ArrayList<>(slowPeers.getSlowPeers().size()); + for (Map.Entry<String, Double> entry : + slowPeers.getSlowPeers().entrySet()) { + slowPeerInfoProtos.add(SlowPeerReportProto.newBuilder() + .setDataNodeId(entry.getKey()) + .setAggregateLatency(entry.getValue()) + .build()); + } + return slowPeerInfoProtos; + } + + public static SlowPeerReports convertSlowPeerInfo( + List<SlowPeerReportProto> slowPeerProtos) { + + // No slow peers, or possibly an older DataNode. + if (slowPeerProtos == null || slowPeerProtos.size() == 0) { + return SlowPeerReports.EMPTY_REPORT; + } + + Map<String, Double> slowPeersMap = new HashMap<>(slowPeerProtos.size()); + for (SlowPeerReportProto proto : slowPeerProtos) { + if (!proto.hasDataNodeId()) { + // The DataNodeId should be reported. + continue; + } + slowPeersMap.put( + proto.getDataNodeId(), + proto.hasAggregateLatency() ? proto.getAggregateLatency() : 0.0); + } + return SlowPeerReports.create(slowPeersMap); + } + public static JournalInfo convert(JournalInfoProto info) { int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0; int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index cc64a04..fed1864 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -50,7 +50,10 @@ import org.apache.hadoop.net.*; import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Timer; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.IOException; import java.io.PrintWriter; import java.net.InetAddress; @@ -172,6 +175,14 @@ public class DatanodeManager { */ private final HashMap<String, Integer> datanodesSoftwareVersions = new HashMap<>(4, 0.75f); + + /** + * True if we should process latency metrics from downstream peers. + */ + private final boolean dataNodePeerStatsEnabled; + + @Nullable + private final SlowPeerTracker slowPeerTracker; /** * The minimum time between resending caching directives to Datanodes, @@ -194,6 +205,12 @@ public class DatanodeManager { this.decomManager = new DecommissionManager(namesystem, blockManager, heartbeatManager); this.fsClusterStats = newFSClusterStats(); + this.dataNodePeerStatsEnabled = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, + DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT); + + this.slowPeerTracker = dataNodePeerStatsEnabled ? 
+ new SlowPeerTracker(conf, new Timer()) : null; this.defaultXferPort = NetUtils.createSocketAddr( conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, @@ -1566,7 +1583,8 @@ public class DatanodeManager { StorageReport[] reports, final String blockPoolId, long cacheCapacity, long cacheUsed, int xceiverCount, int maxTransfers, int failedVolumes, - VolumeFailureSummary volumeFailureSummary) throws IOException { + VolumeFailureSummary volumeFailureSummary, + @Nonnull SlowPeerReports slowPeers) throws IOException { final DatanodeDescriptor nodeinfo; try { nodeinfo = getDatanode(nodeReg); @@ -1632,6 +1650,19 @@ public class DatanodeManager { nodeinfo.setBalancerBandwidth(0); } + if (slowPeerTracker != null) { + final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers(); + if (!slowPeersMap.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("DataNode " + nodeReg + " reported slow peers: " + + slowPeersMap); + } + for (String slowNodeId : slowPeersMap.keySet()) { + slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false)); + } + } + } + if (!cmds.isEmpty()) { return cmds.toArray(new DatanodeCommand[cmds.size()]); } @@ -1834,5 +1865,14 @@ public class DatanodeManager { this.blockInvalidateLimit = Math.max(20 * (int) (intervalSeconds), DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT); } + + /** + * Retrieve information about slow peers as a JSON. + * Returns null if we are not tracking slow peers. + * @return + */ + public String getSlowPeersReport() { + return slowPeerTracker != null ? slowPeerTracker.getJson() : null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java new file mode 100644 index 0000000..cf3a20c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.blockmanagement; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.primitives.Ints; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.util.Timer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + + +/** + * This class aggregates information from {@link SlowPeerReports} received via + * heartbeats. + */ [email protected] [email protected] +public class SlowPeerTracker { + public static final Logger LOG = + LoggerFactory.getLogger(SlowPeerTracker.class); + + /** + * Time duration after which a report is considered stale. This is + * set to DFS_DATANODE_SLOW_PEER_REPORT_INTERVAL_KEY * 3 i.e. + * maintained for at least two successive reports. + */ + private final long reportValidityMs; + + /** + * Timer object for querying the current time. Separated out for + * unit testing. + */ + private final Timer timer; + + /** + * Number of nodes to include in JSON report. We will return nodes with + * the highest number of votes from peers. + */ + private static final int MAX_NODES_TO_REPORT = 5; + + /** + * Information about peers that have reported a node as being slow. + * Each outer map entry is a map of (DatanodeId) -> (timestamp), + * mapping reporting nodes to the timestamp of the last report from + * that node. + * + * DatanodeId could be the DataNodeId or its address. We + * don't care as long as the caller uses it consistently. + * + * Stale reports are not evicted proactively and can potentially + * hang around forever. + */ + private final ConcurrentMap<String, ConcurrentMap<String, Long>> + allReports; + + public SlowPeerTracker(Configuration conf, Timer timer) { + this.timer = timer; + this.allReports = new ConcurrentHashMap<>(); + this.reportValidityMs = conf.getTimeDuration( + DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY, + DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS) * 3; + } + + /** + * Add a new report. DatanodeIds can be the DataNodeIds or addresses + * We don't care as long as the caller is consistent. + * + * @param reportingNode DataNodeId of the node reporting on its peer. + * @param slowNode DataNodeId of the peer suspected to be slow. + */ + public void addReport(String slowNode, + String reportingNode) { + ConcurrentMap<String, Long> nodeEntries = allReports.get(slowNode); + + if (nodeEntries == null) { + // putIfAbsent guards against multiple writers. + allReports.putIfAbsent(slowNode, new ConcurrentHashMap<>()); + nodeEntries = allReports.get(slowNode); + } + + // Replace the existing entry from this node, if any. 
+ nodeEntries.put(reportingNode, timer.monotonicNow()); + } + + /** + * Retrieve the non-expired reports that mark a given DataNode + * as slow. Stale reports are excluded. + * + * @param slowNode target node Id. + * @return set of reports which implicate the target node as being slow. + */ + public Set<String> getReportsForNode(String slowNode) { + final ConcurrentMap<String, Long> nodeEntries = + allReports.get(slowNode); + + if (nodeEntries == null || nodeEntries.isEmpty()) { + return Collections.emptySet(); + } + + return filterNodeReports(nodeEntries, timer.monotonicNow()); + } + + /** + * Retrieve all reports for all nodes. Stale reports are excluded. + * + * @return map from SlowNodeId -> (set of nodes reporting peers). + */ + public Map<String, SortedSet<String>> getReportsForAllDataNodes() { + if (allReports.isEmpty()) { + return ImmutableMap.of(); + } + + final Map<String, SortedSet<String>> allNodesValidReports = new HashMap<>(); + final long now = timer.monotonicNow(); + + for (Map.Entry<String, ConcurrentMap<String, Long>> entry : + allReports.entrySet()) { + SortedSet<String> validReports = filterNodeReports(entry.getValue(), now); + if (!validReports.isEmpty()) { + allNodesValidReports.put(entry.getKey(), validReports); + } + } + return allNodesValidReports; + } + + /** + * Filter the given reports to return just the valid ones. + * + * @param reports + * @param now + * @return + */ + private SortedSet<String> filterNodeReports( + ConcurrentMap<String, Long> reports, long now) { + final SortedSet<String> validReports = new TreeSet<>(); + + for (Map.Entry<String, Long> entry : reports.entrySet()) { + if (now - entry.getValue() < reportValidityMs) { + validReports.add(entry.getKey()); + } + } + return validReports; + } + + /** + * Retrieve all valid reports as a JSON string. + * @return serialized representation of valid reports. null if + * serialization failed. + */ + public String getJson() { + Collection<ReportForJson> validReports = getJsonReports( + MAX_NODES_TO_REPORT); + ObjectMapper objectMapper = new ObjectMapper(); + try { + return objectMapper.writeValueAsString(validReports); + } catch (JsonProcessingException e) { + // Failed to serialize. Don't log the exception call stack. + LOG.debug("Failed to serialize statistics" + e); + return null; + } + } + + /** + * This structure is a thin wrapper over reports to make Json + * [de]serialization easy. + */ + public static class ReportForJson { + @JsonProperty("SlowNode") + final private String slowNode; + + @JsonProperty("ReportingNodes") + final private SortedSet<String> reportingNodes; + + public ReportForJson( + @JsonProperty("SlowNode") String slowNode, + @JsonProperty("ReportingNodes") SortedSet<String> reportingNodes) { + this.slowNode = slowNode; + this.reportingNodes = reportingNodes; + } + + public String getSlowNode() { + return slowNode; + } + + public SortedSet<String> getReportingNodes() { + return reportingNodes; + } + } + + /** + * Retrieve reports in a structure for generating JSON, limiting the + * output to the top numNodes nodes i.e nodes with the most reports. + * @param numNodes number of nodes to return. This is to limit the + * size of the generated JSON. 
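   * As an illustration, the serialized form eventually returned by getJson()
   * looks like the following (node addresses are hypothetical):
   *
   *   [{"SlowNode":"dn3.example.com:9867",
   *     "ReportingNodes":["dn1.example.com:9867","dn2.example.com:9867"]}]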
+ */ + private Collection<ReportForJson> getJsonReports(int numNodes) { + if (allReports.isEmpty()) { + return Collections.emptyList(); + } + + final PriorityQueue<ReportForJson> topNReports = + new PriorityQueue<>(allReports.size(), + new Comparator<ReportForJson>() { + @Override + public int compare(ReportForJson o1, ReportForJson o2) { + return Ints.compare(o1.reportingNodes.size(), + o2.reportingNodes.size()); + } + }); + + final long now = timer.monotonicNow(); + + for (Map.Entry<String, ConcurrentMap<String, Long>> entry : + allReports.entrySet()) { + SortedSet<String> validReports = filterNodeReports( + entry.getValue(), now); + if (!validReports.isEmpty()) { + if (topNReports.size() < numNodes) { + topNReports.add(new ReportForJson(entry.getKey(), validReports)); + } else if (topNReports.peek().getReportingNodes().size() < + validReports.size()){ + // Remove the lowest element + topNReports.poll(); + topNReports.add(new ReportForJson(entry.getKey(), validReports)); + } + } + } + return topNReports; + } + + @VisibleForTesting + long getReportValidityMs() { + return reportValidityMs; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 5294799..644a8ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; @@ -127,7 +128,8 @@ class BPServiceActor implements Runnable { this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval); prevBlockReportId = ThreadLocalRandom.current().nextLong(); scheduler = new Scheduler(dnConf.heartBeatInterval, - dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval); + dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval, + dnConf.slowPeersReportIntervalMs); // get the value of maxDataLength. this.maxDataLength = dnConf.getMaxDataLength(); } @@ -489,12 +491,18 @@ class BPServiceActor implements Runnable { " storage reports from service actor: " + this); } - scheduler.updateLastHeartbeatTime(monotonicNow()); + final long now = monotonicNow(); + scheduler.updateLastHeartbeatTime(now); VolumeFailureSummary volumeFailureSummary = dn.getFSDataset() .getVolumeFailureSummary(); int numFailedVolumes = volumeFailureSummary != null ? volumeFailureSummary.getFailedStorageLocations().length : 0; - return bpNamenode.sendHeartbeat(bpRegistration, + final boolean slowPeersReportDue = scheduler.isSlowPeersReportDue(now); + final SlowPeerReports slowPeers = + slowPeersReportDue && dn.getPeerMetrics() != null ? 
+ SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) : + SlowPeerReports.EMPTY_REPORT; + HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration, reports, dn.getFSDataset().getCacheCapacity(), dn.getFSDataset().getCacheUsed(), @@ -502,7 +510,14 @@ class BPServiceActor implements Runnable { dn.getXceiverCount(), numFailedVolumes, volumeFailureSummary, - requestBlockReportLease); + requestBlockReportLease, + slowPeers); + + if (slowPeersReportDue) { + // If the report was due and successfully sent, schedule the next one. + scheduler.scheduleNextSlowPeerReport(); + } + return response; } @VisibleForTesting @@ -1079,18 +1094,23 @@ class BPServiceActor implements Runnable { @VisibleForTesting boolean resetBlockReportTime = true; + @VisibleForTesting + volatile long nextSlowPeersReportTime = monotonicNow(); + private final AtomicBoolean forceFullBlockReport = new AtomicBoolean(false); private final long heartbeatIntervalMs; private final long lifelineIntervalMs; private final long blockReportIntervalMs; + private final long slowPeersReportIntervalMs; Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs, - long blockReportIntervalMs) { + long blockReportIntervalMs, long slowPeersReportIntervalMs) { this.heartbeatIntervalMs = heartbeatIntervalMs; this.lifelineIntervalMs = lifelineIntervalMs; this.blockReportIntervalMs = blockReportIntervalMs; + this.slowPeersReportIntervalMs = slowPeersReportIntervalMs; scheduleNextLifeline(nextHeartbeatTime); } @@ -1123,6 +1143,10 @@ class BPServiceActor implements Runnable { lastBlockReportTime = blockReportTime; } + void scheduleNextSlowPeerReport() { + nextSlowPeersReportTime = monotonicNow() + slowPeersReportIntervalMs; + } + long getLastHearbeatTime() { return (monotonicNow() - lastHeartbeatTime)/1000; } @@ -1149,6 +1173,10 @@ class BPServiceActor implements Runnable { return nextBlockReportTime - curTime <= 0; } + boolean isSlowPeersReportDue(long curTime) { + return nextSlowPeersReportTime - curTime <= 0; + } + void forceFullBlockReportNow() { forceFullBlockReport.set(true); resetBlockReportTime = true; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 567597d..dd4b58b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; +import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; @@ -93,7 +94,7 @@ class BlockReceiver implements Closeable { protected final String inAddr; protected final String myAddr; private String mirrorAddr; - private String 
bracketedMirrorAddr; + private String mirrorNameForMetrics; private DataOutputStream mirrorOut; private Daemon responder = null; private DataTransferThrottler throttler; @@ -843,10 +844,9 @@ class BlockReceiver implements Closeable { * </p> */ private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) { - if (isPenultimateNode && mirrorAddr != null) { - datanode.getPeerMetrics().addSendPacketDownstream( - bracketedMirrorAddr, - elapsedMs); + final DataNodePeerMetrics peerMetrics = datanode.getPeerMetrics(); + if (peerMetrics != null && isPenultimateNode) { + peerMetrics.addSendPacketDownstream(mirrorNameForMetrics, elapsedMs); } } @@ -927,8 +927,13 @@ class BlockReceiver implements Closeable { boolean responderClosed = false; mirrorOut = mirrOut; mirrorAddr = mirrAddr; - bracketedMirrorAddr = "[" + mirrAddr + "]"; isPenultimateNode = ((downstreams != null) && (downstreams.length == 1)); + if (isPenultimateNode) { + mirrorNameForMetrics = (downstreams[0].getInfoSecurePort() != 0 ? + downstreams[0].getInfoSecureAddr() : downstreams[0].getInfoAddr()); + LOG.debug("Will collect peer metrics for downstream node {}", + mirrorNameForMetrics); + } throttler = throttlerArg; this.replyOut = replyOut; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index c1487b1..e2c5fbc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -30,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; @@ -94,6 +96,8 @@ public class DNConf { private final long lifelineIntervalMs; final long blockReportInterval; final long blockReportSplitThreshold; + final boolean peerStatsEnabled; + final long slowPeersReportIntervalMs; final long ibrInterval; final long initialBlockReportDelayMs; final long cacheReportInterval; @@ -173,6 +177,13 @@ public class DNConf { this.blockReportInterval = getConf().getLong( DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); + this.peerStatsEnabled = getConf().getBoolean( + DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, + DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT); + this.slowPeersReportIntervalMs = getConf().getTimeDuration( + DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY, + 
DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); this.ibrInterval = getConf().getLong( DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY, DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_DEFAULT); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 090d8b9..a6dfa46 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -334,6 +334,7 @@ public class DataNode extends ReconfigurableBase private int infoSecurePort; DataNodeMetrics metrics; + @Nullable private DataNodePeerMetrics peerMetrics; private InetSocketAddress streamingAddr; @@ -422,6 +423,7 @@ public class DataNode extends ReconfigurableBase this.blockScanner = new BlockScanner(this, this.getConf()); this.pipelineSupportECN = false; this.socketFactory = NetUtils.getDefaultSocketFactory(conf); + this.dnConf = new DNConf(this); initOOBTimeout(); storageLocationChecker = null; volumeChecker = new DatasetVolumeChecker(conf, new Timer()); @@ -1363,7 +1365,8 @@ public class DataNode extends ReconfigurableBase initIpcServer(); metrics = DataNodeMetrics.create(getConf(), getDisplayName()); - peerMetrics = DataNodePeerMetrics.create(getConf(), getDisplayName()); + peerMetrics = dnConf.peerStatsEnabled ? + DataNodePeerMetrics.create(getConf(), getDisplayName()) : null; metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); ecWorker = new ErasureCodingWorker(getConf(), this); @@ -3456,6 +3459,7 @@ public class DataNode extends ReconfigurableBase @Override // DataNodeMXBean public String getSendPacketDownstreamAvgInfo() { - return peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson(); + return peerMetrics != null ? + peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson() : null; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index abcaa4a..f838fd9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -341,7 +341,9 @@ class DataXceiver extends Receiver implements Runnable { * the thread dies away. 
*/ private void collectThreadLocalStates() { - datanode.getPeerMetrics().collectThreadLocalStates(); + if (datanode.getPeerMetrics() != null) { + datanode.getPeerMetrics().collectThreadLocalStates(); + } } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java index 9344d1b..5241c78 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java @@ -18,40 +18,59 @@ package org.apache.hadoop.hdfs.server.datanode.metrics; -import java.util.concurrent.ThreadLocalRandom; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.metrics2.MetricsJsonBuilder; import org.apache.hadoop.metrics2.lib.RollingAverages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; /** * This class maintains DataNode peer metrics (e.g. numOps, AvgTime, etc.) for * various peer operations. */ @InterfaceAudience.Private [email protected] public class DataNodePeerMetrics { - static final Log LOG = LogFactory.getLog(DataNodePeerMetrics.class); + public static final Logger LOG = LoggerFactory.getLogger( + DataNodePeerMetrics.class); private final RollingAverages sendPacketDownstreamRollingAvgerages; private final String name; - private final boolean peerStatsEnabled; + + /** + * Threshold in milliseconds below which a DataNode is definitely not slow. + */ + private static final long LOW_THRESHOLD_MS = 5; + + private final SlowNodeDetector slowNodeDetector; + + /** + * Minimum number of packet send samples which are required to qualify + * for outlier detection. If the number of samples is below this then + * outlier detection is skipped. + */ + @VisibleForTesting + static final long MIN_OUTLIER_DETECTION_SAMPLES = 1000; public DataNodePeerMetrics( final String name, - final int windowSize, - final int numWindows, - final boolean peerStatsEnabled) { + final long windowSizeMs, + final int numWindows) { this.name = name; - this.peerStatsEnabled = peerStatsEnabled; + this.slowNodeDetector = new SlowNodeDetector(LOW_THRESHOLD_MS); sendPacketDownstreamRollingAvgerages = new RollingAverages( - windowSize, - numWindows); + windowSizeMs, numWindows); } public String name() { @@ -66,21 +85,18 @@ public class DataNodePeerMetrics { ? 
"UndefinedDataNodeName" + ThreadLocalRandom.current().nextInt() : dnName.replace(':', '-')); - final int windowSize = conf.getInt( - DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY, - DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT); + final long windowSizeMs = conf.getTimeDuration( + DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY, + DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT, + TimeUnit.MILLISECONDS); final int numWindows = conf.getInt( - DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY, - DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT); - final boolean peerStatsEnabled = conf.getBoolean( - DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, - DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT); + DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY, + DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT); return new DataNodePeerMetrics( name, - windowSize, - numWindows, - peerStatsEnabled); + windowSizeMs, + numWindows); } /** @@ -94,9 +110,7 @@ public class DataNodePeerMetrics { public void addSendPacketDownstream( final String peerAddr, final long elapsedMs) { - if (peerStatsEnabled) { - sendPacketDownstreamRollingAvgerages.add(peerAddr, elapsedMs); - } + sendPacketDownstreamRollingAvgerages.add(peerAddr, elapsedMs); } /** @@ -114,4 +128,19 @@ public class DataNodePeerMetrics { public void collectThreadLocalStates() { sendPacketDownstreamRollingAvgerages.collectThreadLocalStates(); } + + /** + * Retrieve the set of dataNodes that look significantly slower + * than their peers. + */ + public Map<String, Double> getOutliers() { + // This maps the metric name to the aggregate latency. + // The metric name is the datanode ID. + final Map<String, Double> stats = + sendPacketDownstreamRollingAvgerages.getStats( + MIN_OUTLIER_DETECTION_SAMPLES); + LOG.trace("DataNodePeerMetrics: Got stats: {}", stats); + + return slowNodeDetector.getOutliers(stats); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java new file mode 100644 index 0000000..b6278ce --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.datanode.metrics; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +/** + * A utility class to help detect nodes whose aggregate latency + * is an outlier within a given set. + * + * We use the median absolute deviation for outlier detection as + * described in the following publication: + * + * Leys, C., et al., Detecting outliers: Do not use standard deviation + * around the mean, use absolute deviation around the median. + * http://dx.doi.org/10.1016/j.jesp.2013.03.013 + * + * We augment the above scheme with the following heuristics to be even + * more conservative: + * + * 1. Skip outlier detection if the sample size is too small. + * 2. Never flag nodes whose aggregate latency is below a low threshold. + * 3. Never flag nodes whose aggregate latency is less than a small + * multiple of the median. + */ [email protected] [email protected] +public class SlowNodeDetector { + public static final Logger LOG = + LoggerFactory.getLogger(SlowNodeDetector.class); + + /** + * Minimum number of peers to run outlier detection. + */ + private static long minOutlierDetectionPeers = 10; + + /** + * The multiplier is from Leys, C. et al. + */ + private static final double MAD_MULTIPLIER = (double) 1.4826; + + /** + * Threshold in milliseconds below which a DataNode is definitely not slow. + */ + private final long lowThresholdMs; + + /** + * Deviation multiplier. A sample is considered to be an outlier if it + * exceeds the median by (multiplier * median abs. deviation). 3 is a + * conservative choice. + */ + private static final int DEVIATION_MULTIPLIER = 3; + + /** + * If most of the samples are clustered together, the MAD can be + * low. The median multiplier introduces another safeguard to avoid + * overaggressive outlier detection. + */ + @VisibleForTesting + static final int MEDIAN_MULTIPLIER = 3; + + public SlowNodeDetector(long lowThresholdMs) { + this.lowThresholdMs = lowThresholdMs; + } + + /** + * Return a set of DataNodes whose latency is much higher than + * their peers. The input is a map of (node -> aggregate latency) + * entries. + * + * The aggregate may be an arithmetic mean or a percentile e.g. + * 90th percentile. Percentiles are a better choice than median + * since latency is usually not a normal distribution. + * + * This method allocates temporary memory O(n) and + * has run time O(n.log(n)), where n = stats.size(). + * + * @return + */ + public Map<String, Double> getOutliers(Map<String, Double> stats) { + if (stats.size() < minOutlierDetectionPeers) { + LOG.debug("Skipping statistical outlier detection as we don't have " + + "latency data for enough peers. Have {}, need at least {}", + stats.size(), minOutlierDetectionPeers); + return ImmutableMap.of(); + } + // Compute the median absolute deviation of the aggregates. 
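      // Worked example with hypothetical latencies (assuming lowThresholdMs = 5
      // and ten reported peers, so the minOutlierDetectionPeers check passes):
      //   sorted                      = [8, 8, 9, 9, 10, 10, 10, 11, 11, 40]
      //   median                      = (10 + 10) / 2 = 10
      //   absolute deviations, sorted = [0, 0, 0, 1, 1, 1, 1, 2, 2, 30]
      //   mad                         = 1 * 1.4826 = 1.4826
      //   upperLimitLatency           = max(max(5, 10 * 3), 10 + 3 * 1.4826) = 30
      //   so only the peer with aggregate latency 40 ms is flagged as slow.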
+ final List<Double> sorted = new ArrayList<>(stats.values()); + Collections.sort(sorted); + final Double median = computeMedian(sorted); + final Double mad = computeMad(sorted); + Double upperLimitLatency = Math.max( + lowThresholdMs, median * MEDIAN_MULTIPLIER); + upperLimitLatency = Math.max( + upperLimitLatency, median + (DEVIATION_MULTIPLIER * mad)); + + final Map<String, Double> slowNodes = new HashMap<>(); + + LOG.trace("getOutliers: List={}, MedianLatency={}, " + + "MedianAbsoluteDeviation={}, upperLimitLatency={}", + sorted, median, mad, upperLimitLatency); + + // Find nodes whose latency exceeds the threshold. + for (Map.Entry<String, Double> entry : stats.entrySet()) { + if (entry.getValue() > upperLimitLatency) { + slowNodes.put(entry.getKey(), entry.getValue()); + } + } + + return slowNodes; + } + + /** + * Compute the Median Absolute Deviation of a sorted list. + */ + public static Double computeMad(List<Double> sortedValues) { + if (sortedValues.size() == 0) { + throw new IllegalArgumentException( + "Cannot compute the Median Absolute Deviation " + + "of an empty list."); + } + + // First get the median of the values. + Double median = computeMedian(sortedValues); + List<Double> deviations = new ArrayList<>(sortedValues); + + // Then update the list to store deviation from the median. + for (int i = 0; i < sortedValues.size(); ++i) { + deviations.set(i, Math.abs(sortedValues.get(i) - median)); + } + + // Finally get the median absolute deviation. + Collections.sort(deviations); + return computeMedian(deviations) * MAD_MULTIPLIER; + } + + /** + * Compute the median of a sorted list. + */ + public static Double computeMedian(List<Double> sortedValues) { + if (sortedValues.size() == 0) { + throw new IllegalArgumentException( + "Cannot compute the median of an empty list."); + } + + Double median = sortedValues.get(sortedValues.size() / 2); + if (sortedValues.size() % 2 == 0) { + median += sortedValues.get((sortedValues.size() / 2) - 1); + median /= 2; + } + return median; + } + + /** + * This method *must not* be used outside of unit tests. 
+ */ + @VisibleForTesting + static void setMinOutlierDetectionPeers(long minOutlierDetectionPeers) { + SlowNodeDetector.minOutlierDetectionPeers = minOutlierDetectionPeers; + } + + @VisibleForTesting + static long getMinOutlierDetectionPeers() { + return minOutlierDetectionPeers; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 6ec0ee9..38a326c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -129,6 +129,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.annotation.Nonnull; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; import javax.management.StandardMBean; @@ -255,6 +256,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; @@ -3639,7 +3641,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, StorageReport[] reports, long cacheCapacity, long cacheUsed, int xceiverCount, int xmitsInProgress, int failedVolumes, VolumeFailureSummary volumeFailureSummary, - boolean requestFullBlockReportLease) throws IOException { + boolean requestFullBlockReportLease, + @Nonnull SlowPeerReports slowPeers) throws IOException { readLock(); try { //get datanode commands @@ -3647,7 +3650,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, - xmitsInProgress; DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat( nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed, - xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary); + xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary, + slowPeers); long blockReportLeaseId = 0; if (requestFullBlockReportLease) { blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index f6c724b..df5ee0f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -1822,6 +1822,12 
@@ public class NameNode extends ReconfigurableBase implements return getNamesystem().getBytesInFuture(); } + @Override + public String getSlowPeersReport() { + return namesystem.getBlockManager().getDatanodeManager() + .getSlowPeersReport(); + } + /** * Shutdown the NN immediately in an ungraceful way. Used when it would be * unsafe for the NN to continue operating, e.g. during a failed HA state http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 4a1e8dd..f9cfa42 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -155,6 +155,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NodeRegistration; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -205,6 +206,8 @@ import org.slf4j.Logger; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.BlockingService; +import javax.annotation.Nonnull; + /** * This class is responsible for handling all of the RPC calls to the NameNode. * It is created, started, and stopped by {@link NameNode}. 
@@ -1418,12 +1421,14 @@ public class NameNodeRpcServer implements NamenodeProtocols { StorageReport[] report, long dnCacheCapacity, long dnCacheUsed, int xmitsInProgress, int xceiverCount, int failedVolumes, VolumeFailureSummary volumeFailureSummary, - boolean requestFullBlockReportLease) throws IOException { + boolean requestFullBlockReportLease, + @Nonnull SlowPeerReports slowPeers) throws IOException { checkNNStartup(); verifyRequest(nodeReg); return namesystem.handleHeartbeat(nodeReg, report, dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress, - failedVolumes, volumeFailureSummary, requestFullBlockReportLease); + failedVolumes, volumeFailureSummary, requestFullBlockReportLease, + slowPeers); } @Override // DatanodeProtocol http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java index 7b37372..f46b9ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java @@ -69,4 +69,10 @@ public interface NameNodeStatusMXBean { * @return number of bytes that can be deleted if exited from safe mode. */ long getBytesWithFutureGenerationStamps(); + + /** + * Retrieves information about slow DataNodes, if the feature is + * enabled. The report is in a JSON format. + */ + String getSlowPeersReport(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index 8c4359f..d738e79 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.io.retry.Idempotent; import org.apache.hadoop.security.KerberosInfo; +import javax.annotation.Nonnull; + /********************************************************************** * Protocol that a DFS datanode uses to communicate with the NameNode. * It's used to upload current load information and block reports. @@ -105,6 +107,9 @@ public interface DatanodeProtocol { * @param volumeFailureSummary info about volume failures * @param requestFullBlockReportLease whether to request a full block * report lease. + * @param slowPeers Details of peer DataNodes that were detected as being + * slow to respond to packet writes. Empty report if no + * slow peers were detected by the DataNode. 
* @throws IOException on error */ @Idempotent @@ -116,7 +121,8 @@ public interface DatanodeProtocol { int xceiverCount, int failedVolumes, VolumeFailureSummary volumeFailureSummary, - boolean requestFullBlockReportLease) + boolean requestFullBlockReportLease, + @Nonnull SlowPeerReports slowPeers) throws IOException; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 7423b33..3b25a43 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -195,6 +195,7 @@ message VolumeFailureSummaryProto { * cacheCapacity - total cache capacity available at the datanode * cacheUsed - amount of cache used * volumeFailureSummary - info about volume failures + * slowPeers - info about peer DataNodes that are suspected to be slow. */ message HeartbeatRequestProto { required DatanodeRegistrationProto registration = 1; // Datanode info @@ -206,6 +207,7 @@ message HeartbeatRequestProto { optional uint64 cacheUsed = 7 [default = 0 ]; optional VolumeFailureSummaryProto volumeFailureSummary = 8; optional bool requestFullBlockReportLease = 9 [ default = false ]; + repeated SlowPeerReportProto slowPeers = 10; } /** @@ -386,6 +388,24 @@ message CommitBlockSynchronizationResponseProto { } /** + * Information about a single slow peer that may be reported by + * the DataNode to the NameNode as part of the heartbeat request. + * The message includes the peer's DataNodeId and its + * aggregate packet latency as observed by the reporting DataNode. + * (DataNodeId must be transmitted as a string for protocol compatibility + * with earlier versions of Hadoop). + * + * The exact choice of the aggregate is opaque to the NameNode but it + * _should_ be chosen consistently by all DataNodes in the cluster. + * Examples of aggregates are 90th percentile (good) and mean (not so + * good). + */ +message SlowPeerReportProto { + optional string dataNodeId = 1; + optional double aggregateLatency = 2; +} + +/** * Protocol used from datanode to the namenode * See the request and response for details of rpc call. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 3389d84..966cb2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1980,19 +1980,15 @@ </property> <property> - <name>dfs.metrics.rolling.average.window.size</name> - <value>3600</value> + <name>dfs.datanode.slow.peers.report.interval</name> + <value>30m</value> <description> - The number of seconds of each window for which sub set of samples are gathered - to compute the rolling average, A.K.A. roll over interval. - </description> -</property> + This setting controls how frequently DataNodes will report their peer + latencies to the NameNode via heartbeats. This setting supports + multiple time unit suffixes as described in dfs.heartbeat.interval.
+ If no suffix is specified then milliseconds is assumed. -<property> - <name>dfs.metrics.rolling.average.window.numbers</name> - <value>48</value> - <description> - The number of windows maintained to compute the rolling average. + It is ignored if dfs.datanode.peer.stats.enabled is false. </description> </property> http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index 7f5cf2d..ff08528 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntryScope; import org.apache.hadoop.fs.permission.AclEntryType; @@ -92,6 +93,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.io.Text; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; @@ -770,6 +772,26 @@ public class TestPBHelper { assertEquals(dnInfos0.getNonDfsUsed(), dnInfos3.getNonDfsUsed()); } + @Test + public void testSlowPeerInfoPBHelper() { + // Test with a map that has a few slow peer entries. + final SlowPeerReports slowPeers = SlowPeerReports.create( + ImmutableMap.of("peer1", 0.0, "peer2", 1.0, "peer3", 2.0)); + SlowPeerReports slowPeersConverted1 = PBHelper.convertSlowPeerInfo( + PBHelper.convertSlowPeerInfo(slowPeers)); + assertTrue( + "Expected map:" + slowPeers + ", got map:" + + slowPeersConverted1.getSlowPeers(), + slowPeersConverted1.equals(slowPeers)); + + // Test with an empty map. 
+ SlowPeerReports slowPeersConverted2 = PBHelper.convertSlowPeerInfo( + PBHelper.convertSlowPeerInfo(SlowPeerReports.EMPTY_REPORT)); + assertTrue( + "Expected empty map:" + ", got map:" + slowPeersConverted2, + slowPeersConverted2.equals(SlowPeerReports.EMPTY_REPORT)); + } + private void assertBlockECRecoveryInfoEquals( BlockECReconstructionInfo blkECRecoveryInfo1, BlockECReconstructionInfo blkECRecoveryInfo2) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java index ab607ea..f12f6f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java @@ -42,13 +42,23 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; import org.mockito.Mockito; /** * Test if FSNamesystem handles heartbeat right */ public class TestHeartbeatHandling { + + + /** + * Set a timeout for every test case. + */ + @Rule + public Timeout testTimeout = new Timeout(300_000); + /** * Test if * {@link FSNamesystem#handleHeartbeat} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java index 070a768..a5c6e0d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import com.google.common.base.Supplier; import java.util.ArrayList; import java.util.Collection; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -40,6 +41,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeRef import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Level; @@ -112,7 +114,7 @@ public class 
TestNameNodePrunesMissingStorages { // Stop the DataNode and send fake heartbeat with missing storage. cluster.stopDataNode(0); cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0, - 0, null, true, SlowPeerReports.EMPTY_REPORT); // Check that the missing storage was pruned. assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));
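
For reference, the outlier rule added in SlowNodeDetector can be exercised in isolation. The sketch below is not part of this commit; the class name, the sample latencies, and the 5 ms floor are made up for illustration, and the minimum-peer-count guard that the real detector applies is skipped for brevity. It applies the same threshold, max(lowThresholdMs, MEDIAN_MULTIPLIER * median, median + DEVIATION_MULTIPLIER * MAD), to a handful of hypothetical peer latencies:

// Standalone illustration of the thresholding rule in SlowNodeDetector.getOutliers().
// Not part of the patch; names and values are hypothetical.
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SlowPeerThresholdExample {
  // Constants mirroring SlowNodeDetector above.
  private static final double MAD_MULTIPLIER = 1.4826;
  private static final int DEVIATION_MULTIPLIER = 3;
  private static final int MEDIAN_MULTIPLIER = 3;
  private static final long LOW_THRESHOLD_MS = 5; // hypothetical floor

  public static void main(String[] args) {
    // Hypothetical aggregate latencies (ms) reported for each peer.
    // Note: the real detector would also require at least
    // minOutlierDetectionPeers (10) samples before flagging anything.
    Map<String, Double> stats = new HashMap<>();
    stats.put("dn1", 10.0);
    stats.put("dn2", 11.0);
    stats.put("dn3", 12.0);
    stats.put("dn4", 10.5);
    stats.put("dn5", 95.0); // much slower than its peers

    List<Double> sorted = new ArrayList<>(stats.values());
    Collections.sort(sorted);
    double median = median(sorted);

    // Median absolute deviation, scaled by the constant from Leys et al.
    List<Double> deviations = new ArrayList<>();
    for (double v : sorted) {
      deviations.add(Math.abs(v - median));
    }
    Collections.sort(deviations);
    double mad = median(deviations) * MAD_MULTIPLIER;

    // A peer is flagged only if it exceeds all three guards.
    double upperLimit = Math.max(LOW_THRESHOLD_MS, median * MEDIAN_MULTIPLIER);
    upperLimit = Math.max(upperLimit, median + DEVIATION_MULTIPLIER * mad);

    for (Map.Entry<String, Double> e : stats.entrySet()) {
      if (e.getValue() > upperLimit) {
        System.out.println(e.getKey() + " flagged as slow: " + e.getValue()
            + " ms > " + upperLimit + " ms");
      }
    }
  }

  private static double median(List<Double> sortedValues) {
    int n = sortedValues.size();
    double m = sortedValues.get(n / 2);
    if (n % 2 == 0) {
      m = (m + sortedValues.get(n / 2 - 1)) / 2;
    }
    return m;
  }
}

With the values above the median is 11 ms, the scaled MAD is about 1.48 ms, and the effective limit is 33 ms (driven by the median multiplier), so only dn5 at 95 ms is flagged.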
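Operationally, peer-latency reporting only happens when DataNode peer stats are enabled. A possible hdfs-site.xml fragment, assuming the key names shown in this change (dfs.datanode.peer.stats.enabled and dfs.datanode.slow.peers.report.interval; the 30m value simply restates the default above), might look like:

<property>
  <name>dfs.datanode.peer.stats.enabled</name>
  <value>true</value>
</property>
<property>
  <name>dfs.datanode.slow.peers.report.interval</name>
  <value>30m</value>
</property>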
