Repository: hbase
Updated Branches:
  refs/heads/0.98 ffa460ec6 -> b4b1b9c46


HBASE-12787 Backport HBASE-12028 (Abort the RegionServer when its handler
threads die) to 0.98 (Alicia Ying Shu)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b4b1b9c4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b4b1b9c4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b4b1b9c4

Branch: refs/heads/0.98
Commit: b4b1b9c46308747b14620d1010526562a3fc4ff5
Parents: ffa460e
Author: Andrew Purtell <apurt...@apache.org>
Authored: Fri Jan 9 17:31:42 2015 -0800
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Fri Jan 9 17:31:42 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/HConstants.java     | 11 +++++
 .../src/main/resources/hbase-default.xml        |  8 ++++
 .../hbase/ipc/BalancedQueueRpcExecutor.java     | 15 +++++--
 .../org/apache/hadoop/hbase/ipc/CallRunner.java |  3 ++
 .../hadoop/hbase/ipc/RWQueueRpcExecutor.java    | 16 ++++++--
 .../apache/hadoop/hbase/ipc/RpcExecutor.java    | 43 +++++++++++++++++---
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    | 25 +++++++-----
 .../regionserver/SimpleRpcSchedulerFactory.java |  1 +
 .../hbase/ipc/TestSimpleRpcScheduler.java       |  4 +-
 9 files changed, 101 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 323ec8b..0d7d680 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -850,6 +850,17 @@ public final class HConstants {
   public static final String REGION_SERVER_HANDLER_COUNT = 
"hbase.regionserver.handler.count";
   public static final int DEFAULT_REGION_SERVER_HANDLER_COUNT = 30;
 
+  /*
+   * REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT:
+   * -1  => Disable aborting
+   * 0   => Abort if even a single handler has died
+   * 0.x => Abort only when this fraction of handlers have died
+   * 1   => Abort only when all of the handlers have died
+   */
+  public static final String REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT =
+      "hbase.regionserver.handler.abort.on.error.percent";
+  public static final double 
DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT = -1;
+  
   public static final String REGION_SERVER_META_HANDLER_COUNT =
       "hbase.regionserver.metahandler.count";
   public static final int DEFAULT_REGION_SERVER_META_HANDLER_COUNT = 10;

http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml 
b/hbase-common/src/main/resources/hbase-default.xml
index f285cf7..5628109 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1239,4 +1239,12 @@ possible configurations would overwhelm and obscure the 
important.
       with the user issuing the mutation
     </description>
   </property>
+  <property>
+    <name>hbase.regionserver.handler.abort.on.error.percent</name>
+    <value>-1</value>
+    <description>The fraction of region server RPC handler threads that must fail before the region server aborts.
+    -1 Disable aborting; 0 Abort if even a single handler has died;
+    0.x Abort only when this fraction of handlers have died;
+    1 Abort only when all of the handlers have died.</description>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
index 2418cf7..2a6fd9c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -19,13 +19,14 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
@@ -41,12 +42,18 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
 
   public BalancedQueueRpcExecutor(final String name, final int handlerCount, 
final int numQueues,
       final int maxQueueLength) {
-    this(name, handlerCount, numQueues, LinkedBlockingQueue.class, 
maxQueueLength);
+    this(name, handlerCount, numQueues, maxQueueLength, null, null);
+  }
+
+  public BalancedQueueRpcExecutor(final String name, final int handlerCount, 
final int numQueues,
+      final int maxQueueLength, final Configuration conf, final Abortable 
abortable) {
+    this(name, handlerCount, numQueues, conf, abortable, 
LinkedBlockingQueue.class, maxQueueLength);
   }
 
   public BalancedQueueRpcExecutor(final String name, final int handlerCount, 
final int numQueues,
+      final Configuration conf, final Abortable abortable,
       final Class<? extends BlockingQueue> queueClass, Object... initargs) {
-    super(name, Math.max(handlerCount, numQueues));
+    super(name, Math.max(handlerCount, numQueues), conf, abortable);
     queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
     this.balancer = getBalancer(numQueues);
     initializeQueues(numQueues, queueClass, initargs);

http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 18e2902..e2b003b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -111,6 +111,9 @@ public class CallRunner {
         RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + 
call.toShortString(), e);
         errorThrowable = e;
         error = StringUtils.stringifyException(e);
+        if (e instanceof Error) {
+          throw (Error)e;
+        } 
       } finally {
         if (traceScope != null) {
           traceScope.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index ddab8fa..9a3d8cd 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -26,9 +26,11 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
@@ -56,24 +58,32 @@ public class RWQueueRpcExecutor extends RpcExecutor {
 
   public RWQueueRpcExecutor(final String name, final int handlerCount, final 
int numQueues,
       final float readShare, final int maxQueueLength) {
-    this(name, handlerCount, numQueues, readShare, maxQueueLength,
+    this(name, handlerCount, numQueues, readShare, maxQueueLength, null, null);
+  }
+
+  public RWQueueRpcExecutor(final String name, final int handlerCount, final 
int numQueues,
+      final float readShare, final int maxQueueLength, final Configuration 
conf, final Abortable abortable) {
+    this(name, handlerCount, numQueues, readShare, maxQueueLength, conf, 
abortable, 
       LinkedBlockingQueue.class);
   }
 
   public RWQueueRpcExecutor(final String name, final int handlerCount, final 
int numQueues,
       final float readShare, final int maxQueueLength,
+      final Configuration conf, final Abortable abortable,
       final Class<? extends BlockingQueue> readQueueClass, Object... 
readQueueInitArgs) {
     this(name, calcNumWriters(handlerCount, readShare), 
calcNumReaders(handlerCount, readShare),
       calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, 
readShare),
+      conf, abortable,
       LinkedBlockingQueue.class, new Object[] {maxQueueLength},
       readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, 
readQueueInitArgs));
   }
 
   public RWQueueRpcExecutor(final String name, final int writeHandlers, final 
int readHandlers,
       final int numWriteQueues, final int numReadQueues,
+      final Configuration conf, final Abortable abortable,
       final Class<? extends BlockingQueue> writeQueueClass, Object[] 
writeQueueInitArgs,
       final Class<? extends BlockingQueue> readQueueClass, Object[] 
readQueueInitArgs) {
-    super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + 
numReadQueues));
+    super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + 
numReadQueues), conf, abortable);
 
     this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
     this.readHandlersCount = Math.max(readHandlers, numReadQueues);

