HBASE-18248 Warn if monitored RPC task has been tied up beyond a configurable threshold
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a9021755 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a9021755 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a9021755 Branch: refs/heads/branch-2 Commit: a90217555346bb0768bfb410bb0d1fa02d6d562d Parents: f855b51 Author: Andrew Purtell <apurt...@apache.org> Authored: Wed Aug 9 18:11:28 2017 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Wed Aug 9 18:16:57 2017 -0700 ---------------------------------------------------------------------- .../monitoring/MonitoredRPCHandlerImpl.java | 8 +- .../hadoop/hbase/monitoring/MonitoredTask.java | 2 + .../hbase/monitoring/MonitoredTaskImpl.java | 16 +++- .../hadoop/hbase/monitoring/TaskMonitor.java | 88 +++++++++++++++++--- .../hbase/monitoring/TestTaskMonitor.java | 44 +++++++--- 5 files changed, 130 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/a9021755/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java index b49df28..3ebe3b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java @@ -251,6 +251,12 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl if (getState() != State.RUNNING) { return super.toString(); } - return super.toString() + ", rpcMethod=" + getRPC(); + return super.toString() + + ", queuetimems=" + getRPCQueueTime() + + ", starttimems=" + getRPCStartTime() + + ", clientaddress=" + clientAddress + + ", remoteport=" + remotePort + + ", packetlength=" + getRPCPacketLength() + + ", rpcMethod=" + getRPC(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a9021755/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java index ff3667b..48fba1b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java @@ -39,6 +39,7 @@ public interface MonitoredTask extends Cloneable { State getState(); long getStateTime(); long getCompletionTimestamp(); + long getWarnTime(); void markComplete(String msg); void pause(String msg); @@ -48,6 +49,7 @@ public interface MonitoredTask extends Cloneable { void setStatus(String status); void setDescription(String description); + void setWarnTime(final long t); /** * Explicitly mark this status as able to be cleaned up, http://git-wip-us.apache.org/repos/asf/hbase/blob/a9021755/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java index dda77ac..754e3d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java @@ -30,7 +30,8 @@ class MonitoredTaskImpl implements MonitoredTask { private long startTime; private long statusTime; private long stateTime; - + private long warnTime; + private volatile String status; private volatile String description; @@ -42,6 +43,7 @@ class MonitoredTaskImpl implements MonitoredTask { startTime = System.currentTimeMillis(); statusTime = startTime; stateTime = startTime; + warnTime = startTime; } @Override @@ -82,7 +84,12 @@ class MonitoredTaskImpl implements MonitoredTask { public long getStateTime() { return stateTime; } - + + @Override + public long getWarnTime() { + return warnTime; + } + @Override public long getCompletionTimestamp() { if (state == State.COMPLETE || state == State.ABORTED) { @@ -132,6 +139,11 @@ class MonitoredTaskImpl implements MonitoredTask { } @Override + public void setWarnTime(long t) { + this.warnTime = t; + } + + @Override public void cleanup() { if (state == State.RUNNING) { setState(State.ABORTED); http://git-wip-us.apache.org/repos/asf/hbase/blob/a9021755/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java index dc96179..780916f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java @@ -30,9 +30,12 @@ import java.util.List; import org.apache.commons.collections.buffer.CircularFifoBuffer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; /** @@ -44,16 +47,35 @@ import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; public class TaskMonitor { private static final Log LOG = LogFactory.getLog(TaskMonitor.class); - // Don't keep around any tasks that have completed more than - // 60 seconds ago - private static final long EXPIRATION_TIME = 60*1000; + public static final String MAX_TASKS_KEY = "hbase.taskmonitor.max.tasks"; + public static final int DEFAULT_MAX_TASKS = 1000; + public static final String RPC_WARN_TIME_KEY = "hbase.taskmonitor.rpc.warn.time"; + public static final long DEFAULT_RPC_WARN_TIME = 0; + public static final String EXPIRATION_TIME_KEY = "hbase.taskmonitor.expiration.time"; + public static final long DEFAULT_EXPIRATION_TIME = 60*1000; + public static final String MONITOR_INTERVAL_KEY = "hbase.taskmonitor.monitor.interval"; + public static final long DEFAULT_MONITOR_INTERVAL = 10*1000; - @VisibleForTesting - static final int MAX_TASKS = 1000; - private static TaskMonitor instance; - private CircularFifoBuffer tasks = new CircularFifoBuffer(MAX_TASKS); - private List<TaskAndWeakRefPair> rpcTasks = Lists.newArrayList(); + + private final int maxTasks; + private final long rpcWarnTime; + private final long expirationTime; + private final CircularFifoBuffer tasks; + private final List<TaskAndWeakRefPair> rpcTasks; + private final long monitorInterval; + private Thread monitorThread; + + TaskMonitor(Configuration conf) { + maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS); + expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME); + rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME); + tasks = new CircularFifoBuffer(maxTasks); + rpcTasks = Lists.newArrayList(); + monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL); + monitorThread = new Thread(new MonitorRunnable()); + Threads.setDaemonThreadRunning(monitorThread, "Monitor thread for TaskMonitor"); + } /** * Get singleton instance. @@ -61,7 +83,7 @@ public class TaskMonitor { */ public static synchronized TaskMonitor get() { if (instance == null) { - instance = new TaskMonitor(); + instance = new TaskMonitor(HBaseConfiguration.create()); } return instance; } @@ -93,6 +115,22 @@ public class TaskMonitor { return proxy; } + private synchronized void warnStuckTasks() { + if (rpcWarnTime > 0) { + final long now = EnvironmentEdgeManager.currentTime(); + for (Iterator<TaskAndWeakRefPair> it = rpcTasks.iterator(); + it.hasNext();) { + TaskAndWeakRefPair pair = it.next(); + MonitoredTask stat = pair.get(); + if ((stat.getState() == MonitoredTaskImpl.State.RUNNING) && + (now >= stat.getWarnTime() + rpcWarnTime)) { + LOG.warn("Task may be stuck: " + stat); + stat.setWarnTime(now); + } + } + } + } + private synchronized void purgeExpiredTasks() { for (Iterator<TaskAndWeakRefPair> it = tasks.iterator(); it.hasNext();) { @@ -139,12 +177,11 @@ public class TaskMonitor { private boolean canPurge(MonitoredTask stat) { long cts = stat.getCompletionTimestamp(); - return (cts > 0 && System.currentTimeMillis() - cts > EXPIRATION_TIME); + return (cts > 0 && EnvironmentEdgeManager.currentTime() - cts > expirationTime); } - public void dumpAsText(PrintWriter out) { - long now = System.currentTimeMillis(); + long now = EnvironmentEdgeManager.currentTime(); List<MonitoredTask> tasks = getTasks(); for (MonitoredTask task : tasks) { @@ -164,6 +201,12 @@ public class TaskMonitor { } } + public synchronized void shutdown() { + if (this.monitorThread != null) { + monitorThread.interrupt(); + } + } + /** * This class encapsulates an object as well as a weak reference to a proxy * that passes through calls to that object. In art form: @@ -218,4 +261,23 @@ public class TaskMonitor { return method.invoke(delegatee, args); } } + + private class MonitorRunnable implements Runnable { + private boolean running = true; + + @Override + public void run() { + while (running) { + try { + Thread.sleep(monitorInterval); + if (tasks.isFull()) { + purgeExpiredTasks(); + } + warnStuckTasks(); + } catch (InterruptedException e) { + running = false; + } + } + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a9021755/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java index 5464d9f..718339a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java @@ -22,8 +22,10 @@ import static org.junit.Assert.*; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -32,7 +34,7 @@ public class TestTaskMonitor { @Test public void testTaskMonitorBasics() { - TaskMonitor tm = new TaskMonitor(); + TaskMonitor tm = new TaskMonitor(new Configuration()); assertTrue("Task monitor should start empty", tm.getTasks().isEmpty()); @@ -55,11 +57,13 @@ public class TestTaskMonitor { // If we mark its completion time back a few minutes, it should get gced task.expireNow(); assertEquals(0, tm.getTasks().size()); + + tm.shutdown(); } @Test public void testTasksGetAbortedOnLeak() throws InterruptedException { - final TaskMonitor tm = new TaskMonitor(); + final TaskMonitor tm = new TaskMonitor(new Configuration()); assertTrue("Task monitor should start empty", tm.getTasks().isEmpty()); @@ -86,42 +90,58 @@ public class TestTaskMonitor { // Now it should be aborted MonitoredTask taskFromTm = tm.getTasks().get(0); assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState()); + + tm.shutdown(); } @Test public void testTaskLimit() throws Exception { - TaskMonitor tm = new TaskMonitor(); - for (int i = 0; i < TaskMonitor.MAX_TASKS + 10; i++) { + TaskMonitor tm = new TaskMonitor(new Configuration()); + for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS + 10; i++) { tm.createStatus("task " + i); } // Make sure it was limited correctly - assertEquals(TaskMonitor.MAX_TASKS, tm.getTasks().size()); + assertEquals(TaskMonitor.DEFAULT_MAX_TASKS, tm.getTasks().size()); // Make sure we culled the earlier tasks, not later // (i.e. tasks 0 through 9 should have been deleted) assertEquals("task 10", tm.getTasks().get(0).getDescription()); + tm.shutdown(); } @Test public void testDoNotPurgeRPCTask() throws Exception { int RPCTaskNums = 10; + TaskMonitor tm = TaskMonitor.get(); for(int i = 0; i < RPCTaskNums; i++) { - TaskMonitor.get().createRPCStatus("PRCTask" + i); + tm.createRPCStatus("PRCTask" + i); } - for(int i = 0; i < TaskMonitor.MAX_TASKS; i++) { - TaskMonitor.get().createStatus("otherTask" + i); + for(int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS; i++) { + tm.createStatus("otherTask" + i); } int remainRPCTask = 0; - for(MonitoredTask task :TaskMonitor.get().getTasks()) { + for(MonitoredTask task: tm.getTasks()) { if(task instanceof MonitoredRPCHandler) { remainRPCTask++; } } assertEquals("RPC Tasks have been purged!", RPCTaskNums, remainRPCTask); - + tm.shutdown(); } - - + @Test + public void testWarnStuckTasks() throws Exception { + final int INTERVAL = 1000; + Configuration conf = new Configuration(); + conf.setLong(TaskMonitor.RPC_WARN_TIME_KEY, INTERVAL); + conf.setLong(TaskMonitor.MONITOR_INTERVAL_KEY, INTERVAL); + final TaskMonitor tm = new TaskMonitor(conf); + MonitoredRPCHandler t = tm.createRPCStatus("test task"); + long then = EnvironmentEdgeManager.currentTime(); + t.setRPC("testMethod", new Object[0], then); + Thread.sleep(INTERVAL * 2); + assertTrue("We did not warn", t.getWarnTime() > then); + tm.shutdown(); + } }