HDFS-11789. Maintain Short-Circuit Read Statistics. Contributed by Hanisha Koneru.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6d116ffa Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6d116ffa Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6d116ffa Branch: refs/heads/HADOOP-13345 Commit: 6d116ffad23b470f8e9ca131d8e89cbbbb4378d7 Parents: 49aa60e Author: Arpit Agarwal <[email protected]> Authored: Thu Jun 22 13:35:56 2017 -0700 Committer: Arpit Agarwal <[email protected]> Committed: Thu Jun 22 13:35:56 2017 -0700 ---------------------------------------------------------------------- .../hdfs/client/HdfsClientConfigKeys.java | 4 + .../hdfs/client/impl/BlockReaderLocal.java | 52 ++++- .../hadoop/hdfs/client/impl/DfsClientConf.java | 26 +++ .../impl/metrics/BlockReaderIoProvider.java | 89 ++++++++ .../impl/metrics/BlockReaderLocalMetrics.java | 78 +++++++ .../hdfs/client/impl/metrics/package-info.java | 27 +++ .../client/impl/TestBlockReaderIoProvider.java | 75 ++++++ .../impl/TestBlockReaderLocalMetrics.java | 227 +++++++++++++++++++ 8 files changed, 566 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index fbc8d89..5667989 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -343,6 +343,10 @@ public interface HdfsClientConfigKeys { int 
STREAMS_CACHE_SIZE_DEFAULT = 256; String STREAMS_CACHE_EXPIRY_MS_KEY = PREFIX + "streams.cache.expiry.ms"; long STREAMS_CACHE_EXPIRY_MS_DEFAULT = 5*MINUTE; + + String METRICS_SAMPLING_PERCENTAGE_KEY = + PREFIX + "metrics.sampling.percentage"; + int METRICS_SAMPLING_PERCENTAGE_DEFAULT = 0; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java index 1b38996..df0f65f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java @@ -17,17 +17,16 @@ */ package org.apache.hadoop.hdfs.client.impl; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.EnumSet; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.BlockReader; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; +import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderIoProvider; +import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderLocalMetrics; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; @@ -35,15 +34,19 @@ 
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DirectBufferPool; +import org.apache.hadoop.util.Timer; import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.Tracer; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.EnumSet; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + /** * BlockReaderLocal enables local short circuited reads. If the DFS client is on * the same machine as the datanode, then the client can read files directly @@ -66,6 +69,11 @@ class BlockReaderLocal implements BlockReader { private static final DirectBufferPool bufferPool = new DirectBufferPool(); + private static BlockReaderLocalMetrics metrics; + private static Lock metricsInitializationLock = new ReentrantLock(); + private final BlockReaderIoProvider blockReaderIoProvider; + private static final Timer TIMER = new Timer(); + public static class Builder { private final int bufferSize; private boolean verifyChecksum; @@ -76,8 +84,10 @@ class BlockReaderLocal implements BlockReader { private ExtendedBlock block; private StorageType storageType; private Tracer tracer; + private ShortCircuitConf shortCircuitConf; public Builder(ShortCircuitConf conf) { + this.shortCircuitConf = conf; this.maxReadahead = Integer.MAX_VALUE; this.verifyChecksum = !conf.isSkipShortCircuitChecksums(); this.bufferSize = conf.getShortCircuitBufferSize(); @@ -269,6 +279,20 @@ class BlockReaderLocal implements BlockReader { this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum; this.storageType = builder.storageType; this.tracer = builder.tracer; + + if 
(builder.shortCircuitConf.isScrMetricsEnabled()) { + metricsInitializationLock.lock(); + try { + if (metrics == null) { + metrics = BlockReaderLocalMetrics.create(); + } + } finally { + metricsInitializationLock.unlock(); + } + } + + this.blockReaderIoProvider = new BlockReaderIoProvider( + builder.shortCircuitConf, metrics, TIMER); } private synchronized void createDataBufIfNeeded() { @@ -342,7 +366,7 @@ class BlockReaderLocal implements BlockReader { long startDataPos = dataPos; int startBufPos = buf.position(); while (buf.hasRemaining()) { - int nRead = dataIn.read(buf, dataPos); + int nRead = blockReaderIoProvider.read(dataIn, buf, dataPos); if (nRead < 0) { break; } @@ -435,7 +459,7 @@ class BlockReaderLocal implements BlockReader { freeChecksumBufIfExists(); int total = 0; while (buf.hasRemaining()) { - int nRead = dataIn.read(buf, dataPos); + int nRead = blockReaderIoProvider.read(dataIn, buf, dataPos); if (nRead <= 0) break; dataPos += nRead; total += nRead; @@ -574,7 +598,8 @@ class BlockReaderLocal implements BlockReader { int len) throws IOException { freeDataBufIfExists(); freeChecksumBufIfExists(); - int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos); + int nRead = blockReaderIoProvider.read( + dataIn, ByteBuffer.wrap(arr, off, len), dataPos); if (nRead > 0) { dataPos += nRead; } else if ((nRead == 0) && (dataPos == dataIn.size())) { @@ -627,6 +652,9 @@ class BlockReaderLocal implements BlockReader { replica.unref(); freeDataBufIfExists(); freeChecksumBufIfExists(); + if (metrics != null) { + metrics.collectThreadLocalStates(); + } } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index b2fd487..332abb5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -593,6 +593,10 @@ public class DfsClientConf { private final long shortCircuitStreamsCacheExpiryMs; private final int shortCircuitSharedMemoryWatcherInterruptCheckMs; + // Short Circuit Read Metrics + private final boolean scrMetricsEnabled; + private final int scrMetricsSamplingPercentage; + private final boolean shortCircuitMmapEnabled; private final int shortCircuitMmapCacheSize; private final long shortCircuitMmapCacheExpiryMs; @@ -615,6 +619,20 @@ public class DfsClientConf { shortCircuitLocalReads = conf.getBoolean( Read.ShortCircuit.KEY, Read.ShortCircuit.DEFAULT); + int scrSamplingPercentage = conf.getInt( + Read.ShortCircuit.METRICS_SAMPLING_PERCENTAGE_KEY, + Read.ShortCircuit.METRICS_SAMPLING_PERCENTAGE_DEFAULT); + if (scrSamplingPercentage <= 0) { + scrMetricsSamplingPercentage = 0; + scrMetricsEnabled = false; + } else if (scrSamplingPercentage > 100) { + scrMetricsSamplingPercentage = 100; + scrMetricsEnabled = true; + } else { + scrMetricsSamplingPercentage = scrSamplingPercentage; + scrMetricsEnabled = true; + } + domainSocketDataTraffic = conf.getBoolean( DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT); @@ -693,6 +711,14 @@ public class DfsClientConf { return shortCircuitLocalReads; } + public boolean isScrMetricsEnabled() { + return scrMetricsEnabled; + } + + public int getScrMetricsSamplingPercentage() { + return scrMetricsSamplingPercentage; + } + public boolean isDomainSocketDataTraffic() { return domainSocketDataTraffic; } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderIoProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderIoProvider.java new file mode 100644 index 0000000..0792db8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderIoProvider.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.client.impl.metrics; + +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; +import org.apache.hadoop.util.Timer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Profiles {@link org.apache.hadoop.hdfs.client.impl.BlockReaderLocal} short + * circuit read latencies when ShortCircuit read metrics is enabled through + * {@link ShortCircuitConf#scrMetricsEnabled}. + */ +public class BlockReaderIoProvider { + public static final Logger LOG = LoggerFactory.getLogger( + BlockReaderIoProvider.class); + + private final BlockReaderLocalMetrics metrics; + private final boolean isEnabled; + private final int sampleRangeMax; + private final Timer timer; + + // Threshold in milliseconds above which a warning should be flagged. + private static final long SLOW_READ_WARNING_THRESHOLD_MS = 1000; + private boolean isWarningLogged = false; + + public BlockReaderIoProvider(@Nullable ShortCircuitConf conf, + BlockReaderLocalMetrics metrics, Timer timer) { + if (conf != null) { + isEnabled = conf.isScrMetricsEnabled(); + sampleRangeMax = (Integer.MAX_VALUE / 100) * + conf.getScrMetricsSamplingPercentage(); + this.metrics = metrics; + this.timer = timer; + } else { + this.isEnabled = false; + this.sampleRangeMax = 0; + this.metrics = null; + this.timer = null; + } + } + + public int read(FileChannel dataIn, ByteBuffer dst, long position) + throws IOException{ + final int nRead; + if (isEnabled && (ThreadLocalRandom.current().nextInt() < sampleRangeMax)) { + long begin = timer.monotonicNow(); + nRead = dataIn.read(dst, position); + long latency = timer.monotonicNow() - begin; + addLatency(latency); + } else { + nRead = dataIn.read(dst, position); + } + return nRead; + } + + private void addLatency(long latency) { + 
metrics.addShortCircuitReadLatency(latency); + if (latency > SLOW_READ_WARNING_THRESHOLD_MS && !isWarningLogged) { + LOG.warn(String.format("The Short Circuit Local Read latency, %d ms, " + + "is higher then the threshold (%d ms). Suppressing further warnings" + + " for this BlockReaderLocal.", + latency, SLOW_READ_WARNING_THRESHOLD_MS)); + isWarningLogged = true; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderLocalMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderLocalMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderLocalMetrics.java new file mode 100644 index 0000000..61b497e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderLocalMetrics.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.client.impl.metrics; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableRollingAverages; + +/** + * This class maintains a metric of rolling average latency for short circuit + * reads. + */ +@InterfaceAudience.Private +@Metrics(name="HdfsShortCircuitReads", + about="Block Reader Local's Short Circuit Read latency", + context="dfs") +public class BlockReaderLocalMetrics { + + @Metric(value = "short circuit read operation rate", valueName = "LatencyMs") + private MutableRollingAverages shortCircuitReadRollingAverages; + + private static final String SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME = + "HdfsShortCircuitReads"; + private static final String SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_NAME = + "ShortCircuitLocalReads"; + + public static BlockReaderLocalMetrics create() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + BlockReaderLocalMetrics metrics = new BlockReaderLocalMetrics(); + + ms.register( + SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME, null, metrics); + return metrics; + } + + /** + * Adds short circuit read elapsed time. + */ + public void addShortCircuitReadLatency(final long latency) { + shortCircuitReadRollingAverages.add( + SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_NAME, latency); + } + + /** + * Collects states maintained in {@link ThreadLocal}, if any. + */ + public void collectThreadLocalStates() { + shortCircuitReadRollingAverages.collectThreadLocalStates(); + } + + /** + * Get the MutableRollingAverage metric for testing only. 
+ * @return + */ + @VisibleForTesting + public MutableRollingAverages getShortCircuitReadRollingAverages() { + return shortCircuitReadRollingAverages; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/package-info.java new file mode 100644 index 0000000..a97ed43 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/package-info.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Support for tracking Block Reader Local's latencies. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +package org.apache.hadoop.hdfs.client.impl.metrics; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderIoProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderIoProvider.java new file mode 100644 index 0000000..3eae516 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderIoProvider.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.client.impl; + +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderIoProvider; +import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderLocalMetrics; +import org.apache.hadoop.util.FakeTimer; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.times; + +/** + * Tests {@link BlockReaderIoProvider}'s profiling of short circuit read + * latencies. + */ +public class TestBlockReaderIoProvider { + + private static final long SLOW_READ_THRESHOLD = 5000; + + private static final FakeTimer TIMER = new FakeTimer(); + + @Test(timeout = 300_000) + public void testSlowShortCircuitReadsIsRecorded() throws IOException { + HdfsConfiguration conf = new HdfsConfiguration(); + conf.setInt(HdfsClientConfigKeys.Read.ShortCircuit + .METRICS_SAMPLING_PERCENTAGE_KEY, 100); + DfsClientConf clientConf = new DfsClientConf(conf); + + BlockReaderLocalMetrics metrics = Mockito.mock( + BlockReaderLocalMetrics.class); + + FileChannel dataIn = Mockito.mock(FileChannel.class); + Mockito.when(dataIn.read(any(ByteBuffer.class), anyLong())).thenAnswer( + new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + TIMER.advance(SLOW_READ_THRESHOLD); + return 0; + } + }); + + BlockReaderIoProvider blockReaderIoProvider = new BlockReaderIoProvider( + clientConf.getShortCircuitConf(), metrics, TIMER); + + blockReaderIoProvider.read(dataIn, any(ByteBuffer.class), anyLong()); + + Mockito.verify(metrics, times(1)).addShortCircuitReadLatency(anyLong()); + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalMetrics.java new file mode 100644 index 0000000..b461f2e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalMetrics.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.client.impl; + +import com.google.common.base.Supplier; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderIoProvider; +import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderLocalMetrics; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.lib.MetricsTestHelper; +import org.apache.hadoop.metrics2.lib.MutableRollingAverages; +import org.apache.hadoop.test.GenericTestUtils; +import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import org.apache.hadoop.util.FakeTimer; +import static org.junit.Assert.assertTrue; +import org.junit.Test; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Tests {@link BlockReaderLocalMetrics}'s statistics. 
+ */ +public class TestBlockReaderLocalMetrics { + private static final long ROLLING_AVERAGES_WINDOW_LENGTH_MS = 1000; + private static final int ROLLING_AVERAGE_NUM_WINDOWS = 5; + private static final long SLOW_READ_DELAY = 2000; + private static final String SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME = + "HdfsShortCircuitReads"; + private static final String SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_FULL_NAME = + "[ShortCircuitLocalReads]RollingAvgLatencyMs"; + + private static final FakeTimer TIMER = new FakeTimer(); + + private static HdfsConfiguration conf = new HdfsConfiguration(); + private static DfsClientConf clientConf; + + static { + conf = new HdfsConfiguration(); + conf.setInt(HdfsClientConfigKeys.Read.ShortCircuit + .METRICS_SAMPLING_PERCENTAGE_KEY, 100); + clientConf = new DfsClientConf(conf); + } + + @Test(timeout = 300_000) + public void testSlowShortCircuitReadsStatsRecorded() throws IOException, + InterruptedException, TimeoutException { + + BlockReaderLocalMetrics metrics = BlockReaderLocalMetrics.create(); + MutableRollingAverages shortCircuitReadRollingAverages = metrics + .getShortCircuitReadRollingAverages(); + MetricsTestHelper.replaceRollingAveragesScheduler( + shortCircuitReadRollingAverages, + ROLLING_AVERAGE_NUM_WINDOWS, ROLLING_AVERAGES_WINDOW_LENGTH_MS, + TimeUnit.MILLISECONDS); + + FileChannel dataIn = Mockito.mock(FileChannel.class); + Mockito.when(dataIn.read(any(ByteBuffer.class), anyLong())).thenAnswer( + new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + TIMER.advance(SLOW_READ_DELAY); + return 0; + } + }); + + BlockReaderIoProvider blockReaderIoProvider = new BlockReaderIoProvider( + clientConf.getShortCircuitConf(), metrics, TIMER); + + blockReaderIoProvider.read(dataIn, any(ByteBuffer.class), anyLong()); + blockReaderIoProvider.read(dataIn, any(ByteBuffer.class), anyLong()); + + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + 
metrics.collectThreadLocalStates(); + return shortCircuitReadRollingAverages.getStats(0).size() > 0; + } + }, 500, 10000); + + MetricsRecordBuilder rb = getMetrics( + SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME); + double averageLatency = getDoubleGauge( + SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_FULL_NAME, rb); + assertTrue("Average Latency of Short Circuit Reads lower than expected", + averageLatency >= SLOW_READ_DELAY); + } + + @Test(timeout = 300_000) + public void testMutlipleBlockReaderIoProviderStats() throws IOException, + InterruptedException, TimeoutException { + + BlockReaderLocalMetrics metrics = BlockReaderLocalMetrics.create(); + MutableRollingAverages shortCircuitReadRollingAverages = metrics + .getShortCircuitReadRollingAverages(); + MetricsTestHelper.replaceRollingAveragesScheduler( + shortCircuitReadRollingAverages, + ROLLING_AVERAGE_NUM_WINDOWS, ROLLING_AVERAGES_WINDOW_LENGTH_MS, + TimeUnit.MILLISECONDS); + + FileChannel dataIn1 = Mockito.mock(FileChannel.class); + FileChannel dataIn2 = Mockito.mock(FileChannel.class); + + Mockito.when(dataIn1.read(any(ByteBuffer.class), anyLong())).thenAnswer( + new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + TIMER.advance(SLOW_READ_DELAY); + return 0; + } + }); + + Mockito.when(dataIn2.read(any(ByteBuffer.class), anyLong())).thenAnswer( + new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + TIMER.advance(SLOW_READ_DELAY*3); + return 0; + } + }); + + BlockReaderIoProvider blockReaderIoProvider1 = new BlockReaderIoProvider( + clientConf.getShortCircuitConf(), metrics, TIMER); + BlockReaderIoProvider blockReaderIoProvider2 = new BlockReaderIoProvider( + clientConf.getShortCircuitConf(), metrics, TIMER); + + blockReaderIoProvider1.read(dataIn1, any(ByteBuffer.class), anyLong()); + blockReaderIoProvider2.read(dataIn2, any(ByteBuffer.class), anyLong()); + + GenericTestUtils.waitFor(new Supplier<Boolean>() { 
+ @Override + public Boolean get() { + metrics.collectThreadLocalStates(); + return shortCircuitReadRollingAverages.getStats(0).size() > 0; + } + }, 500, 10000); + + MetricsRecordBuilder rb = getMetrics( + SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME); + double averageLatency = getDoubleGauge( + SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_FULL_NAME, rb); + + assertTrue("Average Latency of Short Circuit Reads lower than expected", + averageLatency >= SLOW_READ_DELAY*2); + } + + @Test(timeout = 300_000) + public void testSlowShortCircuitReadsAverageLatencyValue() throws IOException, + InterruptedException, TimeoutException { + + BlockReaderLocalMetrics metrics = BlockReaderLocalMetrics.create(); + final MutableRollingAverages shortCircuitReadRollingAverages = metrics + .getShortCircuitReadRollingAverages(); + MetricsTestHelper.replaceRollingAveragesScheduler( + shortCircuitReadRollingAverages, + ROLLING_AVERAGE_NUM_WINDOWS, ROLLING_AVERAGES_WINDOW_LENGTH_MS, + TimeUnit.MILLISECONDS); + + Random random = new Random(); + FileChannel[] dataIns = new FileChannel[5]; + long totalDelay = 0; + + for (int i = 0; i < 5; i++) { + dataIns[i] = Mockito.mock(FileChannel.class); + long delay = SLOW_READ_DELAY * random.nextInt(5); + Mockito.when(dataIns[i].read(any(ByteBuffer.class), anyLong())) + .thenAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + TIMER.advance(delay); + return 0; + } + }); + totalDelay += delay; + } + long expectedAvgLatency = totalDelay / 5; + + BlockReaderIoProvider blockReaderIoProvider = new BlockReaderIoProvider( + clientConf.getShortCircuitConf(), metrics, TIMER); + + for (int i = 0; i < 5; i++) { + blockReaderIoProvider.read(dataIns[i], any(ByteBuffer.class), anyLong()); + } + + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + metrics.collectThreadLocalStates(); + return shortCircuitReadRollingAverages.getStats(0).size() > 0; + } + }, 500, 10000); + + 
MetricsRecordBuilder rb = getMetrics( + SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME); + double averageLatency = getDoubleGauge( + SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_FULL_NAME, rb); + + assertTrue("Average Latency of Short Circuit Reads lower than expected", + averageLatency >= expectedAvgLatency); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
