Repository: hbase Updated Branches: refs/heads/0.98 ffa460ec6 -> b4b1b9c46
HBASE-12787 Backport HBASE-12028 (Abort the RegionServer when it's handler threads die) to 0.98 (Alicia Ying Shu) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b4b1b9c4 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b4b1b9c4 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b4b1b9c4 Branch: refs/heads/0.98 Commit: b4b1b9c46308747b14620d1010526562a3fc4ff5 Parents: ffa460e Author: Andrew Purtell <apurt...@apache.org> Authored: Fri Jan 9 17:31:42 2015 -0800 Committer: Andrew Purtell <apurt...@apache.org> Committed: Fri Jan 9 17:31:42 2015 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/HConstants.java | 11 +++++ .../src/main/resources/hbase-default.xml | 8 ++++ .../hbase/ipc/BalancedQueueRpcExecutor.java | 15 +++++-- .../org/apache/hadoop/hbase/ipc/CallRunner.java | 3 ++ .../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 16 ++++++-- .../apache/hadoop/hbase/ipc/RpcExecutor.java | 43 +++++++++++++++++--- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 25 +++++++----- .../regionserver/SimpleRpcSchedulerFactory.java | 1 + .../hbase/ipc/TestSimpleRpcScheduler.java | 4 +- 9 files changed, 101 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 323ec8b..0d7d680 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -850,6 +850,17 @@ public final class HConstants { public static final String REGION_SERVER_HANDLER_COUNT = 
"hbase.regionserver.handler.count"; public static final int DEFAULT_REGION_SERVER_HANDLER_COUNT = 30; + /* + * REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT: + * -1 => Disable aborting + * 0 => Abort if even a single handler has died + * 0.x => Abort only when this percent of handlers have died + * 1 => Abort only all of the handers have died + */ + public static final String REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT = + "hbase.regionserver.handler.abort.on.error.percent"; + public static final double DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT = -1; + public static final String REGION_SERVER_META_HANDLER_COUNT = "hbase.regionserver.metahandler.count"; public static final int DEFAULT_REGION_SERVER_META_HANDLER_COUNT = 10; http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-common/src/main/resources/hbase-default.xml ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index f285cf7..5628109 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -1239,4 +1239,12 @@ possible configurations would overwhelm and obscure the important. with the user issuing the mutation </description> </property> + <property> + <name>hbase.regionserver.handler.abort.on.error.percent</name> + <value>-1</value> + <description>The percent of region server RPC threads failed to abort RS. 
+ -1 Disable aborting; 0 Abort if even a single handler has died; + 0.x Abort only when this percent of handlers have died; + 1 Abort only when all of the handlers have died.</description> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index 2418cf7..2a6fd9c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -19,13 +19,14 @@ package org.apache.hadoop.hbase.ipc; import java.util.ArrayList; import java.util.List; -import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.util.ReflectionUtils; /** @@ -41,12 +42,18 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, final int maxQueueLength) { - this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength); + this(name, handlerCount, numQueues, maxQueueLength, null, null); + } + + public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, + final int maxQueueLength, final Configuration conf, final Abortable abortable) { + this(name, 
handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, maxQueueLength); } public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, + final Configuration conf, final Abortable abortable, final Class<? extends BlockingQueue> queueClass, Object... initargs) { - super(name, Math.max(handlerCount, numQueues)); + super(name, Math.max(handlerCount, numQueues), conf, abortable); queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues); this.balancer = getBalancer(numQueues); initializeQueues(numQueues, queueClass, initargs); http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 18e2902..e2b003b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -111,6 +111,9 @@ public class CallRunner { RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e); errorThrowable = e; error = StringUtils.stringifyException(e); + if (e instanceof Error) { + throw (Error)e; + } } finally { if (traceScope != null) { traceScope.close(); http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index ddab8fa..9a3d8cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -26,9 +26,11 @@ import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; @@ -56,24 +58,32 @@ public class RWQueueRpcExecutor extends RpcExecutor { public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, final float readShare, final int maxQueueLength) { - this(name, handlerCount, numQueues, readShare, maxQueueLength, + this(name, handlerCount, numQueues, readShare, maxQueueLength, null, null); + } + + public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, + final float readShare, final int maxQueueLength, final Configuration conf, final Abortable abortable) { + this(name, handlerCount, numQueues, readShare, maxQueueLength, conf, abortable, LinkedBlockingQueue.class); } public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, final float readShare, final int maxQueueLength, + final Configuration conf, final Abortable abortable, final Class<? extends BlockingQueue> readQueueClass, Object... 
readQueueInitArgs) { this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare), calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), + conf, abortable, LinkedBlockingQueue.class, new Object[] {maxQueueLength}, readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs)); } public RWQueueRpcExecutor(final String name, final int writeHandlers, final int readHandlers, final int numWriteQueues, final int numReadQueues, + final Configuration conf, final Abortable abortable, final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs, final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) { - super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues)); + super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues), conf, abortable); this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues); this.readHandlersCount = Math.max(readHandlers, numReadQueues); http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 233c26e..bb6fdf2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -20,14 +20,18 @@ package org.apache.hadoop.hbase.ipc; import java.util.ArrayList; import java.util.List; +import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; -import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.util.StringUtils; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -41,15 +45,26 @@ public abstract class RpcExecutor { private final List<Thread> handlers; private final int handlerCount; private final String name; + private final AtomicInteger failedHandlerCount = new AtomicInteger(0); private boolean running; + private Configuration conf = null; + private Abortable abortable = null; + public RpcExecutor(final String name, final int handlerCount) { this.handlers = new ArrayList<Thread>(handlerCount); this.handlerCount = handlerCount; this.name = Strings.nullToEmpty(name); } + public RpcExecutor(final String name, final int handlerCount, final Configuration conf, + final Abortable abortable) { + this(name, handlerCount); + this.conf = conf; + this.abortable = abortable; + } + public void start(final int port) { running = true; startHandlers(port); @@ -103,6 +118,9 @@ public abstract class RpcExecutor { protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) { boolean interrupted = false; + double handlerFailureThreshhold = + conf == null ? 
1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT, + HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT); try { while (running) { try { @@ -111,11 +129,24 @@ public abstract class RpcExecutor { activeHandlerCount.incrementAndGet(); task.run(); } catch (Error e) { - LOG.error("RpcServer handler thread throws error: ", e); - throw e; - } catch (RuntimeException e) { - LOG.error("RpcServer handler thread throws exception: ", e); - throw e; + int failedCount = failedHandlerCount.incrementAndGet(); + if (handlerFailureThreshhold >= 0 + && failedCount > handlerCount * handlerFailureThreshhold) { + String message = + "Number of failed RpcServer handler exceeded threshhold " + + handlerFailureThreshhold + " with failed reason: " + + StringUtils.stringifyException(e); + if (abortable != null) { + abortable.abort(message, e); + } else { + LOG.error("Received " + StringUtils.stringifyException(e) + + " but not aborting due to abortable being null"); + throw e; + } + } else { + LOG.warn("RpcServer handler threads encountered errors " + + StringUtils.stringifyException(e)); + } } finally { activeHandlerCount.decrementAndGet(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index a44ec89..5f6f11f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -19,11 +19,12 @@ package org.apache.hadoop.hbase.ipc; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import 
org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; /** * A scheduler that maintains isolated handler pools for general, high-priority and replication @@ -49,6 +50,8 @@ public class SimpleRpcScheduler extends RpcScheduler { /** What level a high priority call is at. */ private final int highPriorityLevel; + + private final Abortable abortable; /** * @param conf @@ -64,12 +67,14 @@ public class SimpleRpcScheduler extends RpcScheduler { int priorityHandlerCount, int replicationHandlerCount, PriorityFunction priority, + Abortable abortable, int highPriorityLevel) { int maxQueueLength = conf.getInt(CALL_QUEUE_MAX_LENGTH_CONF_KEY, conf.getInt("ipc.server.max.callqueue.length", handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER)); this.priority = priority; this.highPriorityLevel = highPriorityLevel; + this.abortable = abortable; float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, conf.getFloat("ipc.server.callqueue.read.share", 0)); @@ -83,19 +88,19 @@ public class SimpleRpcScheduler extends RpcScheduler { if (numCallQueues > 1 && callqReadShare > 0) { // multiple read/write queues callExecutor = new RWQueueRpcExecutor("RW.Default", handlerCount, numCallQueues, - callqReadShare, maxQueueLength); + callqReadShare, maxQueueLength, conf, abortable); } else { // multiple queues callExecutor = new BalancedQueueRpcExecutor("B.Default", handlerCount, - numCallQueues, maxQueueLength); + numCallQueues, maxQueueLength, conf, abortable); } - this.priorityExecutor = - priorityHandlerCount > 0 ? 
new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, - 1, maxQueueLength) : null; - this.replicationExecutor = - replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication", - replicationHandlerCount, 1, maxQueueLength) : null; + this.priorityExecutor = + priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, + 1, maxQueueLength, conf, abortable) : null; + this.replicationExecutor = + replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication", + replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java index b8daa52..a2f1cf8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java @@ -41,6 +41,7 @@ public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory { conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT, HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT), server, + server, HConstants.QOS_THRESHOLD); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 5040e75..435f874 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -68,7 +68,7 @@ public class TestSimpleRpcScheduler { public void testBasic() throws IOException, InterruptedException { PriorityFunction qosFunction = mock(PriorityFunction.class); RpcScheduler scheduler = new SimpleRpcScheduler( - conf, 10, 0, 0, qosFunction, 0); + conf, 10, 0, 0, qosFunction, null, 0); scheduler.init(CONTEXT); scheduler.start(); CallRunner task = createMockTask(); @@ -110,7 +110,7 @@ public class TestSimpleRpcScheduler { } RpcScheduler scheduler = new SimpleRpcScheduler( - conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS); + conf, 1, 1 ,1, qosFunction, null, HConstants.HIGH_QOS); scheduler.init(CONTEXT); scheduler.start(); for (CallRunner task : tasks) {