http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 233c26e..bb6fdf2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -20,14 +20,18 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -41,15 +45,26 @@ public abstract class RpcExecutor {
   private final List<Thread> handlers;
   private final int handlerCount;
   private final String name;
+  private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
 
   private boolean running;
 
+  private Configuration conf = null;
+  private Abortable abortable = null;
+
   public RpcExecutor(final String name, final int handlerCount) {
     this.handlers = new ArrayList<Thread>(handlerCount);
     this.handlerCount = handlerCount;
     this.name = Strings.nullToEmpty(name);
   }
 
+  public RpcExecutor(final String name, final int handlerCount, final 
Configuration conf,
+      final Abortable abortable) {
+    this(name, handlerCount);
+    this.conf = conf;
+    this.abortable = abortable;
+  }
+  
   public void start(final int port) {
     running = true;
     startHandlers(port);
@@ -103,6 +118,9 @@ public abstract class RpcExecutor {
 
   protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
     boolean interrupted = false;
+    double handlerFailureThreshhold =
+        conf == null ? 1.0 : 
conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
+          HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
     try {
       while (running) {
         try {
@@ -111,11 +129,24 @@ public abstract class RpcExecutor {
             activeHandlerCount.incrementAndGet();
             task.run();
           } catch (Error e) {
-            LOG.error("RpcServer handler thread throws error: ", e);
-            throw e;
-          } catch (RuntimeException e) {
-            LOG.error("RpcServer handler thread throws exception: ", e);
-            throw e;
+            int failedCount = failedHandlerCount.incrementAndGet();
+            if (handlerFailureThreshhold >= 0
+                && failedCount > handlerCount * handlerFailureThreshhold) {
+              String message =
+                  "Number of failed RpcServer handler exceeded threshhold "
+                      + handlerFailureThreshhold + "  with failed reason: "
+                      + StringUtils.stringifyException(e);
+              if (abortable != null) {
+                abortable.abort(message, e);
+              } else {
+                LOG.error("Received " + StringUtils.stringifyException(e)
+                  + " but not aborting due to abortable being null");
+                throw e;
+              }
+            } else {
+              LOG.warn("RpcServer handler threads encountered errors "
+                  + StringUtils.stringifyException(e));
+            }
           } finally {
             activeHandlerCount.decrementAndGet();
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index a44ec89..5f6f11f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -19,11 +19,12 @@ package org.apache.hadoop.hbase.ipc;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 /**
  * A scheduler that maintains isolated handler pools for general, 
high-priority and replication
@@ -49,6 +50,8 @@ public class SimpleRpcScheduler extends RpcScheduler {
 
   /** What level a high priority call is at. */
   private final int highPriorityLevel;
+  
+  private final Abortable abortable;
 
   /**
    * @param conf
@@ -64,12 +67,14 @@ public class SimpleRpcScheduler extends RpcScheduler {
       int priorityHandlerCount,
       int replicationHandlerCount,
       PriorityFunction priority,
+      Abortable abortable,
       int highPriorityLevel) {
     int maxQueueLength = conf.getInt(CALL_QUEUE_MAX_LENGTH_CONF_KEY,
       conf.getInt("ipc.server.max.callqueue.length",
         handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER));
     this.priority = priority;
     this.highPriorityLevel = highPriorityLevel;
+    this.abortable = abortable;
 
     float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY,
       conf.getFloat("ipc.server.callqueue.read.share", 0));
@@ -83,19 +88,19 @@ public class SimpleRpcScheduler extends RpcScheduler {
     if (numCallQueues > 1 && callqReadShare > 0) {
       // multiple read/write queues
       callExecutor = new RWQueueRpcExecutor("RW.Default", handlerCount, 
numCallQueues,
-          callqReadShare, maxQueueLength);
+        callqReadShare, maxQueueLength, conf, abortable);
     } else {
       // multiple queues
       callExecutor = new BalancedQueueRpcExecutor("B.Default", handlerCount,
-          numCallQueues, maxQueueLength);
+        numCallQueues, maxQueueLength, conf, abortable);
     }
 
-   this.priorityExecutor =
-     priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", 
priorityHandlerCount,
-       1, maxQueueLength) : null;
-   this.replicationExecutor =
-     replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
-       replicationHandlerCount, 1, maxQueueLength) : null;
+    this.priorityExecutor =
+        priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", 
priorityHandlerCount,
+          1, maxQueueLength, conf, abortable) : null;
+    this.replicationExecutor =
+       replicationHandlerCount > 0 ? new 
BalancedQueueRpcExecutor("Replication",
+         replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
index b8daa52..a2f1cf8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
@@ -41,6 +41,7 @@ public class SimpleRpcSchedulerFactory implements 
RpcSchedulerFactory {
         conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
             HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
         server,
+        server,
         HConstants.QOS_THRESHOLD);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b4b1b9c4/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 5040e75..435f874 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -68,7 +68,7 @@ public class TestSimpleRpcScheduler {
   public void testBasic() throws IOException, InterruptedException {
     PriorityFunction qosFunction = mock(PriorityFunction.class);
     RpcScheduler scheduler = new SimpleRpcScheduler(
-        conf, 10, 0, 0, qosFunction, 0);
+      conf, 10, 0, 0, qosFunction, null, 0);
     scheduler.init(CONTEXT);
     scheduler.start();
     CallRunner task = createMockTask();
@@ -110,7 +110,7 @@ public class TestSimpleRpcScheduler {
     }
 
     RpcScheduler scheduler = new SimpleRpcScheduler(
-        conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS);
+      conf, 1, 1 ,1, qosFunction, null, HConstants.HIGH_QOS);
     scheduler.init(CONTEXT);
     scheduler.start();
     for (CallRunner task : tasks) {

Reply via email to