Author: stack
Date: Fri Oct 29 06:08:44 2010
New Revision: 1028622

URL: http://svn.apache.org/viewvc?rev=1028622&view=rev
Log:
HBASE-3160 Use more intelligent priorities for PriorityCompactionQueue

Modified:
    hbase/trunk/CHANGES.txt
    
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java

Modified: hbase/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1028622&r1=1028621&r2=1028622&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Oct 29 06:08:44 2010
@@ -622,6 +622,8 @@ Release 0.21.0 - Unreleased
    HBASE-3012  TOF doesn't take zk client port for remote clusters
    HBASE-3159  Double play of OpenedRegionHandler for a single region
                and assorted fixes around this + TestRollingRestart added
+   HBASE-3160  Use more intelligent priorities for PriorityCompactionQueue
+               (Nicolas Spiegelberg via Stack)
 
 
   IMPROVEMENTS

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1028622&r1=1028621&r2=1028622&view=diff
==============================================================================
--- 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
 (original)
+++ 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
 Fri Oct 29 06:08:44 2010
@@ -43,33 +43,10 @@ public class CompactSplitThread extends 
   private final PriorityCompactionQueue compactionQueue =
     new PriorityCompactionQueue();
 
-  /** The priorities for a compaction request. */
-  public enum Priority implements Comparable<Priority> {
-    //NOTE: All priorities should be numbered consecutively starting with 1.
-    //The highest priority should be 1 followed by all lower priorities.
-    //Priorities can be changed at anytime without requiring any changes to the
-    //queue.
-
-    /** HIGH_BLOCKING should only be used when an operation is blocked until a
-     * compact / split is done (e.g. a MemStore can't flush because it has
-     * "too many store files" and is blocking until a compact / split is done)
-     */
-    HIGH_BLOCKING(1),
-    /** A normal compaction / split request */
-    NORMAL(2),
-    /** A low compaction / split request -- not currently used */
-    LOW(3);
-
-    int value;
-
-    Priority(int value) {
-      this.value = value;
-    }
-
-    int getInt() {
-      return value;
-    }
-  }
+  /* The default priority for user-specified compaction requests.
+   * The user gets top priority unless we have blocking compactions. (Pri <= 0)
+   */
+  public static final int PRIORITY_USER = 1;
 
   /**
    * Splitting should not take place if the total number of regions exceed 
this.
@@ -138,30 +115,28 @@ public class CompactSplitThread extends 
 
   public synchronized void requestCompaction(final HRegion r,
       final String why) {
-    requestCompaction(r, false, why, Priority.NORMAL);
+    requestCompaction(r, false, why, r.getCompactPriority());
   }
 
   public synchronized void requestCompaction(final HRegion r,
-      final String why, Priority p) {
+      final String why, int p) {
     requestCompaction(r, false, why, p);
   }
 
-  public synchronized void requestCompaction(final HRegion r,
-      final boolean force, final String why) {
-    requestCompaction(r, force, why, Priority.NORMAL);
-  }
-
   /**
    * @param r HRegion store belongs to
    * @param force Whether next compaction should be major
    * @param why Why compaction requested -- used in debug messages
    */
   public synchronized void requestCompaction(final HRegion r,
-      final boolean force, final String why, Priority priority) {
+      final boolean force, final String why, int priority) {
     if (this.server.isStopped()) {
       return;
     }
-    r.setForceMajorCompaction(force);
+    // tell the region to major-compact (and don't downgrade it)
+    if (force) {
+      r.setForceMajorCompaction(force);
+    }
     if (compactionQueue.add(r, priority) && LOG.isDebugEnabled()) {
       LOG.debug("Compaction " + (force? "(major) ": "") +
         "requested for region " + r.getRegionNameAsString() +

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java?rev=1028622&r1=1028621&r2=1028622&view=diff
==============================================================================
--- 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
 (original)
+++ 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
 Fri Oct 29 06:08:44 2010
@@ -25,4 +25,11 @@ public interface CompactionRequestor {
    * @param why Why compaction was requested -- used in debug messages
    */
   public void requestCompaction(final HRegion r, final String why);
+
+  /**
+   * @param r Region to compact
+   * @param why Why compaction was requested -- used in debug messages
+   * @param pri Priority of this compaction. minHeap. <=0 is critical
+   */
+  public void requestCompaction(final HRegion r, final String why, int pri);
 }
\ No newline at end of file

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1028622&r1=1028621&r2=1028622&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
Fri Oct 29 06:08:44 2010
@@ -3280,6 +3280,17 @@ public class HRegion implements HeapSize
   }
 
   /**
+   * @return The priority that this region should have in the compaction queue
+   */
+  public int getCompactPriority() {
+    int count = Integer.MAX_VALUE;
+    for(Store store : stores.values()) {
+      count = Math.min(count, store.getCompactPriority());
+    }
+    return count;
+  }
+
+  /**
    * Checks every store to see if one has too many
    * store files
    * @return true if any store has too many store files

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1028622&r1=1028621&r2=1028622&view=diff
==============================================================================
--- 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 (original)
+++ 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 Fri Oct 29 06:08:44 2010
@@ -2023,7 +2023,8 @@ public class HRegionServer implements HR
     // force a compaction, split will be side-effect
     // TODO: flush/compact/split refactor will make it trivial to do this
     // sync/async (and won't require us to do a compaction to split!)
-    compactSplitThread.requestCompaction(region, "User-triggered split");
+    compactSplitThread.requestCompaction(region, "User-triggered split",
+        CompactSplitThread.PRIORITY_USER);
   }
 
   @Override
@@ -2033,7 +2034,8 @@ public class HRegionServer implements HR
     region.flushcache();
     region.shouldSplit(true);
     compactSplitThread.requestCompaction(region, major, "User-triggered "
-        + (major ? "major " : "") + "compaction");
+        + (major ? "major " : "") + "compaction",
+        CompactSplitThread.PRIORITY_USER);
   }
 
   /** @return the info server */

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1028622&r1=1028621&r2=1028622&view=diff
==============================================================================
--- 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
 (original)
+++ 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
 Fri Oct 29 06:08:44 2010
@@ -212,8 +212,7 @@ class MemStoreFlusher extends Thread imp
           LOG.warn("Region " + region.getRegionNameAsString() + " has too many 
" +
             "store files; delaying flush up to " + this.blockingWaitTime + 
"ms");
         }
-        this.server.compactSplitThread.requestCompaction(region, getName(),
-          CompactSplitThread.Priority.HIGH_BLOCKING);
+        this.server.compactSplitThread.requestCompaction(region, getName());
         // Put back on the queue.  Have it come back out of the queue
         // after a delay of this.blockingWaitTime / 100 ms.
         this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java?rev=1028622&r1=1028621&r2=1028622&view=diff
==============================================================================
--- 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
 (original)
+++ 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
 Fri Oct 29 06:08:44 2010
@@ -28,7 +28,6 @@ import java.util.concurrent.PriorityBloc
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority;
 
 /**
  * This class delegates to the BlockingQueue but wraps all HRegions in
@@ -47,22 +46,18 @@ public class PriorityCompactionQueue imp
    */
   private class CompactionRequest implements Comparable<CompactionRequest> {
     private final HRegion r;
-    private final Priority p;
+    private final int p;
     private final Date date;
 
-    public CompactionRequest(HRegion r, Priority p) {
+    public CompactionRequest(HRegion r, int p) {
       this(r, p, null);
     }
 
-    public CompactionRequest(HRegion r, Priority p, Date d) {
+    public CompactionRequest(HRegion r, int p, Date d) {
       if (r == null) {
         throw new NullPointerException("HRegion cannot be null");
       }
 
-      if (p == null) {
-        p = Priority.NORMAL; //the default priority
-      }
-
       if (d == null) {
         d = new Date();
       }
@@ -91,7 +86,7 @@ public class PriorityCompactionQueue imp
       }
       int compareVal;
 
-      compareVal = p.compareTo(request.p); //compare priority
+      compareVal = p - request.p; //compare priority
       if (compareVal != 0) {
         return compareVal;
       }
@@ -111,7 +106,7 @@ public class PriorityCompactionQueue imp
     }
 
     /** Gets the priority for the request */
-    Priority getPriority() {
+    int getPriority() {
       return p;
     }
 
@@ -140,13 +135,13 @@ public class PriorityCompactionQueue imp
    * @param p If null it will use the default priority
    * @return returns a compaction request if it isn't already in the queue
    */
-  protected CompactionRequest addToRegionsInQueue(HRegion r, Priority p) {
+  protected CompactionRequest addToRegionsInQueue(HRegion r, int p) {
     CompactionRequest queuedRequest = null;
     CompactionRequest newRequest = new CompactionRequest(r, p);
     synchronized (regionsInQueue) {
       queuedRequest = regionsInQueue.get(r);
       if (queuedRequest == null ||
-          newRequest.getPriority().compareTo(queuedRequest.getPriority()) < 0) 
{
+          newRequest.getPriority() < queuedRequest.getPriority()) {
         LOG.trace("Inserting region in queue. " + newRequest);
         regionsInQueue.put(r, newRequest);
       } else {
@@ -189,7 +184,7 @@ public class PriorityCompactionQueue imp
     }
   }
 
-  public boolean add(HRegion e, Priority p) {
+  public boolean add(HRegion e, int p) {
     CompactionRequest request = this.addToRegionsInQueue(e, p);
     if (request != null) {
       boolean result = queue.add(request);
@@ -201,20 +196,20 @@ public class PriorityCompactionQueue imp
 
   @Override
   public boolean add(HRegion e) {
-    return add(e, null);
+    return add(e, e.getCompactPriority());
   }
 
-  public boolean offer(HRegion e, Priority p) {
+  public boolean offer(HRegion e, int p) {
     CompactionRequest request = this.addToRegionsInQueue(e, p);
     return (request != null)? queue.offer(request): false;
   }
 
   @Override
   public boolean offer(HRegion e) {
-    return offer(e, null);
+    return offer(e, e.getCompactPriority());
   }
 
-  public void put(HRegion e, Priority p) throws InterruptedException {
+  public void put(HRegion e, int p) throws InterruptedException {
     CompactionRequest request = this.addToRegionsInQueue(e, p);
     if (request != null) {
       queue.put(request);
@@ -223,10 +218,10 @@ public class PriorityCompactionQueue imp
 
   @Override
   public void put(HRegion e) throws InterruptedException {
-    put(e, null);
+    put(e, e.getCompactPriority());
   }
 
-  public boolean offer(HRegion e, Priority p, long timeout, TimeUnit unit)
+  public boolean offer(HRegion e, int p, long timeout, TimeUnit unit)
   throws InterruptedException {
     CompactionRequest request = this.addToRegionsInQueue(e, p);
     return (request != null)? queue.offer(request, timeout, unit): false;
@@ -235,7 +230,7 @@ public class PriorityCompactionQueue imp
   @Override
   public boolean offer(HRegion e, long timeout, TimeUnit unit)
   throws InterruptedException {
-    return offer(e, null, timeout, unit);
+    return offer(e, e.getCompactPriority(), timeout, unit);
   }
 
   @Override

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1028622&r1=1028621&r2=1028622&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java 
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java 
Fri Oct 29 06:08:44 2010
@@ -95,6 +95,7 @@ public class Store implements HeapSize {
   /* how many bytes to write between status checks */
   static int closeCheckInterval = 0; 
   private final long desiredMaxFileSize;
+  private final int blockingStoreFileCount;
   private volatile long storeSize = 0L;
   private final Object flushLock = new Object();
   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -186,6 +187,8 @@ public class Store implements HeapSize {
         HConstants.DEFAULT_MAX_FILE_SIZE);
     }
     this.desiredMaxFileSize = maxFileSize;
+    this.blockingStoreFileCount =
+      conf.getInt("hbase.hstore.blockingStoreFiles", -1);
 
     this.majorCompactionTime =
       conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 86400000);
@@ -1331,6 +1334,13 @@ public class Store implements HeapSize {
     }
     return size;
   }
+  
+  /**
+   * @return The priority that this store should have in the compaction queue
+   */
+  int getCompactPriority() {
+    return this.blockingStoreFileCount - this.storefiles.size();
+  }
 
   /**
    * Datastructure that holds size and row to split a file around.

Modified: 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java?rev=1028622&r1=1028621&r2=1028622&view=diff
==============================================================================
--- 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
 (original)
+++ 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
 Fri Oct 29 06:08:44 2010
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionse
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -76,7 +75,7 @@ public class TestPriorityCompactionQueue
     }
   }
 
-  protected void addRegion(PriorityCompactionQueue pq, HRegion r, Priority p) {
+  protected void addRegion(PriorityCompactionQueue pq, HRegion r, int p) {
     pq.add(r, p);
     try {
       // Sleep 1 millisecond so 2 things are not put in the queue within the
@@ -105,11 +104,11 @@ public class TestPriorityCompactionQueue
 
     // test 1
     // check fifo w/priority
-    addRegion(pq, r1, Priority.HIGH_BLOCKING);
-    addRegion(pq, r2, Priority.HIGH_BLOCKING);
-    addRegion(pq, r3, Priority.HIGH_BLOCKING);
-    addRegion(pq, r4, Priority.HIGH_BLOCKING);
-    addRegion(pq, r5, Priority.HIGH_BLOCKING);
+    addRegion(pq, r1, CompactSplitThread.PRIORITY_BLOCKING);
+    addRegion(pq, r2, CompactSplitThread.PRIORITY_BLOCKING);
+    addRegion(pq, r3, CompactSplitThread.PRIORITY_BLOCKING);
+    addRegion(pq, r4, CompactSplitThread.PRIORITY_BLOCKING);
+    addRegion(pq, r5, CompactSplitThread.PRIORITY_BLOCKING);
 
     getAndCheckRegion(pq, r1);
     getAndCheckRegion(pq, r2);
@@ -118,26 +117,12 @@ public class TestPriorityCompactionQueue
     getAndCheckRegion(pq, r5);
 
     // test 2
-    // check fifo
-    addRegion(pq, r1, null);
-    addRegion(pq, r2, null);
-    addRegion(pq, r3, null);
-    addRegion(pq, r4, null);
-    addRegion(pq, r5, null);
-
-    getAndCheckRegion(pq, r1);
-    getAndCheckRegion(pq, r2);
-    getAndCheckRegion(pq, r3);
-    getAndCheckRegion(pq, r4);
-    getAndCheckRegion(pq, r5);
-
-    // test 3
     // check fifo w/mixed priority
-    addRegion(pq, r1, Priority.HIGH_BLOCKING);
-    addRegion(pq, r2, Priority.NORMAL);
-    addRegion(pq, r3, Priority.HIGH_BLOCKING);
-    addRegion(pq, r4, Priority.NORMAL);
-    addRegion(pq, r5, Priority.HIGH_BLOCKING);
+    addRegion(pq, r1, CompactSplitThread.PRIORITY_BLOCKING);
+    addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r3, CompactSplitThread.PRIORITY_BLOCKING);
+    addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r5, CompactSplitThread.PRIORITY_BLOCKING);
 
     getAndCheckRegion(pq, r1);
     getAndCheckRegion(pq, r3);
@@ -145,13 +130,13 @@ public class TestPriorityCompactionQueue
     getAndCheckRegion(pq, r2);
     getAndCheckRegion(pq, r4);
 
-    // test 4
+    // test 3
     // check fifo w/mixed priority
-    addRegion(pq, r1, Priority.NORMAL);
-    addRegion(pq, r2, Priority.NORMAL);
-    addRegion(pq, r3, Priority.NORMAL);
-    addRegion(pq, r4, Priority.NORMAL);
-    addRegion(pq, r5, Priority.HIGH_BLOCKING);
+    addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r5, CompactSplitThread.PRIORITY_BLOCKING);
 
     getAndCheckRegion(pq, r5);
     getAndCheckRegion(pq, r1);
@@ -159,14 +144,14 @@ public class TestPriorityCompactionQueue
     getAndCheckRegion(pq, r3);
     getAndCheckRegion(pq, r4);
 
-    // test 5
+    // test 4
     // check fifo w/mixed priority elevation time
-    addRegion(pq, r1, Priority.NORMAL);
-    addRegion(pq, r2, Priority.HIGH_BLOCKING);
-    addRegion(pq, r3, Priority.NORMAL);
+    addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r2, CompactSplitThread.PRIORITY_BLOCKING);
+    addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
     Thread.sleep(1000);
-    addRegion(pq, r4, Priority.NORMAL);
-    addRegion(pq, r5, Priority.HIGH_BLOCKING);
+    addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r5, CompactSplitThread.PRIORITY_BLOCKING);
 
     getAndCheckRegion(pq, r2);
     getAndCheckRegion(pq, r5);
@@ -177,15 +162,15 @@ public class TestPriorityCompactionQueue
     // reset the priority compaction queue back to a normal queue
     pq = new PriorityCompactionQueue();
 
-    // test 7
+    // test 5
     // test that lower priority are removed from the queue when a high priority
     // is added
-    addRegion(pq, r1, Priority.NORMAL);
-    addRegion(pq, r2, Priority.NORMAL);
-    addRegion(pq, r3, Priority.NORMAL);
-    addRegion(pq, r4, Priority.NORMAL);
-    addRegion(pq, r5, Priority.NORMAL);
-    addRegion(pq, r3, Priority.HIGH_BLOCKING);
+    addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r5, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r3, CompactSplitThread.PRIORITY_BLOCKING);
 
     getAndCheckRegion(pq, r3);
     getAndCheckRegion(pq, r1);
@@ -195,18 +180,18 @@ public class TestPriorityCompactionQueue
 
     Assert.assertTrue("Queue should be empty.", pq.size() == 0);
 
-    // test 8
+    // test 6
     // don't add the same region more than once
-    addRegion(pq, r1, Priority.NORMAL);
-    addRegion(pq, r2, Priority.NORMAL);
-    addRegion(pq, r3, Priority.NORMAL);
-    addRegion(pq, r4, Priority.NORMAL);
-    addRegion(pq, r5, Priority.NORMAL);
-    addRegion(pq, r1, Priority.NORMAL);
-    addRegion(pq, r2, Priority.NORMAL);
-    addRegion(pq, r3, Priority.NORMAL);
-    addRegion(pq, r4, Priority.NORMAL);
-    addRegion(pq, r5, Priority.NORMAL);
+    addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r5, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r5, CompactSplitThread.PRIORITY_USER);
 
     getAndCheckRegion(pq, r1);
     getAndCheckRegion(pq, r2);
@@ -215,5 +200,19 @@ public class TestPriorityCompactionQueue
     getAndCheckRegion(pq, r5);
 
     Assert.assertTrue("Queue should be empty.", pq.size() == 0);
+    
+    // test 7
+    // we can handle negative priorities
+    addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
+    addRegion(pq, r2, -1);
+    addRegion(pq, r3, CompactSplitThread.PRIORITY_BLOCKING);
+    addRegion(pq, r4, -2);
+
+    getAndCheckRegion(pq, r4);
+    getAndCheckRegion(pq, r2);
+    getAndCheckRegion(pq, r3);
+    getAndCheckRegion(pq, r1);
+
+    Assert.assertTrue("Queue should be empty.", pq.size() == 0);
   }
 }
\ No newline at end of file


Reply via email to