This is an automated email from the ASF dual-hosted git repository. binlijin pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new fcfe9ab HBASE-23615 Use a dedicated thread for executing WorkerMonitor in Pro… (#961) fcfe9ab is described below commit fcfe9ab7d842e4f34665c1a1dc9f1310fb65bf53 Author: binlijin <binli...@gmail.com> AuthorDate: Tue Dec 31 10:03:01 2019 +0800 HBASE-23615 Use a dedicated thread for executing WorkerMonitor in Pro… (#961) Signed-off-by: stack <st...@apache.org> Signed-off-by: Duo Zhang <zhang...@apache.org> Signed-off-by: virajjasani <34790606+virajjas...@users.noreply.github.com> --- .../hadoop/hbase/procedure2/ProcedureExecutor.java | 18 ++++++++++++++++-- .../hadoop/hbase/procedure2/TimeoutExecutorThread.java | 5 +++-- .../hbase/master/procedure/TestProcedureAdmin.java | 1 - 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 93c255f..70dc6f1 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -221,6 +221,15 @@ public class ProcedureExecutor<TEnvironment> { */ private TimeoutExecutorThread<TEnvironment> timeoutExecutor; + /** + * WorkerMonitor checks for stuck workers and creates a new worker thread when necessary; for + * example, if no worker is available to assign meta, it creates a new worker thread for it, so + * it is very important. TimeoutExecutor executes many tasks, such as DeadServerMetricRegionChore + * and RegionInTransitionChore, and some of them may run for a long time and block other tasks + * like WorkerMonitor, so a dedicated thread is used for executing WorkerMonitor. 
+ */ + private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor; + private int corePoolSize; private int maxPoolSize; private int urgentPoolSize; @@ -590,7 +599,8 @@ public class ProcedureExecutor<TEnvironment> { corePoolSize, maxPoolSize, urgentPoolSize); this.threadGroup = new ThreadGroup("PEWorkerGroup"); - this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup); + this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout"); + this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor"); // Create the workers workerId.set(0); @@ -640,6 +650,7 @@ public class ProcedureExecutor<TEnvironment> { LOG.debug("Start workers {}, urgent workers {}", workerThreads.size(), urgentWorkerThreads.size()); timeoutExecutor.start(); + workerMonitorExecutor.start(); for (WorkerThread worker: workerThreads) { worker.start(); } @@ -649,7 +660,7 @@ public class ProcedureExecutor<TEnvironment> { } // Internal chores - timeoutExecutor.add(new WorkerMonitor()); + workerMonitorExecutor.add(new WorkerMonitor()); if (upgradeTo2_2) { timeoutExecutor.add(new InlineChore() { @@ -683,6 +694,7 @@ public class ProcedureExecutor<TEnvironment> { LOG.info("Stopping"); scheduler.stop(); timeoutExecutor.sendStopSignal(); + workerMonitorExecutor.sendStopSignal(); } @VisibleForTesting @@ -691,6 +703,8 @@ public class ProcedureExecutor<TEnvironment> { // stop the timeout executor timeoutExecutor.awaitTermination(); + // stop the worker monitor executor + workerMonitorExecutor.awaitTermination(); // stop the worker threads for (WorkerThread worker: workerThreads) { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index 30a52d4..1e796d9 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -37,8 +37,9 @@ class TimeoutExecutorThread<TEnvironment> extends StoppableThread { private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>(); - public TimeoutExecutorThread(ProcedureExecutor<TEnvironment> executor, ThreadGroup group) { - super(group, "ProcExecTimeout"); + public TimeoutExecutorThread(ProcedureExecutor<TEnvironment> executor, ThreadGroup group, + String name) { + super(group, name); setDaemon(true); this.executor = executor; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureAdmin.java index 29645c2..3e7a241 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureAdmin.java @@ -57,7 +57,6 @@ public class TestProcedureAdmin { protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private static void setupConf(Configuration conf) { conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1); conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);