HBASE-17064 Add TaskMonitor#getTasks() variant which accepts type selection
Signed-off-by: tedyu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/effd1093 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/effd1093 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/effd1093 Branch: refs/heads/HBASE-14070.HLC Commit: effd1093b559aeba2bf66a4cf81cd4a0013de184 Parents: d37266f Author: Reid Chan <[email protected]> Authored: Tue Aug 15 15:50:22 2017 +0800 Committer: tedyu <[email protected]> Committed: Tue Aug 15 09:45:19 2017 -0700 ---------------------------------------------------------------------- .../hbase/tmpl/common/TaskMonitorTmpl.jamon | 21 +---- .../hadoop/hbase/monitoring/TaskMonitor.java | 97 +++++++++++++++++--- .../hbase/monitoring/TestTaskMonitor.java | 48 ++++++++++ 3 files changed, 133 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/effd1093/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon index b4a5fea..986bc3a 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon @@ -27,27 +27,8 @@ String filter = "general"; String format = "html"; </%args> <%java> -List<? extends MonitoredTask> tasks = taskMonitor.getTasks(); -Iterator<? 
extends MonitoredTask> iter = tasks.iterator(); // apply requested filter -while (iter.hasNext()) { - MonitoredTask t = iter.next(); - if (filter.equals("general")) { - if (t instanceof MonitoredRPCHandler) - iter.remove(); - } else if (filter.equals("handler")) { - if (!(t instanceof MonitoredRPCHandler)) - iter.remove(); - } else if (filter.equals("rpc")) { - if (!(t instanceof MonitoredRPCHandler) || - !((MonitoredRPCHandler) t).isRPCRunning()) - iter.remove(); - } else if (filter.equals("operation")) { - if (!(t instanceof MonitoredRPCHandler) || - !((MonitoredRPCHandler) t).isOperationRunning()) - iter.remove(); - } -} +List<? extends MonitoredTask> tasks = taskMonitor.getTasks(filter); long now = System.currentTimeMillis(); Collections.reverse(tasks); boolean first = true; http://git-wip-us.apache.org/repos/asf/hbase/blob/effd1093/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java index 780916f..ad9bd02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java @@ -157,22 +157,52 @@ public class TaskMonitor { * MonitoredTasks handled by this TaskMonitor. * @return A complete list of MonitoredTasks. */ - public synchronized List<MonitoredTask> getTasks() { + public List<MonitoredTask> getTasks() { + return getTasks(null); + } + + /** + * Produces a list containing copies of the current state of all non-expired + * MonitoredTasks handled by this TaskMonitor. + * @param filter type of wanted tasks + * @return A filtered list of MonitoredTasks. 
+ */ + public synchronized List<MonitoredTask> getTasks(String filter) { purgeExpiredTasks(); - ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size()); - for (Iterator<TaskAndWeakRefPair> it = tasks.iterator(); - it.hasNext();) { - TaskAndWeakRefPair pair = it.next(); - MonitoredTask t = pair.get(); - ret.add(t.clone()); + TaskFilter taskFilter = createTaskFilter(filter); + ArrayList<MonitoredTask> results = + Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size()); + processTasks(tasks, taskFilter, results); + processTasks(rpcTasks, taskFilter, results); + return results; + } + + /** + * Create a task filter according to a given filter type. + * @param filter type of monitored task + * @return a task filter + */ + private static TaskFilter createTaskFilter(String filter) { + switch (TaskFilter.TaskType.getTaskType(filter)) { + case GENERAL: return task -> task instanceof MonitoredRPCHandler; + case HANDLER: return task -> !(task instanceof MonitoredRPCHandler); + case RPC: return task -> !(task instanceof MonitoredRPCHandler) || + !((MonitoredRPCHandler) task).isRPCRunning(); + case OPERATION: return task -> !(task instanceof MonitoredRPCHandler) || + !((MonitoredRPCHandler) task).isOperationRunning(); + default: return task -> false; } - for (Iterator<TaskAndWeakRefPair> it = rpcTasks.iterator(); - it.hasNext();) { - TaskAndWeakRefPair pair = it.next(); - MonitoredTask t = pair.get(); - ret.add(t.clone()); + } + + private static void processTasks(Iterable<TaskAndWeakRefPair> tasks, + TaskFilter filter, + List<MonitoredTask> results) { + for (TaskAndWeakRefPair task : tasks) { + MonitoredTask t = task.get(); + if (!filter.filter(t)) { + results.add(t.clone()); + } } - return ret; } private boolean canPurge(MonitoredTask stat) { @@ -280,4 +310,45 @@ public class TaskMonitor { } } } + + private interface TaskFilter { + enum TaskType { + GENERAL("general"), + HANDLER("handler"), + RPC("rpc"), + OPERATION("operation"), + 
ALL("all"); + + private String type; + + private TaskType(String type) { + this.type = type.toLowerCase(); + } + + static TaskType getTaskType(String type) { + if (type == null || type.isEmpty()) { + return ALL; + } + type = type.toLowerCase(); + for (TaskType taskType : values()) { + if (taskType.toString().equals(type)) { + return taskType; + } + } + return ALL; + } + + @Override + public String toString() { + return type; + } + } + + /** + * Filter out unwanted task. + * @param t monitored task + * @return false if a task is accepted, true if it is filtered + */ + boolean filter(MonitoredTask t); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/effd1093/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java index 718339a..7abcde8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java @@ -20,9 +20,15 @@ package org.apache.hadoop.hbase.monitoring; import static org.junit.Assert.*; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Query; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -143,5 +149,47 @@ public class TestTaskMonitor { tm.shutdown(); } + @Test + public void testGetTasksWithFilter() throws Exception { + TaskMonitor tm = new 
TaskMonitor(new Configuration()); + assertTrue("Task monitor should start empty", tm.getTasks().isEmpty()); + // Create 5 general tasks + tm.createStatus("General task1"); + tm.createStatus("General task2"); + tm.createStatus("General task3"); + tm.createStatus("General task4"); + tm.createStatus("General task5"); + // Create 5 rpc tasks, and mark 1 completed + int length = 5; + ArrayList<MonitoredRPCHandler> rpcHandlers = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + MonitoredRPCHandler rpcHandler = tm.createRPCStatus("Rpc task" + i); + rpcHandlers.add(rpcHandler); + } + // Create rpc operations + byte[] row = new byte[] { 0x01 }; + Mutation m = new Put(row); + Query q = new Scan(); + String notOperation = "for test"; + rpcHandlers.get(0).setRPC("operations", new Object[]{ m, q }, 3000); + rpcHandlers.get(1).setRPC("operations", new Object[]{ m, q }, 3000); + rpcHandlers.get(2).setRPC("operations", new Object[]{ m, q }, 3000); + rpcHandlers.get(3).setRPC("operations", new Object[]{ notOperation }, 3000); + rpcHandlers.get(4).setRPC("operations", new Object[]{ m, q }, 3000); + MonitoredRPCHandler completed = rpcHandlers.get(4); + completed.markComplete("Completed!"); + // Test get tasks with filter + List<MonitoredTask> generalTasks = tm.getTasks("general"); + assertEquals(5, generalTasks.size()); + List<MonitoredTask> handlerTasks = tm.getTasks("handler"); + assertEquals(5, handlerTasks.size()); + List<MonitoredTask> rpcTasks = tm.getTasks("rpc"); + // The last rpc handler is stopped + assertEquals(4, rpcTasks.size()); + List<MonitoredTask> operationTasks = tm.getTasks("operation"); + // Handler 3 doesn't handle Operation. + assertEquals(3, operationTasks.size()); + tm.shutdown(); + } }
