HBASE-12529 Use ThreadLocalRandom for RandomQueueBalancer
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1591fa9f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1591fa9f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1591fa9f Branch: refs/heads/0.98 Commit: 1591fa9f14713f54f9912ba171d7bb30e5c7aabe Parents: 2db2790 Author: Matteo Bertozzi <matteo.berto...@cloudera.com> Authored: Wed Nov 19 17:29:45 2014 +0000 Committer: Matteo Bertozzi <matteo.berto...@cloudera.com> Committed: Wed Nov 19 17:29:45 2014 +0000 ---------------------------------------------------------------------- .../hbase/ipc/BalancedQueueRpcExecutor.java | 48 +----------------- .../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 10 ++-- .../apache/hadoop/hbase/ipc/RpcExecutor.java | 52 ++++++++++++++++++++ 3 files changed, 59 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/1591fa9f/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index 40ba9fe..2418cf7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -28,8 +28,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.util.ReflectionUtils; -import com.google.common.base.Preconditions; - /** * An {@link RpcExecutor} that will balance requests evenly across all its queues, but still remains * efficient with a single queue via an inlinable queue balancing mechanism. @@ -39,7 +37,7 @@ import com.google.common.base.Preconditions; public class BalancedQueueRpcExecutor extends RpcExecutor { protected final List<BlockingQueue<CallRunner>> queues; - private QueueBalancer balancer; + private final QueueBalancer balancer; public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, final int maxQueueLength) { @@ -80,48 +78,4 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { public List<BlockingQueue<CallRunner>> getQueues() { return queues; } - - private static abstract class QueueBalancer { - /** - * @return the index of the next queue to which a request should be inserted - */ - public abstract int getNextQueue(); - } - - public static QueueBalancer getBalancer(int queueSize) { - Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1"); - if (queueSize == 1) { - return ONE_QUEUE; - } else { - return new RandomQueueBalancer(queueSize); - } - } - - /** - * All requests go to the first queue, at index 0 - */ - private static QueueBalancer ONE_QUEUE = new QueueBalancer() { - - @Override - public int getNextQueue() { - return 0; - } - }; - - /** - * Queue balancer that just randomly selects a queue in the range [0, num queues). - */ - private static class RandomQueueBalancer extends QueueBalancer { - private int queueSize; - private Random random; - - public RandomQueueBalancer(int queueSize) { - this.queueSize = queueSize; - this.random = new Random(); - } - - public int getNextQueue() { - return random.nextInt(queueSize); - } - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/1591fa9f/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 6df2bf2..ddab8fa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc; import java.util.ArrayList; import java.util.List; -import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -48,7 +47,8 @@ public class RWQueueRpcExecutor extends RpcExecutor { private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class); private final List<BlockingQueue<CallRunner>> queues; - private final Random balancer = new Random(); + private final QueueBalancer writeBalancer; + private final QueueBalancer readBalancer; private final int writeHandlersCount; private final int readHandlersCount; private final int numWriteQueues; @@ -79,6 +79,8 @@ public class RWQueueRpcExecutor extends RpcExecutor { this.readHandlersCount = Math.max(readHandlers, numReadQueues); this.numWriteQueues = numWriteQueues; this.numReadQueues = numReadQueues; + this.writeBalancer = getBalancer(numWriteQueues); + this.readBalancer = getBalancer(numReadQueues); queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount); LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount + @@ -106,9 +108,9 @@ public class RWQueueRpcExecutor extends RpcExecutor { RpcServer.Call call = callTask.getCall(); int queueIndex; if (isWriteRequest(call.getHeader(), call.param)) { - queueIndex = balancer.nextInt(numWriteQueues); + queueIndex = writeBalancer.getNextQueue(); } else { - queueIndex = numWriteQueues + balancer.nextInt(numReadQueues); + queueIndex = numWriteQueues + readBalancer.getNextQueue(); } queues.get(queueIndex).put(callTask); } http://git-wip-us.apache.org/repos/asf/hbase/blob/1591fa9f/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 68dc063..233c26e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -22,12 +22,14 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; @InterfaceAudience.Private @@ -127,4 +129,54 @@ public abstract class RpcExecutor { } } } + + public static abstract class QueueBalancer { + /** + * @return the index of the next queue to which a request should be inserted + */ + public abstract int getNextQueue(); + } + + public static QueueBalancer getBalancer(int queueSize) { + Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1"); + if (queueSize == 1) { + return ONE_QUEUE; + } else { + return new RandomQueueBalancer(queueSize); + } + } + + /** + * All requests go to the first queue, at index 0 + */ + private static QueueBalancer ONE_QUEUE = new QueueBalancer() { + + @Override + public int getNextQueue() { + return 0; + } + }; + + /** + * Queue balancer that just randomly selects a queue in the range [0, num queues). + */ + private static class RandomQueueBalancer extends QueueBalancer { + private final int queueSize; + + private final ThreadLocal<Random> threadRandom = + new ThreadLocal<Random>() { + @Override + protected Random initialValue() { + return new Random(); + } + }; + + public RandomQueueBalancer(int queueSize) { + this.queueSize = queueSize; + } + + public int getNextQueue() { + return threadRandom.get().nextInt(queueSize); + } + } }