Author: stack
Date: Tue Sep 28 05:26:22 2010
New Revision: 1002019
URL: http://svn.apache.org/viewvc?rev=1002019&view=rev
Log:
HBASE-2646 Compaction requests should be prioritized to prevent blocking
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
Modified: hbase/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1002019&r1=1002018&r2=1002019&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Sep 28 05:26:22 2010
@@ -947,6 +947,8 @@ Release 0.21.0 - Unreleased
(Andy Chen via Stack)
HBASE-3030 The return code of many filesystem operations are not checked
(dhruba borthakur via Stack)
+ HBASE-2646 Compaction requests should be prioritized to prevent blocking
+ (Jeff Whiting via Stack)
NEW FEATURES
HBASE-1961 HBase EC2 scripts
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1002019&r1=1002018&r2=1002019&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
Tue Sep 28 05:26:22 2010
@@ -20,9 +20,6 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.util.HashSet;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@@ -43,10 +40,36 @@ public class CompactSplitThread extends
private final HRegionServer server;
private final Configuration conf;
- private final BlockingQueue<HRegion> compactionQueue =
- new LinkedBlockingQueue<HRegion>();
+ private final PriorityCompactionQueue compactionQueue =
+ new PriorityCompactionQueue();
- private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>();
+ /** The priorities for a compaction request. */
+ public enum Priority implements Comparable<Priority> {
+ //NOTE: All priorities should be numbered consecutively starting with 1.
+ //The highest priority should be 1 followed by all lower priorities.
+ //Priorities can be changed at anytime without requiring any changes to the
+ //queue.
+
+ /** HIGH_BLOCKING should only be used when an operation is blocked until a
+ * compact / split is done (e.g. a MemStore can't flush because it has
+ * "too many store files" and is blocking until a compact / split is done)
+ */
+ HIGH_BLOCKING(1),
+ /** A normal compaction / split request */
+ NORMAL(2),
+ /** A low compaction / split request -- not currently used */
+ LOW(3);
+
+ int value;
+
+ Priority(int value) {
+ this.value = value;
+ }
+
+ int getInt() {
+ return value;
+ }
+ }
/**
* Splitting should not take place if the total number of regions exceed
this.
@@ -74,9 +97,6 @@ public class CompactSplitThread extends
try {
r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
if (r != null && !this.server.isStopped()) {
- synchronized (regionsInQueue) {
- regionsInQueue.remove(r);
- }
lock.lock();
try {
// Don't interrupt us while we are working
@@ -107,14 +127,23 @@ public class CompactSplitThread extends
}
}
}
- regionsInQueue.clear();
compactionQueue.clear();
LOG.info(getName() + " exiting");
}
public synchronized void requestCompaction(final HRegion r,
final String why) {
- requestCompaction(r, false, why);
+ requestCompaction(r, false, why, Priority.NORMAL);
+ }
+
+ public synchronized void requestCompaction(final HRegion r,
+ final String why, Priority p) {
+ requestCompaction(r, false, why, p);
+ }
+
+ public synchronized void requestCompaction(final HRegion r,
+ final boolean force, final String why) {
+ requestCompaction(r, force, why, Priority.NORMAL);
}
/**
@@ -123,7 +152,7 @@ public class CompactSplitThread extends
* @param why Why compaction requested -- used in debug messages
*/
public synchronized void requestCompaction(final HRegion r,
- final boolean force, final String why) {
+ final boolean force, final String why, Priority priority) {
if (this.server.isStopped()) {
return;
}
@@ -131,14 +160,10 @@ public class CompactSplitThread extends
if (LOG.isDebugEnabled()) {
LOG.debug("Compaction " + (force? "(major) ": "") +
"requested for region " + r.getRegionNameAsString() +
- (why != null && !why.isEmpty()? " because: " + why: ""));
- }
- synchronized (regionsInQueue) {
- if (!regionsInQueue.contains(r)) {
- compactionQueue.add(r);
- regionsInQueue.add(r);
- }
+ (why != null && !why.isEmpty()? " because: " + why: "") +
+ "; Priority: " + priority + "; Compaction queue size: " +
compactionQueue.size());
}
+ compactionQueue.add(r, priority);
}
private void split(final HRegion parent, final byte [] midKey)
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1002019&r1=1002018&r2=1002019&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
Tue Sep 28 05:26:22 2010
@@ -212,7 +212,8 @@ class MemStoreFlusher extends Thread imp
LOG.warn("Region " + region.getRegionNameAsString() + " has too many
" +
"store files; delaying flush up to " + this.blockingWaitTime +
"ms");
}
- this.server.compactSplitThread.requestCompaction(region, getName());
+ this.server.compactSplitThread.requestCompaction(region, getName(),
+ CompactSplitThread.Priority.HIGH_BLOCKING);
// Put back on the queue. Have it come back out of the queue
// after a delay of this.blockingWaitTime / 100 ms.
this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java?rev=1002019&view=auto
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
(added)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
Tue Sep 28 05:26:22 2010
@@ -0,0 +1,375 @@
+/**
+* Copyright 2010 The Apache Software Foundation
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority;
+
+/**
+ * This class delegates to the BlockingQueue but wraps all HRegions in
+ * compaction requests that hold the priority and the date requested.
+ *
+ * Implementation Note: With an elevation time of -1 there is the potential for
+ * starvation of the lower priority compaction requests as long as there is a
+ * constant stream of high priority requests.
+ */
+public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
+ static final Log LOG = LogFactory.getLog(PriorityCompactionQueue.class);
+
+ /**
+ * This class represents a compaction request and holds the region, priority,
+ * and time submitted.
+ */
+ private class CompactionRequest implements Comparable<CompactionRequest> {
+ private final HRegion r;
+ private final Priority p;
+ private final Date date;
+
+ public CompactionRequest(HRegion r, Priority p) {
+ this(r, p, null);
+ }
+
+ public CompactionRequest(HRegion r, Priority p, Date d) {
+ if (r == null) {
+ throw new NullPointerException("HRegion cannot be null");
+ }
+
+ if (p == null) {
+ p = Priority.NORMAL; //the default priority
+ }
+
+ if (d == null) {
+ d = new Date();
+ }
+
+ this.r = r;
+ this.p = p;
+ this.date = d;
+ }
+
+ /**
+ * This function will define where in the priority queue the request will
+ * end up. Those with the highest priorities will be first. When the
+ * priorities are the same it will It will first compare priority then date
+ * to maintain a FIFO functionality.
+ *
+ * <p>Note: The date is only accurate to the millisecond which means it is
+ * possible that two requests were inserted into the queue within a
+ * millisecond. When that is the case this function will break the tie
+ * arbitrarily.
+ */
+ @Override
+ public int compareTo(CompactionRequest request) {
+ //NOTE: The head of the priority queue is the least element
+ if (this.equals(request)) {
+ return 0; //they are the same request
+ }
+ int compareVal;
+
+ compareVal = p.compareTo(request.p); //compare priority
+ if (compareVal != 0) {
+ return compareVal;
+ }
+
+ compareVal = date.compareTo(request.date);
+ if (compareVal != 0) {
+ return compareVal;
+ }
+
+ //break the tie arbitrarily
+ return -1;
+ }
+
+ /** Gets the HRegion for the request */
+ HRegion getHRegion() {
+ return r;
+ }
+
+ /** Gets the priority for the request */
+ Priority getPriority() {
+ return p;
+ }
+
+ public String toString() {
+ return "regionName=" + r.getRegionNameAsString() +
+ ", priority=" + p + ", date=" + date;
+ }
+ }
+
+ /** The actual blocking queue we delegate to */
+ protected final BlockingQueue<CompactionRequest> queue =
+ new PriorityBlockingQueue<CompactionRequest>();
+
+ /** Hash map of the HRegions contained within the Compaction Queue */
+ private final HashMap<HRegion, CompactionRequest> regionsInQueue =
+ new HashMap<HRegion, CompactionRequest>();
+
+ /** Creates a new PriorityCompactionQueue with no priority elevation time */
+ public PriorityCompactionQueue() {
+ LOG.debug("Create PriorityCompactionQueue");
+ }
+
+ /** If the region is not already in the queue it will add it and return a
+ * new compaction request object. If it is already present in the queue
+ * then it will return null.
+ * @param p If null it will use the default priority
+ * @return returns a compaction request if it isn't already in the queue
+ */
+ protected CompactionRequest addToRegionsInQueue(HRegion r, Priority p) {
+ CompactionRequest queuedRequest = null;
+ CompactionRequest newRequest = new CompactionRequest(r, p);
+ synchronized (regionsInQueue) {
+ queuedRequest = regionsInQueue.get(r);
+ if (queuedRequest == null ||
+ newRequest.getPriority().compareTo(queuedRequest.getPriority()) < 0)
{
+ LOG.trace("Inserting region in queue. " + newRequest);
+ regionsInQueue.put(r, newRequest);
+ } else {
+ LOG.trace("Region already in queue, skipping. Queued: " +
queuedRequest +
+ ", requested: " + newRequest);
+ newRequest = null; // It is already present so don't add it
+ }
+ }
+
+ if (newRequest != null && queuedRequest != null) {
+ // Remove the lower priority request
+ queue.remove(queuedRequest);
+ }
+
+ return newRequest;
+ }
+
+ /** Removes the request from the regions in queue
+ * @param p If null it will use the default priority
+ */
+ protected CompactionRequest removeFromRegionsInQueue(HRegion r) {
+ if (r == null) return null;
+
+ synchronized (regionsInQueue) {
+ CompactionRequest cr = regionsInQueue.remove(r);
+ if (cr == null) {
+ LOG.warn("Removed a region it couldn't find in regionsInQueue: " + r);
+ }
+ return cr;
+ }
+ }
+
+ public boolean add(HRegion e, Priority p) {
+ CompactionRequest request = this.addToRegionsInQueue(e, p);
+ if (request != null) {
+ boolean result = queue.add(request);
+ queue.peek();
+ return result;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean add(HRegion e) {
+ return add(e, null);
+ }
+
+ public boolean offer(HRegion e, Priority p) {
+ CompactionRequest request = this.addToRegionsInQueue(e, p);
+ return (request != null)? queue.offer(request): false;
+ }
+
+ @Override
+ public boolean offer(HRegion e) {
+ return offer(e, null);
+ }
+
+ public void put(HRegion e, Priority p) throws InterruptedException {
+ CompactionRequest request = this.addToRegionsInQueue(e, p);
+ if (request != null) {
+ queue.put(request);
+ }
+ }
+
+ @Override
+ public void put(HRegion e) throws InterruptedException {
+ put(e, null);
+ }
+
+ public boolean offer(HRegion e, Priority p, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ CompactionRequest request = this.addToRegionsInQueue(e, p);
+ return (request != null)? queue.offer(request, timeout, unit): false;
+ }
+
+ @Override
+ public boolean offer(HRegion e, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return offer(e, null, timeout, unit);
+ }
+
+ @Override
+ public HRegion take() throws InterruptedException {
+ CompactionRequest cr = queue.take();
+ if (cr != null) {
+ removeFromRegionsInQueue(cr.getHRegion());
+ return cr.getHRegion();
+ }
+ return null;
+ }
+
+ @Override
+ public HRegion poll(long timeout, TimeUnit unit) throws InterruptedException
{
+ CompactionRequest cr = queue.poll(timeout, unit);
+ if (cr != null) {
+ removeFromRegionsInQueue(cr.getHRegion());
+ return cr.getHRegion();
+ }
+ return null;
+ }
+
+ @Override
+ public boolean remove(Object r) {
+ if (r instanceof HRegion) {
+ CompactionRequest cr = removeFromRegionsInQueue((HRegion) r);
+ if (cr != null) {
+ return queue.remove(cr);
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public HRegion remove() {
+ CompactionRequest cr = queue.remove();
+ if (cr != null) {
+ removeFromRegionsInQueue(cr.getHRegion());
+ return cr.getHRegion();
+ }
+ return null;
+ }
+
+ @Override
+ public HRegion poll() {
+ CompactionRequest cr = queue.poll();
+ if (cr != null) {
+ removeFromRegionsInQueue(cr.getHRegion());
+ return cr.getHRegion();
+ }
+ return null;
+ }
+
+ @Override
+ public int remainingCapacity() {
+ return queue.remainingCapacity();
+ }
+
+ @Override
+ public boolean contains(Object r) {
+ if (r instanceof HRegion) {
+ synchronized (regionsInQueue) {
+ return regionsInQueue.containsKey((HRegion) r);
+ }
+ } else if (r instanceof CompactionRequest) {
+ return queue.contains(r);
+ }
+ return false;
+ }
+
+ @Override
+ public HRegion element() {
+ CompactionRequest cr = queue.element();
+ return (cr != null)? cr.getHRegion(): null;
+ }
+
+ @Override
+ public HRegion peek() {
+ CompactionRequest cr = queue.peek();
+ return (cr != null)? cr.getHRegion(): null;
+ }
+
+ @Override
+ public int size() {
+ return queue.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return queue.isEmpty();
+ }
+
+ @Override
+ public void clear() {
+ regionsInQueue.clear();
+ queue.clear();
+ }
+
+ // Unimplemented methods, collection methods
+
+ @Override
+ public Iterator<HRegion> iterator() {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ public Object[] toArray() {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ public <T> T[] toArray(T[] a) {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends HRegion> c) {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ public int drainTo(Collection<? super HRegion> c) {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ public int drainTo(Collection<? super HRegion> c, int maxElements) {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+}
\ No newline at end of file
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1002019&r1=1002018&r2=1002019&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
Tue Sep 28 05:26:22 2010
@@ -20,7 +20,7 @@
package org.apache.hadoop.hbase.regionserver.wal;
-import java.io.EOFException;
+import java.io.FilterInputStream;
import java.io.IOException;
import java.lang.Class;
import java.lang.reflect.Constructor;
@@ -78,18 +78,43 @@ public class SequenceFileLogReader imple
this.length = l;
}
+ // This section can be confusing. It is specific to how HDFS works.
+ // Let me try to break it down. This is the problem:
+ //
+ // 1. HDFS DataNodes update the NameNode about a filename's length
+ // on block boundaries or when a file is closed. Therefore,
+ // if an RS dies, then the NN's fs.getLength() can be out of date
+ // 2. this.in.available() would work, but it returns int &
+ // therefore breaks for files > 2GB (happens on big clusters)
+ // 3. DFSInputStream.getFileLength() gets the actual length from the DNs
+ // 4. DFSInputStream is wrapped 2 levels deep : this.in.in
+ //
+ // So, here we adjust getPos() using getFileLength() so the
+ // SequenceFile.Reader constructor (aka: first invocation) comes out
+ // with the correct end of the file:
+ // this.end = in.getPos() + length;
@Override
public long getPos() throws IOException {
if (this.firstGetPosInvocation) {
this.firstGetPosInvocation = false;
- // Tell a lie. We're doing this just so that this line up in
- // SequenceFile.Reader constructor comes out with the correct length
- // on the file:
- // this.end = in.getPos() + length;
- long available = this.in.available();
- // Length gets added up in the SF.Reader constructor so subtract the
- // difference. If available < this.length, then return this.length.
- return available >= this.length? available - this.length:
this.length;
+ long adjust = 0;
+
+ try {
+ Field fIn = FilterInputStream.class.getDeclaredField("in");
+ fIn.setAccessible(true);
+ Object realIn = fIn.get(this.in);
+ long realLength = ((Long)realIn.getClass().
+ getMethod("getFileLength", new Class<?> []{}).
+ invoke(realIn, new Object []{})).longValue();
+ assert(realLength >= this.length);
+ adjust = realLength - this.length;
+ } catch(Exception e) {
+ SequenceFileLogReader.LOG.warn(
+ "Error while trying to get accurate file length. " +
+ "Truncation / data loss may occur if RegionServers die.", e);
+ }
+
+ return adjust + super.getPos();
}
return super.getPos();
}
Added:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java?rev=1002019&view=auto
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
(added)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
Tue Sep 28 05:26:22 2010
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class for the priority compaction queue
+ */
+public class TestPriorityCompactionQueue {
+ static final Log LOG = LogFactory.getLog(TestPriorityCompactionQueue.class);
+
+ @Before
+ public void setUp() {
+ }
+
+ @After
+ public void tearDown() {
+
+ }
+
+ class DummyHRegion extends HRegion {
+ String name;
+
+ DummyHRegion(String name) {
+ super();
+ this.name = name;
+ }
+
+ public int hashCode() {
+ return name.hashCode();
+ }
+
+ public boolean equals(DummyHRegion r) {
+ return name.equals(r.name);
+ }
+
+ public String toString() {
+ return "[DummyHRegion " + name + "]";
+ }
+
+ public String getRegionNameAsString() {
+ return name;
+ }
+ }
+
+ protected void getAndCheckRegion(PriorityCompactionQueue pq,
+ HRegion checkRegion) {
+ HRegion r = pq.remove();
+ if (r != checkRegion) {
+ Assert.assertTrue("Didn't get expected " + checkRegion + " got " + r, r
+ .equals(checkRegion));
+ }
+ }
+
+ protected void addRegion(PriorityCompactionQueue pq, HRegion r, Priority p) {
+ pq.add(r, p);
+ try {
+ // Sleep 10 millisecond so 2 things are not put in the queue within the
+ // same millisecond. The queue breaks ties arbitrarily between two
+ // requests inserted at the same time. We want the ordering to
+ // be consistent for our unit test.
+ Thread.sleep(1);
+ } catch (InterruptedException ex) {
+ // continue
+ }
+ }
+
+ //
////////////////////////////////////////////////////////////////////////////
+ // tests
+ //
////////////////////////////////////////////////////////////////////////////
+
+ /** tests general functionality of the compaction queue */
+ @Test public void testPriorityQueue() throws InterruptedException {
+ PriorityCompactionQueue pq = new PriorityCompactionQueue();
+
+ HRegion r1 = new DummyHRegion("r1");
+ HRegion r2 = new DummyHRegion("r2");
+ HRegion r3 = new DummyHRegion("r3");
+ HRegion r4 = new DummyHRegion("r4");
+ HRegion r5 = new DummyHRegion("r5");
+
+ // test 1
+ // check fifo w/priority
+ addRegion(pq, r1, Priority.HIGH_BLOCKING);
+ addRegion(pq, r2, Priority.HIGH_BLOCKING);
+ addRegion(pq, r3, Priority.HIGH_BLOCKING);
+ addRegion(pq, r4, Priority.HIGH_BLOCKING);
+ addRegion(pq, r5, Priority.HIGH_BLOCKING);
+
+ getAndCheckRegion(pq, r1);
+ getAndCheckRegion(pq, r2);
+ getAndCheckRegion(pq, r3);
+ getAndCheckRegion(pq, r4);
+ getAndCheckRegion(pq, r5);
+
+ // test 2
+ // check fifo
+ addRegion(pq, r1, null);
+ addRegion(pq, r2, null);
+ addRegion(pq, r3, null);
+ addRegion(pq, r4, null);
+ addRegion(pq, r5, null);
+
+ getAndCheckRegion(pq, r1);
+ getAndCheckRegion(pq, r2);
+ getAndCheckRegion(pq, r3);
+ getAndCheckRegion(pq, r4);
+ getAndCheckRegion(pq, r5);
+
+ // test 3
+ // check fifo w/mixed priority
+ addRegion(pq, r1, Priority.HIGH_BLOCKING);
+ addRegion(pq, r2, Priority.NORMAL);
+ addRegion(pq, r3, Priority.HIGH_BLOCKING);
+ addRegion(pq, r4, Priority.NORMAL);
+ addRegion(pq, r5, Priority.HIGH_BLOCKING);
+
+ getAndCheckRegion(pq, r1);
+ getAndCheckRegion(pq, r3);
+ getAndCheckRegion(pq, r5);
+ getAndCheckRegion(pq, r2);
+ getAndCheckRegion(pq, r4);
+
+ // test 4
+ // check fifo w/mixed priority
+ addRegion(pq, r1, Priority.NORMAL);
+ addRegion(pq, r2, Priority.NORMAL);
+ addRegion(pq, r3, Priority.NORMAL);
+ addRegion(pq, r4, Priority.NORMAL);
+ addRegion(pq, r5, Priority.HIGH_BLOCKING);
+
+ getAndCheckRegion(pq, r5);
+ getAndCheckRegion(pq, r1);
+ getAndCheckRegion(pq, r2);
+ getAndCheckRegion(pq, r3);
+ getAndCheckRegion(pq, r4);
+
+ // test 5
+ // check fifo w/mixed priority elevation time
+ addRegion(pq, r1, Priority.NORMAL);
+ addRegion(pq, r2, Priority.HIGH_BLOCKING);
+ addRegion(pq, r3, Priority.NORMAL);
+ Thread.sleep(1000);
+ addRegion(pq, r4, Priority.NORMAL);
+ addRegion(pq, r5, Priority.HIGH_BLOCKING);
+
+ getAndCheckRegion(pq, r2);
+ getAndCheckRegion(pq, r5);
+ getAndCheckRegion(pq, r1);
+ getAndCheckRegion(pq, r3);
+ getAndCheckRegion(pq, r4);
+
+ // reset the priority compaction queue back to a normal queue
+ pq = new PriorityCompactionQueue();
+
+ // test 7
+ // test that lower priority are removed from the queue when a high priority
+ // is added
+ addRegion(pq, r1, Priority.NORMAL);
+ addRegion(pq, r2, Priority.NORMAL);
+ addRegion(pq, r3, Priority.NORMAL);
+ addRegion(pq, r4, Priority.NORMAL);
+ addRegion(pq, r5, Priority.NORMAL);
+ addRegion(pq, r3, Priority.HIGH_BLOCKING);
+
+ getAndCheckRegion(pq, r3);
+ getAndCheckRegion(pq, r1);
+ getAndCheckRegion(pq, r2);
+ getAndCheckRegion(pq, r4);
+ getAndCheckRegion(pq, r5);
+
+ Assert.assertTrue("Queue should be empty.", pq.size() == 0);
+
+ // test 8
+ // don't add the same region more than once
+ addRegion(pq, r1, Priority.NORMAL);
+ addRegion(pq, r2, Priority.NORMAL);
+ addRegion(pq, r3, Priority.NORMAL);
+ addRegion(pq, r4, Priority.NORMAL);
+ addRegion(pq, r5, Priority.NORMAL);
+ addRegion(pq, r1, Priority.NORMAL);
+ addRegion(pq, r2, Priority.NORMAL);
+ addRegion(pq, r3, Priority.NORMAL);
+ addRegion(pq, r4, Priority.NORMAL);
+ addRegion(pq, r5, Priority.NORMAL);
+
+ getAndCheckRegion(pq, r1);
+ getAndCheckRegion(pq, r2);
+ getAndCheckRegion(pq, r3);
+ getAndCheckRegion(pq, r4);
+ getAndCheckRegion(pq, r5);
+
+ Assert.assertTrue("Queue should be empty.", pq.size() == 0);
+ }
+}
\ No newline at end of file