HDFS-7129. Metrics to track usage of memory for writes. (Contributed by Xiaoyu Yao)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5e8b6973 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5e8b6973 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5e8b6973 Branch: refs/heads/trunk Commit: 5e8b6973527e5f714652641ed95e8a4509e18cfa Parents: bb84f1f Author: arp <a...@apache.org> Authored: Tue Sep 30 00:53:18 2014 -0700 Committer: arp <a...@apache.org> Committed: Tue Sep 30 00:53:18 2014 -0700 ---------------------------------------------------------------------- .../hadoop-hdfs/CHANGES-HDFS-6581.txt | 3 + .../datanode/fsdataset/impl/FsDatasetImpl.java | 40 +++++++++- .../impl/RamDiskReplicaLruTracker.java | 20 ++++- .../fsdataset/impl/RamDiskReplicaTracker.java | 23 +++++- .../datanode/metrics/DataNodeMetrics.java | 80 ++++++++++++++++++++ .../org/apache/hadoop/hdfs/tools/JMXGet.java | 18 +++++ .../fsdataset/impl/TestLazyPersistFiles.java | 80 ++++++++++++++++---- 7 files changed, 241 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e8b6973/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt index 2fa855a..3be544a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt @@ -89,4 +89,7 @@ HDFS-7159. Use block storage policy to set lazy persist preference. (Arpit Agarwal) + HDFS-7129. Metrics to track usage of memory for writes. (Xiaoyu Yao + via Arpit Agarwal) + http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e8b6973/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 7abed90..df52e14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1012,11 +1012,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { if (allowLazyPersist) { // First try to place the block on a transient volume. v = volumes.getNextTransientVolume(b.getNumBytes()); + datanode.getMetrics().incrRamDiskBlocksWrite(); } else { v = volumes.getNextVolume(storageType, b.getNumBytes()); } } catch (DiskOutOfSpaceException de) { if (allowLazyPersist) { + datanode.getMetrics().incrRamDiskBlocksWriteFallback(); allowLazyPersist = false; continue; } @@ -1244,6 +1246,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { if (v.isTransientStorage()) { ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v); + datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes()); } } volumeMap.add(bpid, newReplicaInfo); @@ -1499,7 +1502,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } if (v.isTransientStorage()) { - ramDiskReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true); + RamDiskReplica replicaInfo = + ramDiskReplicaTracker.getReplica(bpid, invalidBlks[i].getBlockId()); + if (replicaInfo != null) { + if (replicaInfo.getIsPersisted() == false) { + datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted(); + } + discardRamDiskReplica(replicaInfo, true); + } } // If a DFSClient has the replica in its cache of short-circuit file @@ -1645,11 +1655,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { if (info != null) { if (touch && info.getVolume().isTransientStorage()) { ramDiskReplicaTracker.touch(bpid, blockId); + datanode.getMetrics().incrRamDiskBlocksReadHits(); } return info.getBlockFile(); } return null; } + /** * check if a data directory is healthy * if some volumes failed - make sure to remove all the blocks that belong @@ -2303,6 +2315,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { nbytes, flags); } + void discardRamDiskReplica(RamDiskReplica replica, boolean deleteSavedCopies) { + ramDiskReplicaTracker.discardReplica(replica.getBlockPoolId(), + replica.getBlockId(), deleteSavedCopies); + } + class LazyWriter implements Runnable { private volatile boolean shouldRun = true; final int checkpointerInterval; @@ -2326,7 +2343,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT); } - private void moveReplicaToNewVolume(String bpid, long blockId) + private void moveReplicaToNewVolume(String bpid, long blockId, long creationTime) throws IOException { FsVolumeImpl targetVolume; @@ -2368,6 +2385,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { synchronized (FsDatasetImpl.this) { ramDiskReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles); + // Update metrics (ignore the metadata file size) + datanode.getMetrics().incrRamDiskBlocksLazyPersisted(); + datanode.getMetrics().incrRamDiskBytesLazyPersisted(replicaInfo.getNumBytes()); + datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs( + Time.monotonicNow() - creationTime); + if (LOG.isDebugEnabled()) { LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid + " to file " + savedFiles[1]); @@ -2387,7 +2410,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { try { block = ramDiskReplicaTracker.dequeueNextReplicaToPersist(); if (block != null) { - moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId()); + moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId(), + block.getCreationTime()); } succeeded = true; } catch(IOException ioe) { @@ -2455,7 +2479,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { metaFile = replicaInfo.getMetaFile(); blockFileUsed = blockFile.length(); metaFileUsed = metaFile.length(); - ramDiskReplicaTracker.discardReplica(replicaState, false); + discardRamDiskReplica(replicaState, false); // Move the replica from lazyPersist/ to finalized/ on target volume BlockPoolSlice bpSlice = @@ -2473,6 +2497,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { // Update the volumeMap entry. volumeMap.add(bpid, newReplicaInfo); + + // Update metrics + datanode.getMetrics().incrRamDiskBlocksEvicted(); + datanode.getMetrics().addRamDiskBlocksEvictionWindowMs( + Time.monotonicNow() - replicaState.getCreationTime()); + if (replicaState.getNumReads() == 0) { + datanode.getMetrics().incrRamDiskBlocksEvictedWithoutRead(); + } } // Before deleting the files from transient storage we must notify the http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e8b6973/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java index 7808003..a843d9a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import com.google.common.collect.TreeMultimap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.Time; import java.io.File; import java.util.*; @@ -97,9 +98,11 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker { return; } + ramDiskReplicaLru.numReads.getAndIncrement(); + // Reinsert the replica with its new timestamp. if (replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru)) { - ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis(); + ramDiskReplicaLru.lastUsedTime = Time.monotonicNow(); replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru); } } @@ -132,8 +135,9 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker { replicasNotPersisted.remove(ramDiskReplicaLru); } - ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis(); + ramDiskReplicaLru.lastUsedTime = Time.monotonicNow(); replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru); + ramDiskReplicaLru.isPersisted = true; } @Override @@ -215,4 +219,16 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker { // replicasNotPersisted will be lazily GC'ed. } + + @Override + synchronized RamDiskReplica getReplica( + final String bpid, final long blockId) { + Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid); + + if (map == null) { + return null; + } + + return map.get(blockId); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e8b6973/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java index 2401424..7507925 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java @@ -28,8 +28,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Time; import java.io.File; +import java.util.concurrent.atomic.AtomicLong; @InterfaceAudience.Private @InterfaceStability.Unstable @@ -44,6 +46,10 @@ public abstract class RamDiskReplicaTracker { private File savedBlockFile; private File savedMetaFile; + private long creationTime; + protected AtomicLong numReads = new AtomicLong(0); + protected boolean isPersisted; + /** * RAM_DISK volume that holds the original replica. */ @@ -62,6 +68,8 @@ public abstract class RamDiskReplicaTracker { lazyPersistVolume = null; savedMetaFile = null; savedBlockFile = null; + creationTime = Time.monotonicNow(); + isPersisted = false; } long getBlockId() { @@ -89,6 +97,12 @@ public abstract class RamDiskReplicaTracker { return savedMetaFile; } + long getNumReads() { return numReads.get(); } + + long getCreationTime() { return creationTime; } + + boolean getIsPersisted() {return isPersisted; } + /** * Record the saved meta and block files on the given volume. * @@ -243,7 +257,10 @@ public abstract class RamDiskReplicaTracker { final String bpid, final long blockId, boolean deleteSavedCopies); - void discardReplica(RamDiskReplica replica, boolean deleteSavedCopies) { - discardReplica(replica.getBlockPoolId(), replica.getBlockId(), deleteSavedCopies); - } + /** + * Return RamDiskReplica info given block pool id and block id + * Return null if it does not exist in RamDisk + */ + abstract RamDiskReplica getReplica( + final String bpid, final long blockId); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e8b6973/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index b536e7e..57f12db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -65,6 +65,26 @@ public class DataNodeMetrics { @Metric MutableCounterLong writesFromRemoteClient; @Metric MutableCounterLong blocksGetLocalPathInfo; + // RamDisk metrics on read/write + @Metric MutableCounterLong ramDiskBlocksWrite; + @Metric MutableCounterLong ramDiskBlocksWriteFallback; + @Metric MutableCounterLong ramDiskBytesWrite; + @Metric MutableCounterLong ramDiskBlocksReadHits; + + // RamDisk metrics on eviction + @Metric MutableCounterLong ramDiskBlocksEvicted; + @Metric MutableCounterLong ramDiskBlocksEvictedWithoutRead; + @Metric MutableRate ramDiskBlocksEvictionWindowMs; + final MutableQuantiles[] ramDiskBlocksEvictionWindowMsQuantiles; + + + // RamDisk metrics on lazy persist + @Metric MutableCounterLong ramDiskBlocksLazyPersisted; + @Metric MutableCounterLong ramDiskBlocksDeletedBeforeLazyPersisted; + @Metric MutableCounterLong ramDiskBytesLazyPersisted; + @Metric MutableRate ramDiskBlocksLazyPersistWindowMs; + final MutableQuantiles[] ramDiskBlocksLazyPersistWindowMsQuantiles; + @Metric MutableCounterLong fsyncCount; @Metric MutableCounterLong volumeFailures; @@ -107,6 +127,8 @@ public class DataNodeMetrics { fsyncNanosQuantiles = new MutableQuantiles[len]; sendDataPacketBlockedOnNetworkNanosQuantiles = new MutableQuantiles[len]; sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len]; + ramDiskBlocksEvictionWindowMsQuantiles = new MutableQuantiles[len]; + ramDiskBlocksLazyPersistWindowMsQuantiles = new MutableQuantiles[len]; for (int i = 0; i < len; i++) { int interval = intervals[i]; @@ -127,6 +149,14 @@ public class DataNodeMetrics { "sendDataPacketTransferNanos" + interval + "s", "Time reading from disk and writing to network while sending " + "a packet in ns", "ops", "latency", interval); + ramDiskBlocksEvictionWindowMsQuantiles[i] = registry.newQuantiles( + "ramDiskBlocksEvictionWindows" + interval + "s", + "Time between the RamDisk block write and eviction in ms", + "ops", "latency", interval); + ramDiskBlocksLazyPersistWindowMsQuantiles[i] = registry.newQuantiles( + "ramDiskBlocksLazyPersistWindows" + interval + "s", + "Time between the RamDisk block write and disk persist in ms", + "ops", "latency", interval); } } @@ -284,4 +314,54 @@ public class DataNodeMetrics { q.add(latencyNanos); } } + + public void incrRamDiskBlocksWrite() { + ramDiskBlocksWrite.incr(); + } + + public void incrRamDiskBlocksWriteFallback() { + ramDiskBlocksWriteFallback.incr(); + } + + public void addRamDiskBytesWrite(long bytes) { + ramDiskBytesWrite.incr(bytes); + } + + public void incrRamDiskBlocksReadHits() { + ramDiskBlocksReadHits.incr(); + } + + public void incrRamDiskBlocksEvicted() { + ramDiskBlocksEvicted.incr(); + } + + public void incrRamDiskBlocksEvictedWithoutRead() { + ramDiskBlocksEvictedWithoutRead.incr(); + } + + public void addRamDiskBlocksEvictionWindowMs(long latencyMs) { + ramDiskBlocksEvictionWindowMs.add(latencyMs); + for (MutableQuantiles q : ramDiskBlocksEvictionWindowMsQuantiles) { + q.add(latencyMs); + } + } + + public void incrRamDiskBlocksLazyPersisted() { + ramDiskBlocksLazyPersisted.incr(); + } + + public void incrRamDiskBlocksDeletedBeforeLazyPersisted() { + ramDiskBlocksDeletedBeforeLazyPersisted.incr(); + } + + public void incrRamDiskBytesLazyPersisted(long bytes) { + ramDiskBytesLazyPersisted.incr(bytes); + } + + public void addRamDiskBlocksLazyPersistWindowMs(long latencyMs) { + ramDiskBlocksLazyPersistWindowMs.add(latencyMs); + for (MutableQuantiles q : ramDiskBlocksLazyPersistWindowMsQuantiles) { + q.add(latencyMs); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e8b6973/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java index bafef25..bbd545a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Set; import java.util.TreeSet; +import java.util.regex.Pattern; import javax.management.AttributeNotFoundException; import javax.management.MBeanAttributeInfo; @@ -109,6 +110,23 @@ public class JMXGet { } } + public void printAllMatchedAttributes(String attrRegExp) throws Exception { + err("List of the keys matching " + attrRegExp + " :"); + Object val = null; + Pattern p = Pattern.compile(attrRegExp); + for (ObjectName oname : hadoopObjectNames) { + err(">>>>>>>>jmx name: " + oname.getCanonicalKeyPropertyListString()); + MBeanInfo mbinfo = mbsc.getMBeanInfo(oname); + MBeanAttributeInfo[] mbinfos = mbinfo.getAttributes(); + for (MBeanAttributeInfo mb : mbinfos) { + if (p.matcher(mb.getName()).lookingAt()) { + val = mbsc.getAttribute(oname, mb.getName()); + System.out.format(format, mb.getName(), (val == null) ? "" : val.toString()); + } + } + } + } + /** * get single value by key */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e8b6973/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java index 928d0d0..91deb55 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java @@ -31,19 +31,18 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.tools.JMXGet; import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Level; import org.junit.After; import org.junit.Assert; import org.junit.Test; -import java.io.File; -import java.io.IOException; +import java.io.*; import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -58,6 +57,7 @@ import static org.apache.hadoop.hdfs.StorageType.RAM_DISK; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsNot.not; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; public class TestLazyPersistFiles { @@ -81,14 +81,21 @@ public class TestLazyPersistFiles { private static final int LAZY_WRITER_INTERVAL_SEC = 1; private static final int BUFFER_LENGTH = 4096; private static final int EVICTION_LOW_WATERMARK = 1; + private static final String JMX_SERVICE_NAME = "DataNode"; + private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk"; private MiniDFSCluster cluster; private DistributedFileSystem fs; private DFSClient client; private Configuration conf; + private JMXGet jmx; @After - public void shutDownCluster() throws IOException { + public void shutDownCluster() throws Exception { + + // Dump all RamDisk JMX metrics before shutdown the cluster + printRamDiskJMXMetrics(); + if (fs != null) { fs.close(); fs = null; @@ -100,6 +107,10 @@ public class TestLazyPersistFiles { cluster.shutdown(); cluster = null; } + + if (jmx != null) { + jmx = null; + } } @Test (timeout=300000) @@ -203,13 +214,15 @@ public class TestLazyPersistFiles { * @throws IOException */ @Test (timeout=300000) - public void testFallbackToDiskFull() throws IOException { + public void testFallbackToDiskFull() throws Exception { startUpCluster(false, 0); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); makeTestFile(path, BLOCK_SIZE, true); ensureFileReplicasOnStorageType(path, DEFAULT); + + verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 1); } /** @@ -384,11 +397,10 @@ public class TestLazyPersistFiles { /** * RamDisk eviction after lazy persist to disk. - * @throws IOException - * @throws InterruptedException + * @throws Exception */ @Test (timeout=300000) - public void testRamDiskEviction() throws IOException, InterruptedException { + public void testRamDiskEviction() throws Exception { startUpCluster(true, 1 + EVICTION_LOW_WATERMARK); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); @@ -411,6 +423,9 @@ public class TestLazyPersistFiles { // RAM_DISK. ensureFileReplicasOnStorageType(path2, RAM_DISK); ensureFileReplicasOnStorageType(path1, DEFAULT); + + verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 1); + verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 1); } /** @@ -454,7 +469,7 @@ public class TestLazyPersistFiles { */ @Test (timeout=300000) public void testRamDiskEvictionIsLru() - throws IOException, InterruptedException { + throws Exception { final int NUM_PATHS = 5; startUpCluster(true, NUM_PATHS + EVICTION_LOW_WATERMARK); final String METHOD_NAME = GenericTestUtils.getMethodName(); @@ -499,6 +514,14 @@ public class TestLazyPersistFiles { ensureFileReplicasOnStorageType(paths[indexes.get(j)], RAM_DISK); } } + + verifyRamDiskJMXMetric("RamDiskBlocksWrite", NUM_PATHS * 2); + verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 0); + verifyRamDiskJMXMetric("RamDiskBytesWrite", BLOCK_SIZE * NUM_PATHS * 2); + verifyRamDiskJMXMetric("RamDiskBlocksReadHits", NUM_PATHS); + verifyRamDiskJMXMetric("RamDiskBlocksEvicted", NUM_PATHS); + verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 0); + verifyRamDiskJMXMetric("RamDiskBlocksDeletedBeforeLazyPersisted", 0); } /** @@ -506,9 +529,9 @@ public class TestLazyPersistFiles { * Memory is freed up and file is gone. * @throws IOException */ - @Test (timeout=300000) + @Test // (timeout=300000) public void testDeleteBeforePersist() - throws IOException, InterruptedException { + throws Exception { startUpCluster(true, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0)); @@ -523,6 +546,8 @@ public class TestLazyPersistFiles { Assert.assertFalse(fs.exists(path)); assertThat(verifyDeletedBlocks(locatedBlocks), is(true)); + + verifyRamDiskJMXMetric("RamDiskBlocksDeletedBeforeLazyPersisted", 1); } /** @@ -533,7 +558,7 @@ public class TestLazyPersistFiles { */ @Test (timeout=300000) public void testDeleteAfterPersist() - throws IOException, InterruptedException { + throws Exception { startUpCluster(true, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); @@ -548,9 +573,10 @@ public class TestLazyPersistFiles { client.delete(path.toString(), false); Assert.assertFalse(fs.exists(path)); - triggerBlockReport(); - assertThat(verifyDeletedBlocks(locatedBlocks), is(true)); + + verifyRamDiskJMXMetric("RamDiskBlocksLazyPersisted", 1); + verifyRamDiskJMXMetric("RamDiskBytesLazyPersisted", BLOCK_SIZE); } /** @@ -760,6 +786,11 @@ public class TestLazyPersistFiles { .build(); fs = cluster.getFileSystem(); client = fs.getClient(); + try { + jmx = initJMX(); + } catch (Exception e) { + fail("Failed initialize JMX for testing: " + e); + } LOG.info("Cluster startup complete"); } @@ -929,4 +960,25 @@ public class TestLazyPersistFiles { } } } + + JMXGet initJMX() throws Exception + { + JMXGet jmx = new JMXGet(); + jmx.setService(JMX_SERVICE_NAME); + jmx.init(); + return jmx; + } + + void printRamDiskJMXMetrics() { + try { + jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN); + } catch (Exception e) { + e.printStackTrace(); + } + } + + void verifyRamDiskJMXMetric(String metricName, long expectedValue) + throws Exception { + assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName))); + } }