HBASE-12529 Use ThreadLocalRandom for RandomQueueBalancer

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1591fa9f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1591fa9f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1591fa9f

Branch: refs/heads/0.98
Commit: 1591fa9f14713f54f9912ba171d7bb30e5c7aabe
Parents: 2db2790
Author: Matteo Bertozzi <matteo.berto...@cloudera.com>
Authored: Wed Nov 19 17:29:45 2014 +0000
Committer: Matteo Bertozzi <matteo.berto...@cloudera.com>
Committed: Wed Nov 19 17:29:45 2014 +0000

----------------------------------------------------------------------
 .../hbase/ipc/BalancedQueueRpcExecutor.java     | 48 +-----------------
 .../hadoop/hbase/ipc/RWQueueRpcExecutor.java    | 10 ++--
 .../apache/hadoop/hbase/ipc/RpcExecutor.java    | 52 ++++++++++++++++++++
 3 files changed, 59 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1591fa9f/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
index 40ba9fe..2418cf7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -28,8 +28,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
-import com.google.common.base.Preconditions;
-
 /**
  * An {@link RpcExecutor} that will balance requests evenly across all its queues, but still remains
  * efficient with a single queue via an inlinable queue balancing mechanism.
@@ -39,7 +37,7 @@ import com.google.common.base.Preconditions;
 public class BalancedQueueRpcExecutor extends RpcExecutor {
 
   protected final List<BlockingQueue<CallRunner>> queues;
-  private QueueBalancer balancer;
+  private final QueueBalancer balancer;
 
   public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
       final int maxQueueLength) {
@@ -80,48 +78,4 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
   public List<BlockingQueue<CallRunner>> getQueues() {
     return queues;
   }
-
-  private static abstract class QueueBalancer {
-    /**
-     * @return the index of the next queue to which a request should be inserted
-     */
-    public abstract int getNextQueue();
-  }
-
-  public static QueueBalancer getBalancer(int queueSize) {
-    Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
-    if (queueSize == 1) {
-      return ONE_QUEUE;
-    } else {
-      return new RandomQueueBalancer(queueSize);
-    }
-  }
-
-  /**
-   * All requests go to the first queue, at index 0
-   */
-  private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
-
-    @Override
-    public int getNextQueue() {
-      return 0;
-    }
-  };
-
-  /**
-   * Queue balancer that just randomly selects a queue in the range [0, num queues).
-   */
-  private static class RandomQueueBalancer extends QueueBalancer {
-    private int queueSize;
-    private Random random;
-
-    public RandomQueueBalancer(int queueSize) {
-      this.queueSize = queueSize;
-      this.random = new Random();
-    }
-
-    public int getNextQueue() {
-      return random.nextInt(queueSize);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1591fa9f/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 6df2bf2..ddab8fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -48,7 +47,8 @@ public class RWQueueRpcExecutor extends RpcExecutor {
   private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
 
   private final List<BlockingQueue<CallRunner>> queues;
-  private final Random balancer = new Random();
+  private final QueueBalancer writeBalancer;
+  private final QueueBalancer readBalancer;
   private final int writeHandlersCount;
   private final int readHandlersCount;
   private final int numWriteQueues;
@@ -79,6 +79,8 @@ public class RWQueueRpcExecutor extends RpcExecutor {
     this.readHandlersCount = Math.max(readHandlers, numReadQueues);
     this.numWriteQueues = numWriteQueues;
     this.numReadQueues = numReadQueues;
+    this.writeBalancer = getBalancer(numWriteQueues);
+    this.readBalancer = getBalancer(numReadQueues);
 
     queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
     LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
@@ -106,9 +108,9 @@ public class RWQueueRpcExecutor extends RpcExecutor {
     RpcServer.Call call = callTask.getCall();
     int queueIndex;
     if (isWriteRequest(call.getHeader(), call.param)) {
-      queueIndex = balancer.nextInt(numWriteQueues);
+      queueIndex = writeBalancer.getNextQueue();
     } else {
-      queueIndex = numWriteQueues + balancer.nextInt(numReadQueues);
+      queueIndex = numWriteQueues + readBalancer.getNextQueue();
     }
     queues.get(queueIndex).put(callTask);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1591fa9f/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 68dc063..233c26e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -22,12 +22,14 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
 @InterfaceAudience.Private
@@ -127,4 +129,54 @@ public abstract class RpcExecutor {
       }
     }
   }
+
+  public static abstract class QueueBalancer {
+    /**
+     * @return the index of the next queue to which a request should be inserted
+     */
+    public abstract int getNextQueue();
+  }
+
+  public static QueueBalancer getBalancer(int queueSize) {
+    Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
+    if (queueSize == 1) {
+      return ONE_QUEUE;
+    } else {
+      return new RandomQueueBalancer(queueSize);
+    }
+  }
+
+  /**
+   * All requests go to the first queue, at index 0
+   */
+  private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
+
+    @Override
+    public int getNextQueue() {
+      return 0;
+    }
+  };
+
+  /**
+   * Queue balancer that just randomly selects a queue in the range [0, num queues).
+   */
+  private static class RandomQueueBalancer extends QueueBalancer {
+    private final int queueSize;
+
+    private final ThreadLocal<Random> threadRandom =
+      new ThreadLocal<Random>() {
+        @Override
+        protected Random initialValue() {
+          return new Random();
+        }
+      };
+
+    public RandomQueueBalancer(int queueSize) {
+      this.queueSize = queueSize;
+    }
+
+    public int getNextQueue() {
+      return threadRandom.get().nextInt(queueSize);
+    }
+  }
 }

Reply via email to