Repository: accumulo Updated Branches: refs/heads/master efbb394b7 -> e123c26c7
ACCUMULO-3976 print hdfs pipeline when the walog flush is slow Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e123c26c Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e123c26c Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e123c26c Branch: refs/heads/master Commit: e123c26c7425ec04ac4cbf3c62daae9e1994f4e6 Parents: efbb394 Author: Eric C. Newton <[email protected]> Authored: Fri Aug 28 15:42:33 2015 -0400 Committer: Eric C. Newton <[email protected]> Committed: Fri Aug 28 15:42:33 2015 -0400 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 2 + .../apache/accumulo/tserver/log/DfsLogger.java | 60 ++++++++++++++++++++ 2 files changed, 62 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/e123c26c/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index ef4d877..5bd5c8a 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -332,6 +332,8 @@ public enum Property { "Memory to provide to batchwriter to replay mutations for replication"), TSERV_ASSIGNMENT_MAXCONCURRENT("tserver.assignment.concurrent.max", "2", PropertyType.COUNT, "The number of threads available to load tablets. 
Recoveries are still performed serially."), + TSERV_SLOW_FLUSH_MILLIS("tserver.slow.flush.time", "100ms", PropertyType.TIMEDURATION, + "If a flush to the write-ahead log takes longer than this period of time, debugging information will written, and may result in a log rollover."), // properties that are specific to logger server behavior LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the write-ahead logger servers"), http://git-wip-us.apache.org/repos/asf/accumulo/blob/e123c26c/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index 397eeff..bdc7364 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -62,6 +62,7 @@ import org.apache.accumulo.tserver.logger.LogFileValue; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,6 +78,7 @@ public class DfsLogger implements Comparable<DfsLogger> { public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---"; private static final Logger log = LoggerFactory.getLogger(DfsLogger.class); + private static final Object[] NO_ARGS = new Object[] {}; public static class LogClosedException extends IOException { private static final long serialVersionUID = 1L; @@ -176,6 +178,7 @@ public class DfsLogger implements Comparable<DfsLogger> { } } + long start = System.currentTimeMillis(); try { if (durabilityMethod != null) { durabilityMethod.invoke(logFile); @@ -191,6 +194,12 @@ public class 
DfsLogger implements Comparable<DfsLogger> { logWork.exception = ex; } } + long duration = System.currentTimeMillis() - start; + if (duration > slowFlushMillis) { + String msg = new StringBuilder().append("Slow sync cost: ").append(duration).append(" ms, current pipeline: ").append(Arrays.toString(getPipeLine())) + .toString(); + log.info(msg); + } for (DfsLogger.LogWork logWork : work) if (logWork == CLOSED_MARKER) @@ -265,11 +274,14 @@ public class DfsLogger implements Comparable<DfsLogger> { private String metaReference; private AtomicLong syncCounter; private AtomicLong flushCounter; + private final long slowFlushMillis; + private Method getPipeLine; public DfsLogger(ServerResources conf, AtomicLong syncCounter, AtomicLong flushCounter) throws IOException { this.conf = conf; this.syncCounter = syncCounter; this.flushCounter = flushCounter; + this.slowFlushMillis = conf.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FLUSH_MILLIS); } /** @@ -282,6 +294,7 @@ public class DfsLogger implements Comparable<DfsLogger> { this.conf = conf; this.logPath = filename; metaReference = meta; + this.slowFlushMillis = conf.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FLUSH_MILLIS); } public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, Path path, AccumuloConfiguration conf) throws IOException { @@ -404,6 +417,7 @@ public class DfsLogger implements Comparable<DfsLogger> { sync = logFile.getClass().getMethod("hsync"); flush = logFile.getClass().getMethod("hflush"); + getPipeLine = this.getGetPipeline(logFile); // Initialize the crypto operations. org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory.getCryptoModule(conf @@ -629,4 +643,50 @@ public class DfsLogger implements Comparable<DfsLogger> { return getFileName().compareTo(o.getFileName()); } + /* + * The following two methods were shamelessly lifted from HBASE-11240. Thanks HBase! 
+   */
+
+  /**
+   * Looks up, via reflection, the non-public 'getPipeline' method on the stream wrapped by the passed <code>os</code> stream.
+   *
+   * @return the accessible Method, or null if <code>os</code> is null, the wrapped stream has no such method, or access to it is denied.
+   */
+  private Method getGetPipeline(final FSDataOutputStream os) {
+    Method m = null;
+    if (os != null) {
+      Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream().getClass();
+      try {
+        m = wrappedStreamClass.getDeclaredMethod("getPipeline", new Class<?>[] {});
+        m.setAccessible(true);
+      } catch (NoSuchMethodException e) {
+        log.info("FileSystem's output stream doesn't support getPipeline; not available; fsOut=" + wrappedStreamClass.getName());
+      } catch (SecurityException e) {
+        log.info("Doesn't have access to getPipeline on FileSystems's output stream ; fsOut=" + wrappedStreamClass.getName(), e);
+        m = null; // could happen on setAccessible()
+      }
+    }
+    return m;
+  }
+
+  /**
+   * Gets the pipeline for the current walog by invoking the reflected 'getPipeline' on the stream wrapped by logFile, the class it was resolved on.
+   *
+   * @return non-null array of DatanodeInfo; empty when no 'getPipeline' method was found, the call fails, or the result is not a DatanodeInfo[]
+   */
+  DatanodeInfo[] getPipeLine() {
+    if (this.getPipeLine != null) {
+      Object repl;
+      try {
+        repl = this.getPipeLine.invoke(this.logFile.getWrappedStream(), NO_ARGS);
+        if (repl instanceof DatanodeInfo[]) {
+          return ((DatanodeInfo[]) repl);
+        }
+      } catch (Exception e) {
+        log.info("Get pipeline failed", e);
+      }
+    }
+    return new DatanodeInfo[0];
+  }
+
 }
