Author: rawson
Date: Wed Sep 15 23:44:08 2010
New Revision: 997541

URL: http://svn.apache.org/viewvc?rev=997541&view=rev
Log:
HBASE-2782  QOS for META table access


Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Modified: hbase/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=997541&r1=997540&r2=997541&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Sep 15 23:44:08 2010
@@ -910,6 +910,7 @@ Release 0.21.0 - Unreleased
    HBASE-2980  Refactor region server command line to a new class
    HBASE-2988  Support alternate compression for major compactions
    HBASE-2941  port HADOOP-6713 - threading scalability for RPC reads - to 
HBase
+   HBASE-2782  QOS for META table access
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java?rev=997541&r1=997540&r2=997541&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java 
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java 
Wed Sep 15 23:44:08 2010
@@ -30,6 +30,7 @@ import java.io.DataInput;
 import java.util.List;
 import java.util.Map;
 import java.util.ArrayList;
+import java.util.Set;
 import java.util.TreeMap;
 
 /**
@@ -47,7 +48,7 @@ public final class MultiAction implement
 
   /**
    * Get the total number of Actions
-   * 
+   *
    * @return total number of Actions for all groups in this container.
    */
   public int size() {
@@ -62,7 +63,7 @@ public final class MultiAction implement
    * Add an Action to this container based on it's regionName. If the 
regionName
    * is wrong, the initial execution will fail, but will be automatically
    * retried after looking up the correct region.
-   * 
+   *
    * @param regionName
    * @param a
    */
@@ -75,6 +76,10 @@ public final class MultiAction implement
     rsActions.add(a);
   }
 
+  public Set<byte[]> getRegions() {
+    return actions.keySet();
+  }
+
   /**
    * @return All actions from all regions in this container
    */

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=997541&r1=997540&r2=997541&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java 
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Wed Sep 
15 23:44:08 2010
@@ -20,12 +20,14 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import com.google.common.base.Function;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.net.NetUtils;
@@ -82,8 +84,9 @@ public class HBaseRPC {
     super();
   }                                  // no public ctor
 
+
   /** A method invocation, including the method name and its parameters.*/
