HBASE-18248 Warn if monitored RPC task has been tied up beyond a configurable 
threshold


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d0941127
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d0941127
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d0941127

Branch: refs/heads/master
Commit: d0941127d424bc76688aec7952388f858d917b14
Parents: 794a3b1
Author: Andrew Purtell <apurt...@apache.org>
Authored: Wed Aug 9 18:11:28 2017 -0700
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Wed Aug 9 18:16:38 2017 -0700

----------------------------------------------------------------------
 .../monitoring/MonitoredRPCHandlerImpl.java     |  8 +-
 .../hadoop/hbase/monitoring/MonitoredTask.java  |  2 +
 .../hbase/monitoring/MonitoredTaskImpl.java     | 16 +++-
 .../hadoop/hbase/monitoring/TaskMonitor.java    | 88 +++++++++++++++++---
 .../hbase/monitoring/TestTaskMonitor.java       | 44 +++++++---
 5 files changed, 130 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d0941127/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
index b49df28..3ebe3b7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
@@ -251,6 +251,12 @@ public class MonitoredRPCHandlerImpl extends 
MonitoredTaskImpl
     if (getState() != State.RUNNING) {
       return super.toString();
     }
-    return super.toString() + ", rpcMethod=" + getRPC();
+    return super.toString()
+        + ", queuetimems=" + getRPCQueueTime()
+        + ", starttimems=" + getRPCStartTime()
+        + ", clientaddress=" + clientAddress
+        + ", remoteport=" + remotePort
+        + ", packetlength=" + getRPCPacketLength()
+        + ", rpcMethod=" + getRPC();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d0941127/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
index ff3667b..48fba1b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
@@ -39,6 +39,7 @@ public interface MonitoredTask extends Cloneable {
   State getState();
   long getStateTime();
   long getCompletionTimestamp();
+  long getWarnTime();
 
   void markComplete(String msg);
   void pause(String msg);
@@ -48,6 +49,7 @@ public interface MonitoredTask extends Cloneable {
 
   void setStatus(String status);
   void setDescription(String description);
+  void setWarnTime(final long t);
 
   /**
    * Explicitly mark this status as able to be cleaned up,

http://git-wip-us.apache.org/repos/asf/hbase/blob/d0941127/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
index dda77ac..754e3d6 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
@@ -30,7 +30,8 @@ class MonitoredTaskImpl implements MonitoredTask {
   private long startTime;
   private long statusTime;
   private long stateTime;
-  
+  private long warnTime;
+
   private volatile String status;
   private volatile String description;
   
@@ -42,6 +43,7 @@ class MonitoredTaskImpl implements MonitoredTask {
     startTime = System.currentTimeMillis();
     statusTime = startTime;
     stateTime = startTime;
+    warnTime = startTime;
   }
 
   @Override
@@ -82,7 +84,12 @@ class MonitoredTaskImpl implements MonitoredTask {
   public long getStateTime() {
     return stateTime;
   }
-  
+
+  @Override
+  public long getWarnTime() {
+    return warnTime;
+  }
+
   @Override
   public long getCompletionTimestamp() {
     if (state == State.COMPLETE || state == State.ABORTED) {
@@ -132,6 +139,11 @@ class MonitoredTaskImpl implements MonitoredTask {
   }
 
   @Override
+  public void setWarnTime(long t) {
+    this.warnTime = t;
+  }
+
+  @Override
   public void cleanup() {
     if (state == State.RUNNING) {
       setState(State.ABORTED);

http://git-wip-us.apache.org/repos/asf/hbase/blob/d0941127/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
index dc96179..780916f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
@@ -30,9 +30,12 @@ import java.util.List;
 import org.apache.commons.collections.buffer.CircularFifoBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
 
-import 
org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 
 /**
@@ -44,16 +47,35 @@ import 
org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 public class TaskMonitor {
   private static final Log LOG = LogFactory.getLog(TaskMonitor.class);
 
-  // Don't keep around any tasks that have completed more than
-  // 60 seconds ago
-  private static final long EXPIRATION_TIME = 60*1000;
+  public static final String MAX_TASKS_KEY = "hbase.taskmonitor.max.tasks";
+  public static final int DEFAULT_MAX_TASKS = 1000;
+  public static final String RPC_WARN_TIME_KEY = 
"hbase.taskmonitor.rpc.warn.time";
+  public static final long DEFAULT_RPC_WARN_TIME = 0;
+  public static final String EXPIRATION_TIME_KEY = 
"hbase.taskmonitor.expiration.time";
+  public static final long DEFAULT_EXPIRATION_TIME = 60*1000;
+  public static final String MONITOR_INTERVAL_KEY = 
"hbase.taskmonitor.monitor.interval";
+  public static final long DEFAULT_MONITOR_INTERVAL = 10*1000;
 
-  @VisibleForTesting
-  static final int MAX_TASKS = 1000;
-  
   private static TaskMonitor instance;
-  private CircularFifoBuffer tasks = new CircularFifoBuffer(MAX_TASKS);
-  private List<TaskAndWeakRefPair> rpcTasks = Lists.newArrayList();
+
+  private final int maxTasks;
+  private final long rpcWarnTime;
+  private final long expirationTime;
+  private final CircularFifoBuffer tasks;
+  private final List<TaskAndWeakRefPair> rpcTasks;
+  private final long monitorInterval;
+  private Thread monitorThread;
+
+  TaskMonitor(Configuration conf) {
+    maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS);
+    expirationTime = conf.getLong(EXPIRATION_TIME_KEY, 
DEFAULT_EXPIRATION_TIME);
+    rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME);
+    tasks = new CircularFifoBuffer(maxTasks);
+    rpcTasks = Lists.newArrayList();
+    monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, 
DEFAULT_MONITOR_INTERVAL);
+    monitorThread = new Thread(new MonitorRunnable());
+    Threads.setDaemonThreadRunning(monitorThread, "Monitor thread for 
TaskMonitor");
+  }
 
   /**
    * Get singleton instance.
@@ -61,7 +83,7 @@ public class TaskMonitor {
    */
   public static synchronized TaskMonitor get() {
     if (instance == null) {
-      instance = new TaskMonitor();
+      instance = new TaskMonitor(HBaseConfiguration.create());
     }
     return instance;
   }
@@ -93,6 +115,22 @@ public class TaskMonitor {
     return proxy;
   }
 
+  private synchronized void warnStuckTasks() {
+    if (rpcWarnTime > 0) {
+      final long now = EnvironmentEdgeManager.currentTime();
+      for (Iterator<TaskAndWeakRefPair> it = rpcTasks.iterator();
+          it.hasNext();) {
+        TaskAndWeakRefPair pair = it.next();
+        MonitoredTask stat = pair.get();
+        if ((stat.getState() == MonitoredTaskImpl.State.RUNNING) &&
+            (now >= stat.getWarnTime() + rpcWarnTime)) {
+          LOG.warn("Task may be stuck: " + stat);
+          stat.setWarnTime(now);
+        }
+      }
+    }
+  }
+
   private synchronized void purgeExpiredTasks() {
     for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
          it.hasNext();) {
@@ -139,12 +177,11 @@ public class TaskMonitor {
 
   private boolean canPurge(MonitoredTask stat) {
     long cts = stat.getCompletionTimestamp();
-    return (cts > 0 && System.currentTimeMillis() - cts > EXPIRATION_TIME);
+    return (cts > 0 && EnvironmentEdgeManager.currentTime() - cts > 
expirationTime);
   }
-  
 
   public void dumpAsText(PrintWriter out) {
-    long now = System.currentTimeMillis();
+    long now = EnvironmentEdgeManager.currentTime();
     
     List<MonitoredTask> tasks = getTasks();
     for (MonitoredTask task : tasks) {
@@ -164,6 +201,12 @@ public class TaskMonitor {
     }
   }
 
+  public synchronized void shutdown() {
+    if (this.monitorThread != null) {
+      monitorThread.interrupt();
+    }
+  }
+
   /**
    * This class encapsulates an object as well as a weak reference to a proxy
    * that passes through calls to that object. In art form:
@@ -218,4 +261,23 @@ public class TaskMonitor {
       return method.invoke(delegatee, args);
     }    
   }
+
+  private class MonitorRunnable implements Runnable {
+    private boolean running = true;
+
+    @Override
+    public void run() {
+      while (running) {
+        try {
+          Thread.sleep(monitorInterval);
+          if (tasks.isFull()) {
+            purgeExpiredTasks();
+          }
+          warnStuckTasks();
+        } catch (InterruptedException e) {
+          running = false;
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d0941127/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
index 5464d9f..718339a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
@@ -22,8 +22,10 @@ import static org.junit.Assert.*;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -32,7 +34,7 @@ public class TestTaskMonitor {
 
   @Test
   public void testTaskMonitorBasics() {
-    TaskMonitor tm = new TaskMonitor();
+    TaskMonitor tm = new TaskMonitor(new Configuration());
     assertTrue("Task monitor should start empty",
         tm.getTasks().isEmpty());
     
@@ -55,11 +57,13 @@ public class TestTaskMonitor {
     // If we mark its completion time back a few minutes, it should get gced
     task.expireNow();
     assertEquals(0, tm.getTasks().size());
+
+    tm.shutdown();
   }
   
   @Test
   public void testTasksGetAbortedOnLeak() throws InterruptedException {
-    final TaskMonitor tm = new TaskMonitor();
+    final TaskMonitor tm = new TaskMonitor(new Configuration());
     assertTrue("Task monitor should start empty",
         tm.getTasks().isEmpty());
     
@@ -86,42 +90,58 @@ public class TestTaskMonitor {
     // Now it should be aborted 
     MonitoredTask taskFromTm = tm.getTasks().get(0);
     assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState());
+
+    tm.shutdown();
   }
   
   @Test
   public void testTaskLimit() throws Exception {
-    TaskMonitor tm = new TaskMonitor();
-    for (int i = 0; i < TaskMonitor.MAX_TASKS + 10; i++) {
+    TaskMonitor tm = new TaskMonitor(new Configuration());
+    for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS + 10; i++) {
       tm.createStatus("task " + i);
     }
     // Make sure it was limited correctly
-    assertEquals(TaskMonitor.MAX_TASKS, tm.getTasks().size());
+    assertEquals(TaskMonitor.DEFAULT_MAX_TASKS, tm.getTasks().size());
     // Make sure we culled the earlier tasks, not later
     // (i.e. tasks 0 through 9 should have been deleted)
     assertEquals("task 10", tm.getTasks().get(0).getDescription());
+    tm.shutdown();
   }
 
   @Test
   public void testDoNotPurgeRPCTask() throws Exception {
     int RPCTaskNums = 10;
+    TaskMonitor tm = TaskMonitor.get();
     for(int i = 0; i < RPCTaskNums; i++) {
-      TaskMonitor.get().createRPCStatus("PRCTask" + i);
+      tm.createRPCStatus("PRCTask" + i);
     }
-    for(int i = 0; i < TaskMonitor.MAX_TASKS; i++) {
-      TaskMonitor.get().createStatus("otherTask" + i);
+    for(int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS; i++) {
+      tm.createStatus("otherTask" + i);
     }
     int remainRPCTask = 0;
-    for(MonitoredTask task :TaskMonitor.get().getTasks()) {
+    for(MonitoredTask task: tm.getTasks()) {
       if(task instanceof MonitoredRPCHandler) {
         remainRPCTask++;
       }
     }
     assertEquals("RPC Tasks have been purged!", RPCTaskNums, remainRPCTask);
-
+    tm.shutdown();
   }
 
-
-
+  @Test
+  public void testWarnStuckTasks() throws Exception {
+    final int INTERVAL = 1000;
+    Configuration conf = new Configuration();
+    conf.setLong(TaskMonitor.RPC_WARN_TIME_KEY, INTERVAL);
+    conf.setLong(TaskMonitor.MONITOR_INTERVAL_KEY, INTERVAL);
+    final TaskMonitor tm = new TaskMonitor(conf);
+    MonitoredRPCHandler t = tm.createRPCStatus("test task");
+    long then = EnvironmentEdgeManager.currentTime();
+    t.setRPC("testMethod", new Object[0], then);
+    Thread.sleep(INTERVAL * 2);
+    assertTrue("We did not warn", t.getWarnTime() > then);
+    tm.shutdown();
+  }
 
 }
 

Reply via email to