Author: nspiegelberg
Date: Tue May 10 23:21:04 2011
New Revision: 1101677
URL: http://svn.apache.org/viewvc?rev=1101677&view=rev
Log:
HBASE-1476 Multithreaded Compactions
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
Modified: hbase/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1101677&r1=1101676&r2=1101677&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue May 10 23:21:04 2011
@@ -212,6 +212,8 @@ Release 0.91.0 - Unreleased
HBASE-3721 Speedup LoadIncrementalHFiles (Ted Yu)
HBASE-3855 Performance degradation of memstore because reseek is linear
(dhruba borthakur)
+ HBASE-3797 StoreFile Level Compaction Locking
+ HBASE-1476 Multithreaded Compactions
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1101677&r1=1101676&r2=1101677&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
Tue May 10 23:21:04 2011
@@ -19,34 +19,32 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.base.Preconditions;
/**
* Compact region on request and then run split if appropriate
*/
-public class CompactSplitThread extends Thread implements CompactionRequestor {
+public class CompactSplitThread implements CompactionRequestor {
static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
- private final long frequency;
- private final ReentrantLock lock = new ReentrantLock();
private final HRegionServer server;
private final Configuration conf;
- protected final BlockingQueue<CompactionRequest> compactionQueue =
- new PriorityBlockingQueue<CompactionRequest>();
+ private final ThreadPoolExecutor largeCompactions;
+ private final ThreadPoolExecutor smallCompactions;
+ private final ThreadPoolExecutor splits;
+ private final long throttleSize;
/* The default priority for user-specified compaction requests.
* The user gets top priority unless we have blocking compactions. (Pri <= 0)
@@ -62,85 +60,71 @@ public class CompactSplitThread extends
private int regionSplitLimit;
/** @param server */
- public CompactSplitThread(HRegionServer server) {
+ CompactSplitThread(HRegionServer server) {
super();
this.server = server;
this.conf = server.getConfiguration();
this.regionSplitLimit = conf.getInt("hbase.regionserver.regionSplitLimit",
Integer.MAX_VALUE);
- this.frequency =
- conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
- 20 * 1000);
+
+ int largeThreads = Math.max(1, conf.getInt(
+ "hbase.regionserver.thread.compaction.large", 1));
+ int smallThreads = conf.getInt(
+ "hbase.regionserver.thread.compaction.small", 0);
+ throttleSize = conf.getLong(
+ "hbase.regionserver.thread.compaction.throttle", 0);
+ int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1);
+
+ // if we have throttle threads, make sure the user also specified size
+ Preconditions.checkArgument(smallThreads == 0 || throttleSize > 0);
+
+ this.largeCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
+ 60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
+ this.largeCompactions
+ .setRejectedExecutionHandler(new CompactionRequest.Rejection());
+ if (smallThreads <= 0) {
+ this.smallCompactions = null;
+ } else {
+ this.smallCompactions = new ThreadPoolExecutor(smallThreads,
smallThreads,
+ 60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
+ this.smallCompactions
+ .setRejectedExecutionHandler(new CompactionRequest.Rejection());
+ }
+ this.splits = (ThreadPoolExecutor) Executors
+ .newFixedThreadPool(splitThreads);
}
@Override
- public void run() {
- while (!this.server.isStopped()) {
- CompactionRequest compactionRequest = null;
- HRegion r = null;
- boolean completed = false;
- try {
- compactionRequest = compactionQueue.poll(this.frequency,
TimeUnit.MILLISECONDS);
- if (compactionRequest != null) {
- r = compactionRequest.getHRegion();
- lock.lock();
- try {
- // look for a split first
- if(!this.server.isStopped()) {
- // don't split regions that are blocking
- if (shouldSplitRegion() && r.getCompactPriority() >=
PRIORITY_USER) {
- byte[] midkey = compactionRequest.getStore().checkSplit();
- if (midkey != null) {
- split(r, midkey);
- continue;
- }
- }
- }
-
- // now test for compaction
- if(!this.server.isStopped()) {
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
- completed = r.compact(compactionRequest);
- long now = EnvironmentEdgeManager.currentTimeMillis();
- LOG.info(((completed) ? "completed" : "aborted")
- + " compaction: " + compactionRequest + ", duration="
- + StringUtils.formatTimeDiff(now, startTime));
- if (completed) { // compaction aborted?
- this.server.getMetrics().
- addCompaction(now - startTime, compactionRequest.getSize());
- }
- }
- } finally {
- lock.unlock();
- }
- }
- } catch (InterruptedException ex) {
- continue;
- } catch (IOException ex) {
- LOG.error("Compaction/Split failed " + compactionRequest,
- RemoteExceptionHandler.checkIOException(ex));
- if (!server.checkFileSystem()) {
- break;
- }
- } catch (Exception ex) {
- LOG.error("Compaction failed " + compactionRequest, ex);
- if (!server.checkFileSystem()) {
- break;
- }
- } finally {
- if (compactionRequest != null) {
- Store s = compactionRequest.getStore();
- s.finishRequest(compactionRequest);
- // degenerate case: blocked regions require recursive enqueues
- if (s.getCompactPriority() < PRIORITY_USER && completed) {
- requestCompaction(r, s, "Recursive enqueue");
- }
- }
- compactionRequest = null;
+ public String toString() {
+ return "compaction_queue="
+ + (smallCompactions != null ? "("
+ + largeCompactions.getQueue().size() + ":"
+ + smallCompactions.getQueue().size() + ")"
+ : largeCompactions.getQueue().size())
+ + ", split_queue=" + splits.getQueue().size();
+ }
+
+ public synchronized boolean requestSplit(final HRegion r) {
+ // don't split regions that are blocking
+ if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) {
+ byte[] midKey = r.checkSplit();
+ if (midKey != null) {
+ requestSplit(r, midKey);
+ return true;
}
}
- compactionQueue.clear();
- LOG.info(getName() + " exiting");
+ return false;
+ }
+
+ public synchronized void requestSplit(final HRegion r, byte[] midKey) {
+ try {
+ this.splits.execute(new SplitRequest(r, midKey, this.server));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Split requested for " + r + ". " + this);
+ }
+ } catch (RejectedExecutionException ree) {
+ LOG.info("Could not execute split for " + r, ree);
+ }
}
public synchronized void requestCompaction(final HRegion r,
@@ -164,7 +148,7 @@ public class CompactSplitThread extends
/**
* @param r HRegion store belongs to
- * @param force Whether next compaction should be major
+ * @param s Store to request compaction on
* @param why Why compaction requested -- used in debug messages
* @param priority override the default priority (NO_PRIORITY == decide)
*/
@@ -175,67 +159,58 @@ public class CompactSplitThread extends
}
CompactionRequest cr = s.requestCompaction();
if (cr != null) {
+ cr.setServer(server);
if (priority != NO_PRIORITY) {
cr.setPriority(priority);
}
- boolean addedToQueue = compactionQueue.add(cr);
- if (!addedToQueue) {
- LOG.error("Could not add request to compaction queue: " + cr);
- s.finishRequest(cr);
- } else if (LOG.isDebugEnabled()) {
- LOG.debug("Compaction requested: " + cr
- + (why != null && !why.isEmpty() ? "; Because: " + why : "")
- + "; Priority: " + priority + "; Compaction queue size: "
- + compactionQueue.size());
+ ThreadPoolExecutor pool = largeCompactions;
+ if (smallCompactions != null && throttleSize > cr.getSize()) {
+ // smallCompactions is like the 10 items or less line at Walmart
+ pool = smallCompactions;
}
- }
- }
-
- private void split(final HRegion parent, final byte [] midKey)
- throws IOException {
- final long startTime = System.currentTimeMillis();
- SplitTransaction st = new SplitTransaction(parent, midKey);
- // If prepare does not return true, for some reason -- logged inside in
- // the prepare call -- we are not ready to split just now. Just return.
- if (!st.prepare()) return;
- try {
- st.execute(this.server, this.server);
- } catch (Exception e) {
- try {
- LOG.info("Running rollback of failed split of " +
- parent.getRegionNameAsString() + "; " + e.getMessage());
- st.rollback(this.server, this.server);
- LOG.info("Successful rollback of failed split of " +
- parent.getRegionNameAsString());
- } catch (Exception ee) {
- // If failed rollback, kill this server to avoid having a hole in
table.
- LOG.info("Failed rollback of failed split of " +
- parent.getRegionNameAsString() + " -- aborting server", ee);
- this.server.abort("Failed split");
+ pool.execute(cr);
+ if (LOG.isDebugEnabled()) {
+ String type = "";
+ if (smallCompactions != null) {
+ type = (pool == smallCompactions) ? "Small " : "Large ";
+ }
+ LOG.debug(type + "Compaction requested: " + cr
+ + (why != null && !why.isEmpty() ? "; Because: " + why : "")
+ + "; " + this);
}
- return;
}
-
- LOG.info("Region split, META updated, and report to master. Parent=" +
- parent.getRegionInfo().getRegionNameAsString() + ", new regions: " +
- st.getFirstDaughter().getRegionNameAsString() + ", " +
- st.getSecondDaughter().getRegionNameAsString() + ". Split took " +
- StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
}
/**
* Only interrupt once it's done with a run through the work loop.
*/
void interruptIfNecessary() {
- if (lock.tryLock()) {
+ splits.shutdown();
+ largeCompactions.shutdown();
+ if (smallCompactions != null)
+ smallCompactions.shutdown();
+ }
+
+ private void waitFor(ThreadPoolExecutor t, String name) {
+ boolean done = false;
+ while (!done) {
try {
- this.interrupt();
- } finally {
- lock.unlock();
+ done = t.awaitTermination(60, TimeUnit.SECONDS);
+ LOG.debug("Waiting for " + name + " to finish...");
+ } catch (InterruptedException ie) {
+ LOG.debug("Interrupted waiting for " + name + " to finish...");
}
}
}
+ void join() {
+ waitFor(splits, "Split Thread");
+ waitFor(largeCompactions, "Large Compaction Thread");
+ if (smallCompactions != null) {
+ waitFor(smallCompactions, "Small Compaction Thread");
+ }
+ }
+
/**
* Returns the current size of the queue containing regions that are
* processed.
@@ -243,7 +218,10 @@ public class CompactSplitThread extends
* @return The current size of the regions queue.
*/
public int getCompactionQueueSize() {
- return compactionQueue.size();
+ int size = largeCompactions.getQueue().size();
+ if (smallCompactions != null)
+ size += smallCompactions.getQueue().size();
+ return size;
}
private boolean shouldSplitRegion() {
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1101677&r1=1101676&r2=1101677&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Tue May 10 23:21:04 2011
@@ -890,7 +890,8 @@ public class HRegion implements HeapSize
return false;
}
}
- LOG.info("Starting compaction on region " + this);
+ LOG.info("Starting compaction on " + cr.getStore() + " in region "
+ + this);
doRegionCompactionPrep();
try {
status.setStatus("Compacting store " + cr.getStore());
@@ -3707,6 +3708,20 @@ public class HRegion implements HeapSize
// nothing
}
+ byte[] checkSplit() {
+ if (this.splitPoint != null) {
+ return this.splitPoint;
+ }
+ byte[] splitPoint = null;
+ for (Store s : stores.values()) {
+ splitPoint = s.checkSplit();
+ if (splitPoint != null) {
+ return splitPoint;
+ }
+ }
+ return null;
+ }
+
/**
* @return The priority that this region should have in the compaction queue
*/
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1101677&r1=1101676&r2=1101677&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Tue May 10 23:21:04 2011
@@ -226,7 +226,7 @@ public class HRegionServer implements HR
private RegionServerMetrics metrics;
// Compactions
- CompactSplitThread compactSplitThread;
+ public CompactSplitThread compactSplitThread;
// Cache flushing
MemStoreFlusher cacheFlusher;
@@ -1017,7 +1017,7 @@ public class HRegionServer implements HR
*
* @return false if file system is not available
*/
- protected boolean checkFileSystem() {
+ public boolean checkFileSystem() {
if (this.fsOk && this.fs != null) {
try {
FSUtils.checkFileSystemAvailable(this.fs);
@@ -1247,8 +1247,6 @@ public class HRegionServer implements HR
Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller", handler);
Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
handler);
- Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
- handler);
Threads.setDaemonThreadRunning(this.majorCompactionChecker, n +
".majorCompactionChecker", handler);
@@ -1316,7 +1314,7 @@ public class HRegionServer implements HR
return false;
}
// Verify that all threads are alive
- if (!(leases.isAlive() && compactSplitThread.isAlive()
+ if (!(leases.isAlive()
&& cacheFlusher.isAlive() && hlogRoller.isAlive()
&& this.majorCompactionChecker.isAlive())) {
stop("One or more threads are no longer alive -- stop");
@@ -1434,8 +1432,10 @@ public class HRegionServer implements HR
protected void join() {
Threads.shutdown(this.majorCompactionChecker);
Threads.shutdown(this.cacheFlusher);
- Threads.shutdown(this.compactSplitThread);
Threads.shutdown(this.hlogRoller);
+ if (this.compactSplitThread != null) {
+ this.compactSplitThread.join();
+ }
if (this.service != null) this.service.shutdown();
if (this.replicationHandler != null) {
this.replicationHandler.join();
@@ -2338,11 +2338,7 @@ public class HRegionServer implements HR
HRegion region = getRegion(regionInfo.getRegionName());
region.flushcache();
region.forceSplit(splitPoint);
- // force a compaction, split will be side-effect
- // TODO: flush/compact/split refactor will make it trivial to do this
- // sync/async (and won't require us to do a compaction to split!)
- compactSplitThread.requestCompaction(region, "User-triggered split",
- CompactSplitThread.PRIORITY_USER);
+ compactSplitThread.requestSplit(region, region.checkSplit());
}
@Override
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1101677&r1=1101676&r2=1101677&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
Tue May 10 23:21:04 2011
@@ -354,7 +354,9 @@ class MemStoreFlusher extends Thread imp
LOG.warn("Region " + region.getRegionNameAsString() + " has too many
" +
"store files; delaying flush up to " + this.blockingWaitTime +
"ms");
}
- this.server.compactSplitThread.requestCompaction(region, getName());
+ if (!this.server.compactSplitThread.requestSplit(region)) {
+ this.server.compactSplitThread.requestCompaction(region, getName());
+ }
// Put back on the queue. Have it come back out of the queue
// after a delay of this.blockingWaitTime / 100 ms.
this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java?rev=1101677&view=auto
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
(added)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
Tue May 10 23:21:04 2011
@@ -0,0 +1,76 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Handles processing region splits. Put in a queue, owned by HRegionServer.
+ */
+class SplitRequest implements Runnable {
+ static final Log LOG = LogFactory.getLog(SplitRequest.class);
+ private final HRegion parent;
+ private final byte[] midKey;
+ private final HRegionServer server;
+
+ SplitRequest(HRegion region, byte[] midKey, HRegionServer hrs) {
+ Preconditions.checkNotNull(hrs);
+ this.parent = region;
+ this.midKey = midKey;
+ this.server = hrs;
+ }
+
+ @Override
+ public String toString() {
+ return "regionName=" + parent + ", midKey=" + Bytes.toStringBinary(midKey);
+ }
+
+ @Override
+ public void run() {
+ try {
+ final long startTime = System.currentTimeMillis();
+ SplitTransaction st = new SplitTransaction(parent, midKey);
+ // If prepare does not return true, for some reason -- logged inside in
+ // the prepare call -- we are not ready to split just now. Just return.
+ if (!st.prepare()) return;
+ try {
+ st.execute(this.server, this.server);
+ } catch (Exception e) {
+ try {
+ LOG.info("Running rollback of failed split of " + parent + "; "
+ + e.getMessage());
+ st.rollback(this.server, this.server);
+ LOG.info("Successful rollback of failed split of " + parent);
+ } catch (RuntimeException ee) {
+ // If failed rollback, kill server to avoid having a hole in table.
+ LOG.info("Failed rollback of failed split of "
+ + parent.getRegionNameAsString() + " -- aborting server", ee);
+ this.server.abort("Failed split");
+ }
+ return;
+ }
+ LOG.info("Region split, META updated, and report to master. Parent="
+ + parent.getRegionInfo().getRegionNameAsString() + ", new regions: "
+ + st.getFirstDaughter().getRegionNameAsString() + ", "
+ + st.getSecondDaughter().getRegionNameAsString() + ". Split took "
+ + StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
+ } catch (IOException ex) {
+ LOG.error("Split failed " + this, RemoteExceptionHandler
+ .checkIOException(ex));
+ server.checkFileSystem();
+ }
+ }
+
+}
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1101677&r1=1101676&r2=1101677&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
Tue May 10 23:21:04 2011
@@ -695,7 +695,7 @@ public class Store implements HeapSize {
StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
int idx = filesToCompact.indexOf(last);
Preconditions.checkArgument(idx != -1);
- filesToCompact = filesToCompact.subList(idx+1,
filesToCompact.size());
+ filesToCompact.subList(0, idx + 1).clear();
}
int count = filesToCompact.size();
if (N > count) {
@@ -868,7 +868,7 @@ public class Store implements HeapSize {
StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
int idx = candidates.indexOf(last);
Preconditions.checkArgument(idx != -1);
- candidates = candidates.subList(idx + 1, candidates.size());
+ candidates.subList(0, idx + 1).clear();
}
List<StoreFile> filesToCompact = compactSelection(candidates);
@@ -974,6 +974,11 @@ public class Store implements HeapSize {
int start = 0;
double r = this.compactRatio;
+ // skip selection algorithm if we don't have enough files
+ if (filesToCompact.size() < this.minFilesToCompact) {
+ return Collections.emptyList();
+ }
+
/* TODO: add sorting + unit test back in when HBASE-2856 is fixed
// Sort files by size to correct when normal skew is altered by bulk
load.
Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
@@ -1390,7 +1395,7 @@ public class Store implements HeapSize {
* Determines if Store should be split
* @return byte[] if store should be split, null otherwise.
*/
- byte[] checkSplit() {
+ public byte[] checkSplit() {
this.lock.readLock().lock();
try {
boolean force = this.region.shouldForceSplit();
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1101677&r1=1101676&r2=1101677&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
Tue May 10 23:21:04 2011
@@ -19,20 +19,33 @@
*/
package org.apache.hadoop.hbase.regionserver.compactions;
+import java.io.IOException;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.StringUtils;
- /**
- * This class represents a compaction request and holds the region, priority,
- * and time submitted.
- */
- public class CompactionRequest implements Comparable<CompactionRequest> {
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+
+/**
+ * This class holds all details necessary to run a compaction.
+ */
+public class CompactionRequest implements Comparable<CompactionRequest>,
+ Runnable {
static final Log LOG = LogFactory.getLog(CompactionRequest.class);
private final HRegion r;
private final Store s;
@@ -41,20 +54,12 @@ import org.apache.hadoop.hbase.regionser
private final boolean isMajor;
private int p;
private final Date date;
-
- public CompactionRequest(HRegion r, Store s) {
- this(r, s, null, false, s.getCompactPriority());
- }
-
- public CompactionRequest(HRegion r, Store s, int p) {
- this(r, s, null, false, p);
- }
+ private HRegionServer server = null;
public CompactionRequest(HRegion r, Store s,
List<StoreFile> files, boolean isMajor, int p) {
- if (r == null) {
- throw new NullPointerException("HRegion cannot be null");
- }
+ Preconditions.checkNotNull(r);
+ Preconditions.checkNotNull(files);
this.r = r;
this.s = s;
@@ -136,10 +141,77 @@ import org.apache.hadoop.hbase.regionser
this.p = p;
}
+ public void setServer(HRegionServer hrs) {
+ this.server = hrs;
+ }
+
+ @Override
public String toString() {
+ String fsList = Joiner.on(", ").join(
+ Collections2.transform(Collections2.filter(files,
+ new Predicate<StoreFile>() {
+ public boolean apply(StoreFile sf) {
+ return sf.getReader() != null;
+ }
+ }), new Function<StoreFile, String>() {
+ public String apply(StoreFile sf) {
+ return StringUtils.humanReadableInt(sf.getReader().length());
+ }
+ }));
+
return "regionName=" + r.getRegionNameAsString() +
", storeName=" + new String(s.getFamily().getName()) +
", fileCount=" + files.size() +
+ ", fileSize=" + StringUtils.humanReadableInt(totalSize) +
+ ((fsList.isEmpty()) ? "" : " (" + fsList + ")") +
", priority=" + p + ", date=" + date;
}
+
+ @Override
+ public void run() {
+ Preconditions.checkNotNull(server);
+ if (server.isStopped()) {
+ return;
+ }
+ try {
+ long start = EnvironmentEdgeManager.currentTimeMillis();
+ boolean completed = r.compact(this);
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ LOG.info(((completed) ? "completed" : "aborted") + " compaction: " +
+ this + "; duration=" + StringUtils.formatTimeDiff(now, start));
+ if (completed) {
+ server.getMetrics().addCompaction(now - start, this.totalSize);
+ // degenerate case: blocked regions require recursive enqueues
+ if (s.getCompactPriority() <= 0) {
+ server.compactSplitThread
+ .requestCompaction(r, s, "Recursive enqueue");
+ }
+ }
+ } catch (IOException ex) {
+ LOG.error("Compaction failed " + this, RemoteExceptionHandler
+ .checkIOException(ex));
+ server.checkFileSystem();
+ } catch (Exception ex) {
+ LOG.error("Compaction failed " + this, ex);
+ server.checkFileSystem();
+ } finally {
+ s.finishRequest(this);
+ LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
+ }
+ }
+
+ /**
+ * Cleanup class to use when rejecting a compaction request from the queue.
+ */
+ public static class Rejection implements RejectedExecutionHandler {
+
+ @Override
+ public void rejectedExecution(Runnable request, ThreadPoolExecutor pool)
{
+ if (request instanceof CompactionRequest) {
+ CompactionRequest cr = (CompactionRequest) request;
+ LOG.debug("Compaction Rejected: " + cr);
+ cr.getStore().finishRequest(cr);
+ }
+ }
+ }
}