-  private static class Invocation implements Writable, Configurable {
+  public static class Invocation implements Writable, Configurable {
     private String methodName;
     @SuppressWarnings("unchecked")
     private Class[] parameterClasses;
@@ -497,9 +500,9 @@ public class HBaseRPC {
                                  final Class<?>[] ifaces,
                                  final String bindAddress, final int port,
                                  final int numHandlers,
-                                 final boolean verbose, Configuration conf)
+                                 int metaHandlerCount, final boolean verbose, 
Configuration conf, int highPriorityLevel)
     throws IOException {
-    return new Server(instance, ifaces, conf, bindAddress, port, numHandlers, 
verbose);
+    return new Server(instance, ifaces, conf, bindAddress, port, numHandlers, 
metaHandlerCount, verbose, highPriorityLevel);
   }
 
   /** An RPC Server. */
@@ -527,9 +530,9 @@ public class HBaseRPC {
      * @throws IOException e
      */
     public Server(Object instance, final Class<?>[] ifaces,
-                  Configuration conf, String bindAddress,  int port,
-                  int numHandlers, boolean verbose) throws IOException {
-      super(bindAddress, port, Invocation.class, numHandlers, conf, 
classNameBase(instance.getClass().getName()));
+                  Configuration conf, String bindAddress, int port,
+                  int numHandlers, int metaHandlerCount, boolean verbose, int 
highPriorityLevel) throws IOException {
+      super(bindAddress, port, Invocation.class, numHandlers, 
metaHandlerCount, conf, classNameBase(instance.getClass().getName()), 
highPriorityLevel);
       this.instance = instance;
       this.implementation = instance.getClass();
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=997541&r1=997540&r2=997541&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java 
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Wed 
Sep 15 23:44:08 2010
@@ -20,9 +20,11 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import com.google.common.base.Function;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -131,6 +133,7 @@ public abstract class HBaseServer {
   protected String bindAddress;
   protected int port;                             // port we listen on
   private int handlerCount;                       // number of handler threads
+  private int priorityHandlerCount;
   private int readThreads;                        // number of read threads
   protected Class<? extends Writable> paramClass; // class of call parameters
   protected int maxIdleTime;                      // the maximum idle time 
after
@@ -156,6 +159,9 @@ public abstract class HBaseServer {
 
   volatile protected boolean running = true;         // true while server runs
   protected BlockingQueue<Call> callQueue; // queued calls
+  protected BlockingQueue<Call> priorityCallQueue;
+
+  private int highPriorityLevel;  // what level a high priority call is at
 
   protected final List<Connection> connectionList =
     Collections.synchronizedList(new LinkedList<Connection>());
@@ -165,6 +171,7 @@ public abstract class HBaseServer {
   protected Responder responder = null;
   protected int numConnections = 0;
   private Handler[] handlers = null;
+  private Handler[] priorityHandlers = null;
   protected HBaseRPCErrorHandler errorHandler = null;
 
   /**
@@ -959,7 +966,12 @@ public abstract class HBaseServer {
       param.readFields(dis);
 
       Call call = new Call(id, param, this);
-      callQueue.put(call);              // queue the call; maybe blocked here
+
+      if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) 
{
+        priorityCallQueue.put(call);
+      } else {
+        callQueue.put(call);              // queue the call; maybe blocked here
+      }
     }
 
     protected synchronized void close() {
@@ -977,9 +989,17 @@ public abstract class HBaseServer {
 
   /** Handles queued calls . */
   private class Handler extends Thread {
-    public Handler(int instanceNumber) {
+    private final BlockingQueue<Call> myCallQueue;
+    public Handler(final BlockingQueue<Call> cq, int instanceNumber) {
+      this.myCallQueue = cq;
       this.setDaemon(true);
-      this.setName("IPC Server handler "+ instanceNumber + " on " + port);
+
+      String threadName = "IPC Server handler " + instanceNumber + " on " + 
port;
+      if (cq == priorityCallQueue) {
+        // this is just an amazing hack, but it works.
+        threadName = "PRI " + threadName;
+      }
+      this.setName(threadName);
     }
 
     @Override
@@ -990,7 +1010,7 @@ public abstract class HBaseServer {
       ByteArrayOutputStream buf = new ByteArrayOutputStream(buffersize);
       while (running) {
         try {
-          Call call = callQueue.take(); // pop the queue; maybe blocked here
+          Call call = myCallQueue.take(); // pop the queue; maybe blocked here
 
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + ": has #" + call.id + " from " +
@@ -1058,33 +1078,58 @@ public abstract class HBaseServer {
 
   }
 
-  protected HBaseServer(String bindAddress, int port,
-                  Class<? extends Writable> paramClass, int handlerCount,
-                  Configuration conf)
-    throws IOException
-  {
-    this(bindAddress, port, paramClass, handlerCount,  conf, 
Integer.toString(port));
+  /**
+   * Gets the QOS level for this call.  If it is higher than the 
highPriorityLevel and there
+   * are priorityHandlers available it will be processed in it's own thread 
set.
+   *
+   * @param param
+   * @return priority, higher is better
+   */
+  private Function<Writable,Integer> qosFunction = null;
+  public void setQosFunction(Function<Writable, Integer> newFunc) {
+    qosFunction = newFunc;
   }
+
+  protected int getQosLevel(Writable param) {
+    if (qosFunction == null) {
+      return 0;
+    }
+
+    Integer res = qosFunction.apply(param);
+    if (res == null) {
+      return 0;
+    }
+    return res;
+  }
+
   /* Constructs a server listening on the named port and address.  Parameters 
passed must
    * be of the named class.  The <code>handlerCount</handlerCount> determines
    * the number of handler threads that will be used to process calls.
    *
    */
   protected HBaseServer(String bindAddress, int port,
-                  Class<? extends Writable> paramClass, int handlerCount,
-                  Configuration conf, String serverName)
+                        Class<? extends Writable> paramClass, int handlerCount,
+                        int priorityHandlerCount, Configuration conf, String 
serverName,
+                        int highPriorityLevel)
     throws IOException {
     this.bindAddress = bindAddress;
     this.conf = conf;
     this.port = port;
     this.paramClass = paramClass;
     this.handlerCount = handlerCount;
+    this.priorityHandlerCount = priorityHandlerCount;
     this.socketSendBufferSize = 0;
     this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
      this.readThreads = conf.getInt(
         "ipc.server.read.threadpool.size",
         10);
     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize);
+    if (priorityHandlerCount > 0) {
+      this.priorityCallQueue = new LinkedBlockingQueue<Call>(maxQueueSize); // 
TODO hack on size
+    } else {
+      this.priorityCallQueue = null;
+    }
+    this.highPriorityLevel = highPriorityLevel;
     this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 
1000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 
4000);
@@ -1121,9 +1166,17 @@ public abstract class HBaseServer {
     handlers = new Handler[handlerCount];
 
     for (int i = 0; i < handlerCount; i++) {
-      handlers[i] = new Handler(i);
+      handlers[i] = new Handler(callQueue, i);
       handlers[i].start();
     }
+
+    if (priorityHandlerCount > 0) {
+      priorityHandlers = new Handler[priorityHandlerCount];
+      for (int i = 0 ; i < priorityHandlerCount; i++) {
+        priorityHandlers[i] = new Handler(priorityCallQueue, i);
+        priorityHandlers[i].start();
+      }
+    }
   }
 
   /** Stops the service.  No new calls will be handled after this is called. */
@@ -1131,9 +1184,16 @@ public abstract class HBaseServer {
     LOG.info("Stopping server on " + port);
     running = false;
     if (handlers != null) {
-      for (int i = 0; i < handlerCount; i++) {
-        if (handlers[i] != null) {
-          handlers[i].interrupt();
+      for (Handler handler : handlers) {
+        if (handler != null) {
+          handler.interrupt();
+        }
+      }
+    }
+    if (priorityHandlers != null) {
+      for (Handler handler : priorityHandlers) {
+        if (handler != null) {
+          handler.interrupt();
         }
       }
     }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=997541&r1=997540&r2=997541&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Wed 
Sep 15 23:44:08 2010
@@ -95,13 +95,13 @@ import org.apache.zookeeper.Watcher;
  * run the cluster.  All others park themselves in their constructor until
  * master or cluster shutdown or until the active master loses its lease in
  * zookeeper.  Thereafter, all running master jostle to take over master role.
- * 
+ *
 * <p>The Master can be asked shutdown the cluster. See {@link #shutdown()}.  In
  * this case it will tell all regionservers to go down and then wait on them
  * all reporting in that they are down.  This master will then shut itself 
down.
- * 
+ *
 * <p>You can also shutdown just this master.  Call {@link #stopMaster()}.
- * 
+ *
  * @see HMasterInterface
  * @see HMasterRegionInterface
  * @see Watcher
@@ -171,7 +171,7 @@ implements HMasterInterface, HMasterRegi
    * <li>Initialize master components - server manager, region manager,
    *     region server queue, file system manager, etc
    * </ol>
-   * @throws InterruptedException 
+   * @throws InterruptedException
    */
   public HMaster(final Configuration conf)
   throws IOException, KeeperException, InterruptedException {
@@ -185,13 +185,16 @@ implements HMasterInterface, HMasterRegi
     this.rpcServer = HBaseRPC.getServer(this,
        new Class<?>[]{HMasterInterface.class, HMasterRegionInterface.class},
         a.getBindAddress(), a.getPort(),
-        numHandlers, false, conf);
+        numHandlers,
+        0, // we dont use high priority handlers in master
+        false, conf,
+        0); // this is a DNC w/o high priority handlers
     this.address = new HServerAddress(rpcServer.getListenerAddress());
 
     // set the thread name now we have an address
     setName(MASTER + "-" + this.address);
 
-    // Hack! Maps DFSClient => Master for logs.  HDFS made this 
+    // Hack! Maps DFSClient => Master for logs.  HDFS made this
     // config param for task trackers, but we can piggyback off of it.
     if (this.conf.get("mapred.task.id") == null) {
       this.conf.set("mapred.task.id", "hb_m_" + this.address.toString() +
@@ -340,7 +343,7 @@ implements HMasterInterface, HMasterRegi
    * Initializations we need to do if we are cluster starter.
    * @param starter
    * @param mfs
-   * @throws IOException 
+   * @throws IOException
    */
   private static void clusterStarterInitializations(final MasterFileSystem mfs,
     final ServerManager sm, final CatalogTracker ct, final AssignmentManager 
am)

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=997541&r1=997540&r2=997541&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
Wed Sep 15 23:44:08 2010
@@ -158,7 +158,7 @@ public class HRegion implements HeapSize
    * This directory contains the directory for this region.
    */
   final Path tableDir;
-  
+
   final HLog log;
   final FileSystem fs;
   final Configuration conf;
@@ -631,7 +631,7 @@ public class HRegion implements HeapSize
   private void cleanupTmpDir() throws IOException {
     FSUtils.deleteDirectory(this.fs, getTmpDir());
   }
-  
+
   /**
    * Get the temporary diretory for this region. This directory
    * will have its contents removed when the region is reopened.
@@ -798,7 +798,7 @@ public class HRegion implements HeapSize
 
   /**
    * Flush the memstore.
-   * 
+   *
    * Flushing the memstore is a little tricky. We have a lot of updates in the
    * memstore, all of which have also been written to the log. We need to
    * write those updates in the memstore out to disk, while being able to
@@ -1279,12 +1279,12 @@ public class HRegion implements HeapSize
       retCodes = new OperationStatusCode[operations.length];
       Arrays.fill(retCodes, OperationStatusCode.NOT_RUN);
     }
-    
+
     public boolean isDone() {
       return nextIndexToProcess == operations.length;
     }
   }
-  
+
   /**
    * Perform a batch put with no pre-specified locks
    * @see HRegion#put(Pair[])
@@ -1298,7 +1298,7 @@ public class HRegion implements HeapSize
     }
     return put(putsAndLocks);
   }
-  
+
   /**
    * Perform a batch of puts.
    * @param putsAndLocks the list of puts paired with their requested lock IDs.
@@ -1307,7 +1307,7 @@ public class HRegion implements HeapSize
   public OperationStatusCode[] put(Pair<Put, Integer>[] putsAndLocks) throws 
IOException {
     BatchOperationInProgress<Pair<Put, Integer>> batchOp =
       new BatchOperationInProgress<Pair<Put,Integer>>(putsAndLocks);
-    
+
     while (!batchOp.isDone()) {
       checkReadOnly();
       checkResources();
@@ -1384,7 +1384,7 @@ public class HRegion implements HeapSize
             batchOp.operations[i].getFirst().getFamilyMap().values(),
             byteNow);
       }
-      
+
       // ------------------------------------
       // STEP 3. Write to WAL
       // ----------------------------------
@@ -1392,12 +1392,12 @@ public class HRegion implements HeapSize
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
         // Skip puts that were determined to be invalid during preprocessing
         if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
-        
+
         Put p = batchOp.operations[i].getFirst();
         if (!p.getWriteToWAL()) continue;
         addFamilyMapToWALEdit(p.getFamilyMap(), walEdit);
       }
-      
+
       // Append the edit to WAL
       this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
           walEdit, now);
@@ -1635,7 +1635,7 @@ public class HRegion implements HeapSize
       for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
         byte[] family = e.getKey();
         List<KeyValue> edits = e.getValue();
-  
+
         Store store = getStore(family);
         for (KeyValue kv: edits) {
           kv.setMemstoreTS(w.getWriteNumber());
@@ -1706,7 +1706,7 @@ public class HRegion implements HeapSize
    * <p>We can ignore any log message that has a sequence ID that's equal to or
    * lower than minSeqId.  (Because we know such log messages are already
    * reflected in the HFiles.)
-   * 
+   *
    * <p>While this is running we are putting pressure on memory yet we are
    * outside of our usual accounting because we are not yet an onlined region
    * (this stuff is being run as part of Region initialization).  This means
@@ -1715,7 +1715,7 @@ public class HRegion implements HeapSize
    * we're not yet online so our relative sequenceids are not yet aligned with
    * HLog sequenceids -- not till we come up online, post processing of split
    * edits.
-   * 
+   *
    * <p>But to help relieve memory pressure, at least manage our own heap size
    * flushing if are in excess of per-region limits.  Flushing, though, we have
    * to be careful and avoid using the regionserver/hlog sequenceid.  Its 
running
@@ -1725,7 +1725,7 @@ public class HRegion implements HeapSize
    * in this region and with its split editlogs, then we could miss edits the
    * next time we go to recover. So, we have to flush inline, using seqids that
    * make sense in a this single region context only -- until we online.
-   * 
+   *
    * @param regiondir
    * @param minSeqId Any edit found in split editlogs needs to be in excess of
    * this minSeqId to be applied, else its skipped.
@@ -1970,7 +1970,7 @@ public class HRegion implements HeapSize
       closeRegionOperation();
     }
   }
-  
+
   /**
    * Obtains or tries to obtain the given row lock.
    * @param waitForLock if true, will block until the lock is available.
@@ -2018,7 +2018,7 @@ public class HRegion implements HeapSize
       closeRegionOperation();
     }
   }
-  
+
   /**
    * Used by unit tests.
    * @param lockid
@@ -2135,6 +2135,9 @@ public class HRegion implements HeapSize
     private boolean filterClosed = false;
     private long readPt;
 
+    public HRegionInfo getRegionName() {
+      return regionInfo;
+    }
     RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws 
IOException {
       //DebugPrint.println("HRegionScanner.<init>");
       this.filter = scan.getFilter();

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=997541&r1=997540&r2=997541&view=diff
==============================================================================
--- 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 (original)
+++ 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 Wed Sep 15 23:44:08 2010
@@ -48,6 +48,7 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.base.Function;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -304,6 +305,85 @@ public class HRegionServer implements HR
     initialize();
   }
 
+  private static final int NORMAL_QOS = 0;
+  private static final int QOS_THRESHOLD = 10;  // the line between low and 
high qos
+  private static final int HIGH_QOS = 100;
+
+  class QosFunction implements Function<Writable,Integer> {
+    public boolean isMetaRegion(byte[] regionName) {
+      HRegion region;
+      try {
+        region = getRegion(regionName);
+      } catch (NotServingRegionException ignored) {
+        return false;
+      }
+      return region.getRegionInfo().isMetaRegion();
+    }
+
+    @Override
+    public Integer apply(Writable from) {
+      if (from instanceof HBaseRPC.Invocation) {
+        HBaseRPC.Invocation inv = (HBaseRPC.Invocation) from;
+
+        String methodName = inv.getMethodName();
+
+        // scanner methods...
+        if (methodName.equals("next") || methodName.equals("close")) {
+          // translate!
+          Long scannerId;
+          try {
+            scannerId = (Long) inv.getParameters()[0];
+          } catch (ClassCastException ignored) {
+            //LOG.debug("Low priority: " + from);
+            return NORMAL_QOS; // doh.
+          }
+          String scannerIdString = Long.toString(scannerId);
+          InternalScanner scanner = scanners.get(scannerIdString);
+          if (scanner instanceof HRegion.RegionScanner) {
+            HRegion.RegionScanner rs = (HRegion.RegionScanner) scanner;
+            HRegionInfo regionName = rs.getRegionName();
+            if (regionName.isMetaRegion()) {
+              //LOG.debug("High priority scanner request: " + scannerId);
+              return HIGH_QOS;
+            }
+          }
+        }
+        else if (methodName.equals("getHServerInfo") ||
+            methodName.equals("getRegionsAssignment") ||
+            methodName.equals("unlockRow") ||
+            methodName.equals("getProtocolVersion") ||
+            methodName.equals("getClosestRowBefore")) {
+          //LOG.debug("High priority method: " + methodName);
+          return HIGH_QOS;
+        }
+        else if (inv.getParameterClasses()[0] == byte[].class) {
+          // first arg is byte array, so assume this is a regionname:
+          if (isMetaRegion((byte[]) inv.getParameters()[0])) {
+            //LOG.debug("High priority with method: " + methodName + " and 
region: "
+            //    + Bytes.toString((byte[]) inv.getParameters()[0]));
+            return HIGH_QOS;
+          }
+        }
+        else if (inv.getParameterClasses()[0] == MultiAction.class) {
+          MultiAction ma = (MultiAction) inv.getParameters()[0];
+          Set<byte[]> regions = ma.getRegions();
+          // ok this sucks, but if any single of the actions touches a meta, 
the whole
+          // thing gets pingged high priority.  This is a dangerous hack 
because people
+          // can get their multi action tagged high QOS by tossing a 
Get(.META.) AND this
+          // regionserver hosts META/-ROOT-
+          for (byte[] region: regions) {
+            if (isMetaRegion(region)) {
+              //LOG.debug("High priority multi with region: " + 
Bytes.toString(region));
+              return HIGH_QOS; // short circuit for the win.
+            }
+          }
+        }
+      }
+      //LOG.debug("Low priority: " + from.toString());
+      return NORMAL_QOS;
+    }
+  }
+
   /**
    * Creates all of the state that needs to be reconstructed in case we are
    * doing a restart. This is shared between the constructor and restart(). 
Both
@@ -326,8 +406,11 @@ public class HRegionServer implements HR
         OnlineRegions.class},
         address.getBindAddress(),
       address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
-      false, conf);
+        conf.getInt("hbase.regionserver.metahandler.count", 10),
+        false, conf, QOS_THRESHOLD);
     this.server.setErrorHandler(this);
+    this.server.setQosFunction(new QosFunction());
+
     // Address is giving a default IP for the moment. Will be changed after
     // calling the master.
     this.serverInfo = new HServerInfo(new HServerAddress(new InetSocketAddress(


Reply via email to