Repository: hbase
Updated Branches:
refs/heads/branch-1 8b167e4c9 -> 71ed70336
Add hedgedReads and hedgedReadWins count metrics
Conflicts:
hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/71ed7033
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/71ed7033
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/71ed7033
Branch: refs/heads/branch-1
Commit: 71ed7033675149956de855b6782e1e22fc908dc8
Parents: 8b167e4
Author: stack <[email protected]>
Authored: Thu Oct 9 12:50:56 2014 -0700
Committer: stack <[email protected]>
Committed: Thu Oct 9 15:02:50 2014 -0700
----------------------------------------------------------------------
.../regionserver/MetricsRegionServerSource.java | 5 +
.../MetricsRegionServerWrapper.java | 10 ++
.../MetricsRegionServerSourceImpl.java | 5 +
.../hadoop/hbase/master/SplitLogManager.java | 7 +-
.../MetricsRegionServerWrapperImpl.java | 23 +++
.../org/apache/hadoop/hbase/util/FSUtils.java | 45 ++++++
.../MetricsRegionServerWrapperStub.java | 9 ++
.../apache/hadoop/hbase/util/TestFSUtils.java | 155 +++++++++++++++++++
8 files changed, 256 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/71ed7033/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
----------------------------------------------------------------------
diff --git
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index 4398794..fa79523 100644
---
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@ -234,4 +234,9 @@ public interface MetricsRegionServerSource extends
BaseSource {
String MAJOR_COMPACTED_CELLS_SIZE_DESC =
"The total amount of data processed during major compactions, in bytes";
+ String HEDGED_READS = "hedgedReads";
+ String HEDGED_READS_DESC = "The number of times we started a hedged read";
+ String HEDGED_READ_WINS = "hedgedReadWins";
+ String HEDGED_READ_WINS_DESC =
+ "The number of times we started a hedged read and a hedged read won";
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/71ed7033/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
----------------------------------------------------------------------
diff --git
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
index 998bd17..513a0db 100644
---
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
+++
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
@@ -246,4 +246,14 @@ public interface MetricsRegionServerWrapper {
* Get the total amount of data processed during major compactions, in bytes.
*/
long getMajorCompactedCellsSize();
+
+  /**
+   * Get the number of hedged read operations that have been started.
+   * @return Count of hedged read operations
+   */
+  long getHedgedReadOps();
+
+  /**
+   * Get the number of hedged reads that finished ahead of the primary read.
+   * @return Count of times a hedged read beat out the primary read.
+   */
+  long getHedgedReadWins();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/71ed7033/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
----------------------------------------------------------------------
diff --git
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index f859296..a6377c0 100644
---
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@ -219,6 +219,11 @@ public class MetricsRegionServerSourceImpl
rsWrap.getCompactedCellsSize())
.addCounter(Interns.info(MAJOR_COMPACTED_CELLS_SIZE,
MAJOR_COMPACTED_CELLS_SIZE_DESC),
rsWrap.getMajorCompactedCellsSize())
+
+ .addCounter(Interns.info(HEDGED_READS, HEDGED_READS_DESC),
rsWrap.getHedgedReadOps())
+ .addCounter(Interns.info(HEDGED_READ_WINS, HEDGED_READ_WINS_DESC),
+ rsWrap.getHedgedReadWins())
+
.tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC),
rsWrap.getZookeeperQuorum())
.tag(Interns.info(SERVER_NAME_NAME, SERVER_NAME_DESC),
rsWrap.getServerName())
http://git-wip-us.apache.org/repos/asf/hbase/blob/71ed7033/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 97ac02c..bf28a44 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
@@ -569,9 +570,9 @@ public class SplitLogManager {
* @return whether log is replaying
*/
public boolean isLogReplaying() {
- if (server.getCoordinatedStateManager() == null) return false;
- return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
- .getSplitLogManagerCoordination().isReplaying();
+ CoordinatedStateManager m = server.getCoordinatedStateManager();
+ if (m == null) return false;
+ return
((BaseCoordinatedStateManager)m).getSplitLogManagerCoordination().isReplaying();
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/71ed7033/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 3ae2235..327f55c 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -33,7 +34,9 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.metrics2.MetricsExecutor;
/**
@@ -78,6 +81,11 @@ class MetricsRegionServerWrapperImpl
private Runnable runnable;
private long period;
+ /**
+ * Can be null if not on hdfs.
+ */
+ private DFSHedgedReadMetrics dfsHedgedReadMetrics;
+
public MetricsRegionServerWrapperImpl(final HRegionServer regionServer) {
this.regionServer = regionServer;
initBlockCache();
@@ -91,6 +99,11 @@ class MetricsRegionServerWrapperImpl
this.executor.scheduleWithFixedDelay(this.runnable, this.period,
this.period,
TimeUnit.MILLISECONDS);
+ try {
+ this.dfsHedgedReadMetrics =
FSUtils.getDFSHedgedReadMetrics(regionServer.getConfiguration());
+ } catch (IOException e) {
+ LOG.warn("Failed to get hedged metrics", e);
+ }
if (LOG.isInfoEnabled()) {
LOG.info("Computing regionserver metrics every " + this.period + "
milliseconds");
}
@@ -513,4 +526,14 @@ class MetricsRegionServerWrapperImpl
majorCompactedCellsSize = tempMajorCompactedCellsSize;
}
}
+
+  /**
+   * Reports the hedged read operation count read from the DFS hedged read metrics,
+   * or zero when no metrics instance is held.
+   */
+  @Override
+  public long getHedgedReadOps() {
+    if (this.dfsHedgedReadMetrics != null) {
+      return this.dfsHedgedReadMetrics.getHedgedReadOps();
+    }
+    return 0;
+  }
+
+  /**
+   * Reports the hedged read win count read from the DFS hedged read metrics,
+   * or zero when no metrics instance is held.
+   */
+  @Override
+  public long getHedgedReadWins() {
+    if (this.dfsHedgedReadMetrics != null) {
+      return this.dfsHedgedReadMetrics.getHedgedReadWins();
+    }
+    return 0;
+  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/71ed7033/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index ed1a4e6..51af440 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -70,6 +70,8 @@ import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.io.IOUtils;
@@ -1869,4 +1871,47 @@ public abstract class FSUtils {
int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
}
+
+  /**
+   * Looks up the {@link DFSHedgedReadMetrics} instance held by the DFSClient that backs the
+   * filesystem named by the passed configuration.
+   * <p>
+   * The accessor on DFSClient is package private, so it is reached through reflection. Any
+   * reflective failure is logged at WARN and reported to the caller as <code>null</code>
+   * rather than thrown.
+   * @param c Configuration used to resolve the filesystem.
+   * @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
+   * @throws IOException if thrown while resolving the filesystem from the configuration
+   */
+  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
+      throws IOException {
+    if (!isHDFS(c)) return null;
+    FileSystem fs = FileSystem.get(c);
+    // Keep the "null if not on hdfs" contract even when the resolved filesystem is not a
+    // DistributedFileSystem instance; an unchecked cast would throw ClassCastException here.
+    if (!(fs instanceof DistributedFileSystem)) return null;
+    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
+    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
+    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
+    final String name = "getHedgedReadMetrics";
+    DFSClient dfsclient = ((DistributedFileSystem) fs).getClient();
+    Method m;
+    try {
+      m = dfsclient.getClass().getDeclaredMethod(name);
+      // setAccessible can itself throw SecurityException, so it belongs inside this try.
+      m.setAccessible(true);
+    } catch (NoSuchMethodException e) {
+      LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
+          e.getMessage());
+      return null;
+    } catch (SecurityException e) {
+      LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
+          e.getMessage());
+      return null;
+    }
+    try {
+      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
+    } catch (IllegalAccessException e) {
+      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
+          e.getMessage());
+      return null;
+    } catch (IllegalArgumentException e) {
+      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
+          e.getMessage());
+      return null;
+    } catch (InvocationTargetException e) {
+      // The wrapper's own message is usually null; pass the exception so the underlying
+      // cause thrown by the invoked method is logged as well.
+      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics", e);
+      return null;
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/71ed7033/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
index 036af48..ca95c4a 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
@@ -241,4 +241,13 @@ public class MetricsRegionServerWrapperStub implements
MetricsRegionServerWrappe
return 10240000;
}
+  /** Fixed hedged read operation count returned by this stub. */
+  @Override
+  public long getHedgedReadOps() {
+    return 100L;
+  }
+
+  /** Fixed hedged read win count returned by this stub. */
+  @Override
+  public long getHedgedReadWins() {
+    return 10L;
+  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/71ed7033/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 417c4a8..da1b603 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -25,11 +25,14 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
+import java.util.Random;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -41,6 +44,9 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
+import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -334,4 +340,153 @@ public class TestFSUtils {
assertEquals(expect, fs.getFileStatus(dst).getModificationTime());
cluster.shutdown();
}
+
+  /**
+   * Ugly test that ensures we can get at the hedged read counters in dfsclient.
+   * Does a bit of preading with hedged reads enabled using code taken from hdfs TestPread.
+   * @throws Exception
+   */
+  @Test public void testDFSHedgedReadMetrics() throws Exception {
+    HBaseTestingUtility htu = new HBaseTestingUtility();
+    // Enable hedged reads and set it so the threshold is really low.
+    // Most of this test is taken from HDFS, from TestPread.
+    Configuration conf = htu.getConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
+    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
+    conf.setBoolean("dfs.datanode.transferTo.allowed", false);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    // Everything that can fail once the cluster is up runs under try/finally so a failed
+    // lookup or assertion cannot leave the mini cluster running.
+    try {
+      // Get the metrics. Should be empty.
+      DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
+      // The lookup returns null on failure; fail with a message rather than a bare NPE.
+      assertNotNull("No hedged read metrics found on dfsclient", metrics);
+      assertEquals(0, metrics.getHedgedReadOps());
+      FileSystem fileSys = cluster.getFileSystem();
+      try {
+        Path p = new Path("preadtest.dat");
+        // We need > 1 blocks to test out the hedged reads.
+        DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize,
+          blockSize, (short) 3, seed);
+        pReadFile(fileSys, p);
+        cleanupFile(fileSys, p);
+        assertTrue(metrics.getHedgedReadOps() > 0);
+      } finally {
+        // Nested so that a failing close still lets the cluster shut down below.
+        fileSys.close();
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+ // Below is taken from TestPread over in HDFS.
+ static final int blockSize = 4096;
+ static final long seed = 0xDEADBEEFL;
+
+  /**
+   * Reads the named file back through a mix of sequential and positional reads and checks
+   * the bytes against content regenerated from {@link #seed}.
+   * Assumes the file holds 12 * blockSize bytes produced from the same seed, as the caller
+   * in this class arranges -- TODO confirm against DFSTestUtil.createFile.
+   * @param fileSys filesystem to read from
+   * @param name path of the file to read
+   * @throws IOException on any read failure other than the expected read past end of file
+   */
+  private void pReadFile(FileSystem fileSys, Path name) throws IOException {
+    byte[] expected = new byte[12 * blockSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(expected);
+    byte[] actual = new byte[4096];
+    FSDataInputStream stm = fileSys.open(name);
+    // try/finally so a failed check does not leak the open stream.
+    try {
+      // do a sanity check. Read first 4K bytes
+      stm.readFully(actual);
+      checkAndEraseData(actual, 0, expected, "Read Sanity Test");
+      // now do a pread for the first 8K bytes
+      actual = new byte[8192];
+      doPread(stm, 0L, actual, 0, 8192);
+      checkAndEraseData(actual, 0, expected, "Pread Test 1");
+      // Now check to see if the normal read returns 4K-8K byte range
+      actual = new byte[4096];
+      stm.readFully(actual);
+      checkAndEraseData(actual, 4096, expected, "Pread Test 2");
+      // Now see if we can cross a single block boundary successfully
+      // read 4K bytes from blockSize - 2K offset
+      stm.readFully(blockSize - 2048, actual, 0, 4096);
+      checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
+      // now see if we can cross two block boundaries successfully
+      // read blockSize + 4K bytes from blockSize - 2K offset
+      actual = new byte[blockSize + 4096];
+      stm.readFully(blockSize - 2048, actual);
+      checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
+      // now see if we can cross two block boundaries that are not cached
+      // read blockSize + 4K bytes from 10*blockSize - 2K offset
+      actual = new byte[blockSize + 4096];
+      stm.readFully(10 * blockSize - 2048, actual);
+      checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
+      // now check that even after all these preads, we can still read
+      // bytes 8K-12K
+      actual = new byte[4096];
+      stm.readFully(actual);
+      checkAndEraseData(actual, 8192, expected, "Pread Test 6");
+      // done
+    } finally {
+      stm.close();
+    }
+    // check block location caching
+    stm = fileSys.open(name);
+    try {
+      stm.readFully(1, actual, 0, 4096);
+      stm.readFully(4*blockSize, actual, 0, 4096);
+      stm.readFully(7*blockSize, actual, 0, 4096);
+      actual = new byte[3*4096];
+      stm.readFully(0*blockSize, actual, 0, 3*4096);
+      checkAndEraseData(actual, 0, expected, "Pread Test 7");
+      actual = new byte[8*4096];
+      stm.readFully(3*blockSize, actual, 0, 8*4096);
+      checkAndEraseData(actual, 3*blockSize, expected, "Pread Test 8");
+      // read the tail
+      stm.readFully(11*blockSize+blockSize/2, actual, 0, blockSize/2);
+      IOException res = null;
+      try { // read beyond the end of the file
+        stm.readFully(11*blockSize+blockSize/2, actual, 0, blockSize);
+      } catch (IOException e) {
+        // should throw an exception
+        res = e;
+      }
+      assertNotNull("Error reading beyond file boundary.", res);
+    } finally {
+      stm.close();
+    }
+  }
+
+  /**
+   * Asserts that every byte of <code>actual</code> equals the byte of <code>expected</code>
+   * at the same index shifted by <code>from</code>, zeroing each byte of
+   * <code>actual</code> once it has been checked.
+   * @param actual bytes that were read; cleared as they are verified
+   * @param from offset into <code>expected</code> that lines up with <code>actual[0]</code>
+   * @param expected reference content
+   * @param message prefix for the assertion failure message
+   */
+  private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
+    for (int idx = 0; idx < actual.length; idx++) {
+      // JUnit argument order is (message, expected, actual); passing them the other way
+      // round makes the generated "expected:<..> but was:<..>" text misleading.
+      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
+        expected[from+idx]+" actual "+actual[idx],
+        expected[from+idx], actual[idx]);
+      actual[idx] = 0;
+    }
+  }
+
+  /**
+   * Fills <code>length</code> bytes of <code>buffer</code>, starting at <code>offset</code>,
+   * by issuing positional reads against <code>stm</code> from <code>position</code> until
+   * the requested amount has arrived. Each read must return at least one byte.
+   * <p>
+   * The read-statistics verification present in the HDFS TestPread original is left out on
+   * purpose: those counts did not add up here.
+   */
+  private void doPread(FSDataInputStream stm, long position, byte[] buffer,
+      int offset, int length) throws IOException {
+    for (int done = 0; done < length; ) {
+      int got = stm.read(position + done, buffer, offset + done, length - done);
+      assertTrue("Error in pread", got > 0);
+      done += got;
+    }
+  }
+
+  /**
+   * Deletes the named file, asserting that it exists beforehand, that the delete reports
+   * success, and that it no longer exists afterwards.
+   */
+  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
+    boolean presentBefore = fileSys.exists(name);
+    assertTrue(presentBefore);
+    boolean deleted = fileSys.delete(name, true);
+    assertTrue(deleted);
+    boolean presentAfter = fileSys.exists(name);
+    assertTrue(!presentAfter);
+  }
}