Author: nspiegelberg
Date: Tue May 10 23:20:45 2011
New Revision: 1101676
URL: http://svn.apache.org/viewvc?rev=1101676&view=rev
Log:
HBASE-3797 StoreFile Level Compaction Locking

Compactions are now locked at the StoreFile level rather than the region
level: a Store hands out a CompactionRequest via requestCompaction(), tracks
the files involved in filesCompacting, and releases them via finishRequest().
The custom PriorityCompactionQueue is replaced by a standard
java.util.concurrent.PriorityBlockingQueue of CompactionRequests.
Removed:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Tue May 10 23:20:45 2011
@@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@@ -28,6 +30,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
@@ -42,13 +45,14 @@ public class CompactSplitThread extends
private final HRegionServer server;
private final Configuration conf;
- private final PriorityCompactionQueue compactionQueue =
- new PriorityCompactionQueue();
+ protected final BlockingQueue<CompactionRequest> compactionQueue =
+ new PriorityBlockingQueue<CompactionRequest>();
/* The default priority for user-specified compaction requests.
* The user gets top priority unless we have blocking compactions. (Pri <= 0)
*/
public static final int PRIORITY_USER = 1;
+ public static final int NO_PRIORITY = Integer.MIN_VALUE;
/**
* Splitting should not take place if the total number of regions exceed this.
@@ -74,21 +78,36 @@ public class CompactSplitThread extends
while (!this.server.isStopped()) {
CompactionRequest compactionRequest = null;
HRegion r = null;
+ boolean completed = false;
try {
compactionRequest = compactionQueue.poll(this.frequency,
TimeUnit.MILLISECONDS);
if (compactionRequest != null) {
+ r = compactionRequest.getHRegion();
lock.lock();
try {
+ // look for a split first
if(!this.server.isStopped()) {
- // Don't interrupt us while we are working
- r = compactionRequest.getHRegion();
- byte [] midKey = r.compactStore(compactionRequest.getStore());
- if (r.getLastCompactInfo() != null) { // compaction aborted?
- this.server.getMetrics().addCompaction(r.getLastCompactInfo());
+ // don't split regions that are blocking
+ if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) {
+ byte[] midkey = compactionRequest.getStore().checkSplit();
+ if (midkey != null) {
+ split(r, midkey);
+ continue;
+ }
}
- if (shouldSplitRegion() && midKey != null &&
- !this.server.isStopped()) {
- split(r, midKey);
+ }
+
+ // now test for compaction
+ if(!this.server.isStopped()) {
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ completed = r.compact(compactionRequest);
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ LOG.info(((completed) ? "completed" : "aborted")
+ + " compaction: " + compactionRequest + ", duration="
+ + StringUtils.formatTimeDiff(now, startTime));
+ if (completed) {
+ this.server.getMetrics().
+ addCompaction(now - startTime, compactionRequest.getSize());
}
}
} finally {
@@ -98,19 +117,26 @@ public class CompactSplitThread extends
} catch (InterruptedException ex) {
continue;
} catch (IOException ex) {
- LOG.error("Compaction/Split failed for region " +
- r.getRegionNameAsString(),
+ LOG.error("Compaction/Split failed " + compactionRequest,
RemoteExceptionHandler.checkIOException(ex));
if (!server.checkFileSystem()) {
break;
}
} catch (Exception ex) {
- LOG.error("Compaction failed" +
- (r != null ? (" for region " + r.getRegionNameAsString()) : ""),
- ex);
+ LOG.error("Compaction failed " + compactionRequest, ex);
if (!server.checkFileSystem()) {
break;
}
+ } finally {
+ if (compactionRequest != null) {
+ Store s = compactionRequest.getStore();
+ s.finishRequest(compactionRequest);
+ // degenerate case: blocked regions require recursive enqueues
+ if (s.getCompactPriority() < PRIORITY_USER && completed) {
+ requestCompaction(r, s, "Recursive enqueue");
+ }
+ }
+ compactionRequest = null;
}
}
compactionQueue.clear();
@@ -120,19 +146,19 @@ public class CompactSplitThread extends
public synchronized void requestCompaction(final HRegion r,
final String why) {
for(Store s : r.getStores().values()) {
- requestCompaction(r, s, false, why, s.getCompactPriority());
+ requestCompaction(r, s, why, NO_PRIORITY);
}
}
- public synchronized void requestCompaction(final HRegion r,
- final String why, int p) {
- requestCompaction(r, false, why, p);
+ public synchronized void requestCompaction(final HRegion r, final Store s,
+ final String why) {
+ requestCompaction(r, s, why, NO_PRIORITY);
}
- public synchronized void requestCompaction(final HRegion r,
- final boolean force, final String why, int p) {
+ public synchronized void requestCompaction(final HRegion r, final String why,
+ int p) {
for(Store s : r.getStores().values()) {
- requestCompaction(r, s, force, why, p);
+ requestCompaction(r, s, why, p);
}
}
@@ -140,24 +166,28 @@ public class CompactSplitThread extends
* @param r HRegion store belongs to
* @param force Whether next compaction should be major
* @param why Why compaction requested -- used in debug messages
+ * @param priority override the default priority (NO_PRIORITY == decide)
*/
public synchronized void requestCompaction(final HRegion r, final Store s,
- final boolean force, final String why, int priority) {
+ final String why, int priority) {
if (this.server.isStopped()) {
return;
}
- // tell the region to major-compact (and don't downgrade it)
- if (force) {
- s.setForceMajorCompaction(force);
- }
- CompactionRequest compactionRequest = new CompactionRequest(r, s, priority);
- if (compactionQueue.add(compactionRequest) && LOG.isDebugEnabled()) {
- LOG.debug("Compaction " + (force? "(major) ": "") +
- "requested for region " + r.getRegionNameAsString() +
- "/" + r.getRegionInfo().getEncodedName() +
- ", store " + s +
- (why != null && !why.isEmpty()? " because " + why: "") +
- "; priority=" + priority + ", compaction queue size=" + compactionQueue.size());
+ CompactionRequest cr = s.requestCompaction();
+ if (cr != null) {
+ if (priority != NO_PRIORITY) {
+ cr.setPriority(priority);
+ }
+ boolean addedToQueue = compactionQueue.add(cr);
+ if (!addedToQueue) {
+ LOG.error("Could not add request to compaction queue: " + cr);
+ s.finishRequest(cr);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Compaction requested: " + cr
+ + (why != null && !why.isEmpty() ? "; Because: " + why : "")
+ + "; Priority: " + priority + "; Compaction queue size: "
+ + compactionQueue.size());
+ }
}
}
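
Taken together, the new worker-loop lifecycle in CompactSplitThread condenses
to the sketch below (an illustrative reconstruction, not verbatim patch code;
locking, metrics, and shutdown checks are elided; names are from the patch):

    // Sketch of the run() lifecycle after this patch.
    CompactionRequest cr = compactionQueue.poll(frequency, TimeUnit.MILLISECONDS);
    if (cr != null) {
      boolean completed = false;
      try {
        HRegion r = cr.getHRegion();
        // Prefer a split, but never split a region whose store is blocking.
        byte[] midkey = cr.getStore().checkSplit();
        if (midkey != null && shouldSplitRegion()
            && r.getCompactPriority() >= PRIORITY_USER) {
          split(r, midkey);
        } else {
          completed = r.compact(cr);     // false if the compaction was aborted
        }
      } finally {
        Store s = cr.getStore();
        s.finishRequest(cr);             // always release the reserved files
        if (completed && s.getCompactPriority() < PRIORITY_USER) {
          // store is still blocking: re-enqueue until it drops below the limit
          requestCompaction(cr.getHRegion(), s, "Recursive enqueue");
        }
      }
    }
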
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java Tue May 10 23:20:45 2011
@@ -28,8 +28,25 @@ public interface CompactionRequestor {
/**
* @param r Region to compact
+ * @param s Store within region to compact
+ * @param why Why compaction was requested -- used in debug messages
+ */
+ public void requestCompaction(final HRegion r, final Store s, final String why);
+
+ /**
+ * @param r Region to compact
* @param why Why compaction was requested -- used in debug messages
* @param pri Priority of this compaction. minHeap. <=0 is critical
*/
public void requestCompaction(final HRegion r, final String why, int pri);
+
+ /**
+ * @param r Region to compact
+ * @param s Store within region to compact
+ * @param why Why compaction was requested -- used in debug messages
+ * @param pri Priority of this compaction. minHeap. <=0 is critical
+ */
+ public void requestCompaction(final HRegion r, final Store s,
+ final String why, int pri);
+
}
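
For reference, a caller-side sketch of the widened interface (hypothetical
caller; region and store are placeholders):

    // Per-store request; the store decides the priority (NO_PRIORITY):
    requester.requestCompaction(region, store, "too many store files");

    // Whole-region request at explicit user priority:
    requester.requestCompaction(region, "User-triggered compaction",
        CompactSplitThread.PRIORITY_USER);
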
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue May 10 23:20:45 2011
@@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.ipc.Copro
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -101,6 +102,7 @@ import org.apache.hadoop.util.StringUtil
import org.cliffc.high_scale_lib.Counter;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ClassToInstanceMap;
import com.google.common.collect.Lists;
import com.google.common.collect.MutableClassToInstanceMap;
@@ -206,8 +208,8 @@ public class HRegion implements HeapSize
volatile boolean flushing = false;
// Set when a flush has been requested.
volatile boolean flushRequested = false;
- // Set while a compaction is running.
- volatile boolean compacting = false;
+ // Number of compactions running.
+ volatile int compacting = 0;
// Gets set in close. If set, cannot compact or flush again.
volatile boolean writesEnabled = true;
// Set if region is read-only
@@ -395,7 +397,7 @@ public class HRegion implements HeapSize
this.writestate.setReadOnly(this.regionInfo.getTableDesc().isReadOnly());
- this.writestate.compacting = false;
+ this.writestate.compacting = 0;
this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
// Use maximum of log sequenceid or that which was found in stores
// (particularly if no recovered edits, seqid will be -1).
@@ -606,12 +608,10 @@ public class HRegion implements HeapSize
writestate.writesEnabled = false;
wasFlushing = writestate.flushing;
LOG.debug("Closing " + this + ": disabling compactions & flushes");
- while (writestate.compacting || writestate.flushing) {
- LOG.debug("waiting for" +
- (writestate.compacting ? " compaction" : "") +
- (writestate.flushing ?
- (writestate.compacting ? "," : "") + " cache flush" :
- "") + " to complete for region " + this);
+ while (writestate.compacting > 0 || writestate.flushing) {
+ LOG.debug("waiting for " + writestate.compacting + " compactions" +
+ (writestate.flushing ? " & cache flush" : "") +
+ " to complete for region " + this);
try {
writestate.wait();
} catch (InterruptedException iex) {
@@ -734,11 +734,6 @@ public class HRegion implements HeapSize
return this.fs;
}
- /** @return info about the last compaction <time, size> */
- public Pair<Long,Long> getLastCompactInfo() {
- return this.lastCompactInfo;
- }
-
/** @return the last time the region was flushed */
public long getLastFlushTime() {
return this.lastFlushTime;
@@ -794,9 +789,9 @@ public class HRegion implements HeapSize
return new Path(getRegionDir(), ".tmp");
}
- void setForceMajorCompaction(final boolean b) {
+ void triggerMajorCompaction() {
for (Store h: stores.values()) {
- h.setForceMajorCompaction(b);
+ h.triggerMajorCompaction();
}
}
@@ -817,7 +812,9 @@ public class HRegion implements HeapSize
*/
byte [] compactStores(final boolean majorCompaction)
throws IOException {
- this.setForceMajorCompaction(majorCompaction);
+ if (majorCompaction) {
+ this.triggerMajorCompaction();
+ }
return compactStores();
}
@@ -826,13 +823,21 @@ public class HRegion implements HeapSize
* to be split.
*/
public byte[] compactStores() throws IOException {
- byte[] splitRow = null;
for(Store s : getStores().values()) {
- if(splitRow == null) {
- splitRow = compactStore(s);
+ CompactionRequest cr = s.requestCompaction();
+ if(cr != null) {
+ try {
+ compact(cr);
+ } finally {
+ s.finishRequest(cr);
+ }
+ }
+ byte[] splitRow = s.checkSplit();
+ if (splitRow != null) {
+ return splitRow;
}
}
- return splitRow;
+ return null;
}
/*
@@ -846,93 +851,76 @@ public class HRegion implements HeapSize
* conflicts with a region split, and that cannot happen because the region
* server does them sequentially and not in parallel.
*
- * @return split row if split is needed
+ * @param cr Compaction details, obtained by requestCompaction()
+ * @return whether the compaction completed
* @throws IOException e
*/
- public byte [] compactStore(Store store) throws IOException {
- if (this.closing.get()) {
- LOG.debug("Skipping compaction on " + this + " because closing");
- return null;
+ public boolean compact(CompactionRequest cr)
+ throws IOException {
+ if (cr == null) {
+ return false;
}
+ if (this.closing.get() || this.closed.get()) {
+ LOG.debug("Skipping compaction on " + this + " because closing/closed");
+ return false;
+ }
+ Preconditions.checkArgument(cr.getHRegion().equals(this));
lock.readLock().lock();
- this.lastCompactInfo = null;
- byte [] splitRow = null;
MonitoredTask status = TaskMonitor.get().createStatus(
- "Compacting stores in " + this);
+ "Compacting " + cr.getStore() + " in " + this);
try {
if (this.closed.get()) {
LOG.debug("Skipping compaction on " + this + " because closed");
- return null;
- }
- if (this.closed.get()) {
- return splitRow;
+ return false;
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor preCompact hooks");
coprocessorHost.preCompact(false);
}
+ boolean decr = true;
try {
synchronized (writestate) {
- if (!writestate.compacting && writestate.writesEnabled) {
- writestate.compacting = true;
+ if (writestate.writesEnabled) {
+ ++writestate.compacting;
} else {
- String msg = "NOT compacting region " + this +
- ": compacting=" + writestate.compacting + ", writesEnabled=" +
- writestate.writesEnabled;
+ String msg = "NOT compacting region " + this + ". Writes disabled.";
LOG.info(msg);
status.abort(msg);
- return splitRow;
+ decr = false;
+ return false;
}
}
LOG.info("Starting compaction on region " + this);
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
doRegionCompactionPrep();
- long lastCompactSize = 0;
- boolean completed = false;
try {
- status.setStatus("Compacting store " + store);
- final Store.StoreSize ss = store.compact();
- lastCompactSize += store.getLastCompactSize();
- if (ss != null) {
- splitRow = ss.getSplitRow();
- }
- completed = true;
+ status.setStatus("Compacting store " + cr.getStore());
+ cr.getStore().compact(cr);
} catch (InterruptedIOException iioe) {
- LOG.info("compaction interrupted by user: ", iioe);
- } finally {
- long now = EnvironmentEdgeManager.currentTimeMillis();
- LOG.info(((completed) ? "completed" : "aborted")
- + " compaction on region " + this
- + " after " + StringUtils.formatTimeDiff(now, startTime));
- if (completed) {
- this.lastCompactInfo =
- new Pair<Long,Long>((now - startTime) / 1000, lastCompactSize);
- status.setStatus("Compaction complete: " +
- StringUtils.humanReadableInt(lastCompactSize) + " in " +
- (now - startTime) + "ms");
- }
+ String msg = "compaction interrupted by user";
+ LOG.info(msg, iioe);
+ status.abort(msg);
+ return false;
}
} finally {
- synchronized (writestate) {
- writestate.compacting = false;
- writestate.notifyAll();
+ if (decr) {
+ synchronized (writestate) {
+ --writestate.compacting;
+ if (writestate.compacting <= 0) {
+ writestate.notifyAll();
+ }
+ }
}
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor post-compact hooks");
- coprocessorHost.postCompact(splitRow != null);
+ coprocessorHost.postCompact(false);
}
-
status.markComplete("Compaction complete");
+ return true;
} finally {
status.cleanup();
lock.readLock().unlock();
}
- if (splitRow != null) {
- assert splitPoint == null || Bytes.equals(splitRow, splitPoint);
- this.splitPoint = null; // clear the split point (if set)
- }
- return splitRow;
}
/**
@@ -3708,6 +3696,10 @@ public class HRegion implements HeapSize
}
}
+ void clearSplit_TESTS_ONLY() {
+ this.splitRequest = false;
+ }
+
/**
* Give the region a chance to prepare before it is split.
*/
@@ -3731,9 +3723,9 @@ public class HRegion implements HeapSize
* store files
* @return true if any store has too many store files
*/
- public boolean hasTooManyStoreFiles() {
+ public boolean needsCompaction() {
for(Store store : stores.values()) {
- if(store.hasTooManyStoreFiles()) {
+ if(store.needsCompaction()) {
return true;
}
}
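
Since writestate.compacting is now a counter, concurrent per-store compactions
follow a simple enter/exit protocol; in sketch form (illustrative, simplified
from the compact(cr) body above):

    synchronized (writestate) {
      if (!writestate.writesEnabled) return false; // closing: refuse new work
      ++writestate.compacting;                     // enter
    }
    try {
      cr.getStore().compact(cr);                   // do the work
    } finally {
      synchronized (writestate) {
        if (--writestate.compacting <= 0) {
          writestate.notifyAll();                  // wake close(), which waits
        }                                          // for the count to reach 0
      }
    }
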
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue May 10 23:20:45 2011
@@ -1046,14 +1046,18 @@ public class HRegionServer implements HR
@Override
protected void chore() {
for (HRegion r : this.instance.onlineRegions.values()) {
- try {
- if (r != null && r.isMajorCompaction()) {
- // Queue a compaction. Will recognize if major is needed.
- this.instance.compactSplitThread.requestCompaction(r, getName()
- + " requests major compaction");
+ if (r == null)
+ continue;
+ for (Store s : r.getStores().values()) {
+ try {
+ if (s.isMajorCompaction()) {
+ // Queue a compaction. Will recognize if major is needed.
+ this.instance.compactSplitThread.requestCompaction(r, s,
+ getName() + " requests major compaction");
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed major compaction check on " + r, e);
}
- } catch (IOException e) {
- LOG.warn("Failed major compaction check on " + r, e);
}
}
}
@@ -1346,10 +1350,10 @@ public class HRegionServer implements HR
final boolean daughter)
throws KeeperException, IOException {
// Do checks to see if we need to compact (references or too many files)
- if (r.hasReferences() || r.hasTooManyStoreFiles()) {
- getCompactionRequester().requestCompaction(r,
- r.hasReferences()? "Region has references on open" :
- "Region has too many store files");
+ for (Store s : r.getStores().values()) {
+ if (s.hasReferences() || s.needsCompaction()) {
+ getCompactionRequester().requestCompaction(r, s, "Opening Region");
+ }
}
// Add to online regions if all above was successful.
@@ -2346,7 +2350,10 @@ public class HRegionServer implements HR
public void compactRegion(HRegionInfo regionInfo, boolean major)
throws NotServingRegionException, IOException {
HRegion region = getRegion(regionInfo.getRegionName());
- compactSplitThread.requestCompaction(region, major, "User-triggered "
+ if (major) {
+ region.triggerMajorCompaction();
+ }
+ compactSplitThread.requestCompaction(region, "User-triggered "
+ (major ? "major " : "") + "compaction",
CompactSplitThread.PRIORITY_USER);
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue May 10 23:20:45 2011
@@ -24,8 +24,10 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.NavigableSet;
+import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -48,16 +50,20 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
/**
* A Store holds a column family in a Region. Its a memstore and a set of zero
@@ -102,7 +108,7 @@ public class Store implements HeapSize {
// With float, java will downcast your long to float for comparisons (bad)
private double compactRatio;
private long lastCompactSize = 0;
- private volatile boolean forceMajor = false;
+ volatile boolean forceMajor = false;
/* how many bytes to write between status checks */
static int closeCheckInterval = 0;
private final long desiredMaxFileSize;
@@ -119,12 +125,12 @@ public class Store implements HeapSize {
*/
private ImmutableList<StoreFile> storefiles = null;
+ List<StoreFile> filesCompacting = Lists.newArrayList();
// All access must be synchronized.
private final CopyOnWriteArraySet<ChangedReadersObserver>
changedReaderObservers =
new CopyOnWriteArraySet<ChangedReadersObserver>();
- private final Object compactLock = new Object();
private final int blocksize;
private final boolean blockcache;
/** Compression algorithm for flush files and minor compaction */
@@ -569,7 +575,7 @@ public class Store implements HeapSize {
// Tell listeners of the change in readers.
notifyChangedReadersObservers();
- return this.storefiles.size() >= this.minFilesToCompact;
+ return needsCompaction();
} finally {
this.lock.writeLock().unlock();
}
@@ -620,99 +626,108 @@ public class Store implements HeapSize {
* <p>We don't want to hold the structureLock for the whole time, as a compact()
* can be lengthy and we want to allow cache-flushes during this period.
*
- * @return row to split around if a split is needed, null otherwise
+ * @param cr compaction details obtained from requestCompaction()
* @throws IOException
*/
- StoreSize compact() throws IOException {
- boolean forceSplit = this.region.shouldForceSplit();
- synchronized (compactLock) {
- this.lastCompactSize = 0; // reset first in case compaction is aborted
+ void compact(CompactionRequest cr) throws IOException {
+ if (cr == null || cr.getFiles().isEmpty()) {
+ return;
+ }
+ Preconditions.checkArgument(cr.getStore().toString()
+ .equals(this.toString()));
- // sanity checks
- for (StoreFile sf : this.storefiles) {
- if (sf.getPath() == null || sf.getReader() == null) {
- boolean np = sf.getPath() == null;
- LOG.debug("StoreFile " + sf + " has null " + (np ? "Path":"Reader"));
- return null;
- }
- }
+ List<StoreFile> filesToCompact = cr.getFiles();
+
+ synchronized (filesCompacting) {
+ // sanity check: we're compacting files that this store knows about
+ // TODO: change this to LOG.error() after more debugging
+ Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
+ }
+
+ // Max-sequenceID is the last key in the files we're compacting
+ long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
- // if the user wants to force a split, skip compaction unless necessary
- boolean references = hasReferences(this.storefiles);
- if (forceSplit && !this.forceMajor && !references) {
- return checkSplit(forceSplit);
- }
-
- Collection<StoreFile> filesToCompact
- = compactSelection(this.storefiles, this.forceMajor);
-
- // empty == do not compact
- if (filesToCompact.isEmpty()) {
- // but do see if we need to split before returning
- return checkSplit(forceSplit);
- }
-
- // sum size of all files included in compaction
- long totalSize = 0;
- for (StoreFile sf : filesToCompact) {
- totalSize += sf.getReader().length();
- }
- this.lastCompactSize = totalSize;
-
- // major compaction iff all StoreFiles are included
- boolean majorcompaction
- = (filesToCompact.size() == this.storefiles.size());
- if (majorcompaction) {
- this.forceMajor = false;
- }
-
- // Max-sequenceID is the last key in the files we're compacting
- long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
-
- // Ready to go. Have list of files to compact.
- LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in cf=" +
- this.storeNameStr +
- (hasReferences(filesToCompact)? ", hasReferences=true,": " ") + " into " +
- region.getTmpDir() + ", seqid=" + maxId +
- ", totalSize=" + StringUtils.humanReadableInt(totalSize));
- StoreFile.Writer writer
- = compactStore(filesToCompact, majorcompaction, maxId);
+ // Ready to go. Have list of files to compact.
+ LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
+ + this.storeNameStr + " of "
+ + this.region.getRegionInfo().getRegionNameAsString()
+ + " into " + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
+ + StringUtils.humanReadableInt(cr.getSize()));
+
+ StoreFile sf = null;
+ try {
+ StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(),
+ maxId);
// Move the compaction into place.
- StoreFile sf = completeCompaction(filesToCompact, writer);
- if (LOG.isInfoEnabled()) {
- LOG.info("Completed" + (majorcompaction? " major ": " ") +
- "compaction of " + filesToCompact.size() +
- " file(s), new file=" + (sf == null? "none": sf.toString()) +
- ", size=" + (sf == null? "none": StringUtils.humanReadableInt(sf.getReader().length())) +
- "; total size for store is " + StringUtils.humanReadableInt(storeSize));
+ sf = completeCompaction(filesToCompact, writer);
+ } finally {
+ synchronized (filesCompacting) {
+ filesCompacting.removeAll(filesToCompact);
}
}
- return checkSplit(forceSplit);
+
+ LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
+ + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
+ + this.region.getRegionInfo().getRegionNameAsString()
+ + "; new storefile name=" + (sf == null ? "none" : sf.toString())
+ + ", size=" + (sf == null ? "none" :
+ StringUtils.humanReadableInt(sf.getReader().length()))
+ + "; total size for store is "
+ + StringUtils.humanReadableInt(storeSize));
}
/*
* Compact the most recent N files. Essentially a hook for testing.
*/
protected void compactRecent(int N) throws IOException {
- synchronized(compactLock) {
- List<StoreFile> filesToCompact = this.storefiles;
- int count = filesToCompact.size();
- if (N > count) {
- throw new RuntimeException("Not enough files");
+ List<StoreFile> filesToCompact;
+ long maxId;
+ boolean isMajor;
+
+ this.lock.readLock().lock();
+ try {
+ synchronized (filesCompacting) {
+ filesToCompact = Lists.newArrayList(storefiles);
+ if (!filesCompacting.isEmpty()) {
+ // exclude all files older than the newest file we're currently
+ // compacting. this allows us to preserve contiguity (HBASE-2856)
+ StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
+ int idx = filesToCompact.indexOf(last);
+ Preconditions.checkArgument(idx != -1);
+ filesToCompact = filesToCompact.subList(idx+1, filesToCompact.size());
+ }
+ int count = filesToCompact.size();
+ if (N > count) {
+ throw new RuntimeException("Not enough files");
+ }
+
+ filesToCompact = filesToCompact.subList(count - N, count);
+ maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
+ isMajor = (filesToCompact.size() == storefiles.size());
+ filesCompacting.addAll(filesToCompact);
+ Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
}
+ } finally {
+ this.lock.readLock().unlock();
+ }
- filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(count-N, count));
- long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
- boolean majorcompaction = (N == count);
-
- // Ready to go. Have list of files to compact.
- StoreFile.Writer writer
- = compactStore(filesToCompact, majorcompaction, maxId);
+ try {
+ // Ready to go. Have list of files to compact.
+ StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
// Move the compaction into place.
StoreFile sf = completeCompaction(filesToCompact, writer);
+ } finally {
+ synchronized (filesCompacting) {
+ filesCompacting.removeAll(filesToCompact);
+ }
}
}
+ boolean hasReferences() {
+ return hasReferences(this.storefiles);
+ }
+
/*
* @param files
* @return True if any of the files in <code>files</code> are References.
@@ -835,6 +850,69 @@ public class Store implements HeapSize {
return ret;
}
+ public CompactionRequest requestCompaction() {
+ // don't even select for compaction if writes are disabled
+ if (!this.region.areWritesEnabled()) {
+ return null;
+ }
+
+ CompactionRequest ret = null;
+ this.lock.readLock().lock();
+ try {
+ synchronized (filesCompacting) {
+ // candidates = all storefiles not already in compaction queue
+ List<StoreFile> candidates = Lists.newArrayList(storefiles);
+ if (!filesCompacting.isEmpty()) {
+ // exclude all files older than the newest file we're currently
+ // compacting. this allows us to preserve contiguity (HBASE-2856)
+ StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
+ int idx = candidates.indexOf(last);
+ Preconditions.checkArgument(idx != -1);
+ candidates = candidates.subList(idx + 1, candidates.size());
+ }
+ List<StoreFile> filesToCompact = compactSelection(candidates);
+
+ // no files to compact
+ if (filesToCompact.isEmpty()) {
+ return null;
+ }
+
+ // basic sanity check: do not try to compact the same StoreFile twice.
+ if (!Collections.disjoint(filesCompacting, filesToCompact)) {
+ // TODO: change this from an IAE to LOG.error after sufficient testing
+ Preconditions.checkArgument(false, "%s overlaps with %s",
+ filesToCompact, filesCompacting);
+ }
+ filesCompacting.addAll(filesToCompact);
+ Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
+
+ // major compaction iff all StoreFiles are included
+ boolean isMajor = (filesToCompact.size() == this.storefiles.size());
+ if (isMajor) {
+ // since we're enqueuing a major, update the compaction wait interval
+ this.forceMajor = false;
+ this.majorCompactionTime = getNextMajorCompactTime();
+ }
+
+ // everything went better than expected. create a compaction request
+ int pri = getCompactPriority();
+ ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
+ }
+ } catch (IOException ex) {
+ LOG.error("Compaction Request failed for region " + region + ", store "
+ + this, RemoteExceptionHandler.checkIOException(ex));
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ return ret;
+ }
+
+ public void finishRequest(CompactionRequest cr) {
+ synchronized (filesCompacting) {
+ filesCompacting.removeAll(cr.getFiles());
+ }
+ }
+
/**
* Algorithm to choose which files to compact
*
@@ -851,12 +929,13 @@ public class Store implements HeapSize {
* max files to compact at once (avoids OOM)
*
* @param candidates candidate files, ordered from oldest to newest
- * @param majorcompaction whether to force a major compaction
* @return subset copy of candidate list that meets compaction criteria
* @throws IOException
*/
- List<StoreFile> compactSelection(List<StoreFile> candidates,
- boolean forcemajor) throws IOException {
+ List<StoreFile> compactSelection(List<StoreFile> candidates)
+ throws IOException {
+ // ASSUMPTION!!! filesCompacting is locked when calling this function
+
/* normal skew:
*
* older ----> newer
@@ -870,6 +949,7 @@ public class Store implements HeapSize {
*/
List<StoreFile> filesToCompact = new ArrayList<StoreFile>(candidates);
+ boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
if (!forcemajor) {
// do not compact old files above a configurable threshold
// save all references. we MUST compact them
@@ -888,9 +968,6 @@ public class Store implements HeapSize {
// major compact on user action or age (caveat: we have too many files)
boolean majorcompaction = (forcemajor || isMajorCompaction(filesToCompact))
&& filesToCompact.size() < this.maxFilesToCompact;
- if (majorcompaction) {
- this.majorCompactionTime = getNextMajorCompactTime();
- }
if (!majorcompaction && !hasReferences(filesToCompact)) {
// we're doing a minor compaction, let's see what files are applicable
@@ -1054,9 +1131,6 @@ public class Store implements HeapSize {
}
/*
- * It's assumed that the compactLock will be acquired prior to calling this
- * method! Otherwise, it is not thread-safe!
- *
* <p>It works by processing a compaction that's been written to disk.
*
* <p>It is usually invoked at the end of a compaction, but might also be
@@ -1097,18 +1171,13 @@ public class Store implements HeapSize {
this.lock.writeLock().lock();
try {
try {
- // 2. Unloading
- // 3. Loading the new TreeMap.
// Change this.storefiles so it reflects new state but do not
// delete old store files until we have sent out notification of
// change in case old files are still being accessed by outstanding
// scanners.
- ArrayList<StoreFile> newStoreFiles = new ArrayList<StoreFile>();
- for (StoreFile sf : storefiles) {
- if (!compactedFiles.contains(sf)) {
- newStoreFiles.add(sf);
- }
- }
+ ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
+ newStoreFiles.removeAll(compactedFiles);
+ filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock()
// If a StoreFile result, move it into place. May be null.
if (result != null) {
@@ -1318,13 +1387,13 @@ public class Store implements HeapSize {
}
/**
- * Determines if HStore can be split
- * @param force Whether to force a split or not.
- * @return a StoreSize if store can be split, null otherwise.
+ * Determines if Store should be split
+ * @return byte[] if store should be split, null otherwise.
*/
- StoreSize checkSplit(final boolean force) {
+ byte[] checkSplit() {
this.lock.readLock().lock();
try {
+ boolean force = this.region.shouldForceSplit();
// sanity checks
if (this.storefiles.isEmpty()) {
return null;
@@ -1369,7 +1438,7 @@ public class Store implements HeapSize {
}
// if the user explicit set a split point, use that
if (this.region.getSplitPoint() != null) {
- return new StoreSize(maxSize, this.region.getSplitPoint());
+ return this.region.getSplitPoint();
}
StoreFile.Reader r = largestSf.getReader();
if (r == null) {
@@ -1396,7 +1465,7 @@ public class Store implements HeapSize {
}
return null;
}
- return new StoreSize(maxSize, mk.getRow());
+ return mk.getRow();
}
} catch(IOException e) {
LOG.warn("Failed getting store size for " + this.storeNameStr, e);
@@ -1416,8 +1485,8 @@ public class Store implements HeapSize {
return storeSize;
}
- void setForceMajorCompaction(final boolean b) {
- this.forceMajor = b;
+ void triggerMajorCompaction() {
+ this.forceMajor = true;
}
boolean getForceMajorCompaction() {
@@ -1493,28 +1562,6 @@ public class Store implements HeapSize {
return this.blockingStoreFileCount - this.storefiles.size();
}
- /**
- * Datastructure that holds size and row to split a file around.
- * TODO: Take a KeyValue rather than row.
- */
- static class StoreSize {
- private final long size;
- private final byte [] row;
-
- StoreSize(long size, byte [] row) {
- this.size = size;
- this.row = row;
- }
- /* @return the size */
- long getSize() {
- return size;
- }
-
- byte [] getSplitRow() {
- return this.row;
- }
- }
-
HRegion getHRegion() {
return this.region;
}
@@ -1624,8 +1671,8 @@ public class Store implements HeapSize {
* @return true if number of store files is greater than
* the number defined in minFilesToCompact
*/
- public boolean hasTooManyStoreFiles() {
- return this.storefiles.size() > this.minFilesToCompact;
+ public boolean needsCompaction() {
+ return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
}
public static final long FIXED_OVERHEAD = ClassSize.align(
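
The heart of the patch is the reservation protocol around filesCompacting;
a condensed usage sketch (illustrative, locks elided):

    CompactionRequest cr = store.requestCompaction(); // selects candidate files
                                                      // and adds them to
                                                      // filesCompacting
    if (cr != null) {
      try {
        store.compact(cr);        // may only touch cr.getFiles()
      } finally {
        store.finishRequest(cr);  // removes cr.getFiles() from filesCompacting
      }
    }
    // needsCompaction() discounts reserved files:
    //   storefiles.size() - filesCompacting.size() > minFilesToCompact
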
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Tue May 10 23:20:45 2011
@@ -20,11 +20,13 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.util.Date;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
/**
* This class represents a compaction request and holds the region, priority,
@@ -34,30 +36,37 @@ import org.apache.hadoop.hbase.regionser
static final Log LOG = LogFactory.getLog(CompactionRequest.class);
private final HRegion r;
private final Store s;
+ private final List<StoreFile> files;
+ private final long totalSize;
+ private final boolean isMajor;
private int p;
private final Date date;
public CompactionRequest(HRegion r, Store s) {
- this(r, s, s.getCompactPriority());
+ this(r, s, null, false, s.getCompactPriority());
}
public CompactionRequest(HRegion r, Store s, int p) {
- this(r, s, p, null);
+ this(r, s, null, false, p);
}
- public CompactionRequest(HRegion r, Store s, int p, Date d) {
+ public CompactionRequest(HRegion r, Store s,
+ List<StoreFile> files, boolean isMajor, int p) {
if (r == null) {
throw new NullPointerException("HRegion cannot be null");
}
- if (d == null) {
- d = new Date();
- }
-
this.r = r;
this.s = s;
+ this.files = files;
+ long sz = 0;
+ for (StoreFile sf : files) {
+ sz += sf.getReader().length();
+ }
+ this.totalSize = sz;
+ this.isMajor = isMajor;
this.p = p;
- this.date = d;
+ this.date = new Date();
}
/**
@@ -89,8 +98,8 @@ import org.apache.hadoop.hbase.regionser
return compareVal;
}
- //break the tie arbitrarily
- return -1;
+ // break the tie based on hash code
+ return this.hashCode() - request.hashCode();
}
/** Gets the HRegion for the request */
@@ -103,6 +112,20 @@ import org.apache.hadoop.hbase.regionser
return s;
}
+ /** Gets the StoreFiles for the request */
+ public List<StoreFile> getFiles() {
+ return files;
+ }
+
+ /** Gets the total size of all StoreFiles in compaction */
+ public long getSize() {
+ return totalSize;
+ }
+
+ public boolean isMajor() {
+ return this.isMajor;
+ }
+
/** Gets the priority for the request */
public int getPriority() {
return p;
@@ -115,8 +138,8 @@ import org.apache.hadoop.hbase.regionser
public String toString() {
return "regionName=" + r.getRegionNameAsString() +
- ((s == null) ? ""
- : "storeName = " + new String(s.getFamily().getName())) +
+ ", storeName=" + new String(s.getFamily().getName()) +
+ ", fileCount=" + files.size() +
", priority=" + p + ", date=" + date;
}
}
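
Queue ordering under the comparator, in a short sketch (hypothetical regions,
stores, and file lists; lower priority values drain first, ties break on
hashCode):

    BlockingQueue<CompactionRequest> q =
        new PriorityBlockingQueue<CompactionRequest>();
    q.add(new CompactionRequest(region, storeA, filesA, false, 3));  // healthy
    q.add(new CompactionRequest(region, storeB, filesB, false, -1)); // blocking
    q.poll(); // the priority -1 request (<= 0 is critical) comes out first
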
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java Tue May 10 23:20:45 2011
@@ -314,11 +314,12 @@ public class RegionServerMetrics impleme
}
/**
- * @param compact history in <time, size>
+ * @param time time that compaction took
+ * @param size total size in bytes of the storefiles in the compaction
*/
- public synchronized void addCompaction(final Pair<Long,Long> compact) {
- this.compactionTime.inc(compact.getFirst());
- this.compactionSize.inc(compact.getSecond());
+ public synchronized void addCompaction(long time, long size) {
+ this.compactionTime.inc(time);
+ this.compactionSize.inc(size);
}
/**
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java Tue May 10 23:20:45 2011
@@ -158,7 +158,9 @@ public class TestCompactSelection extend
void compactEquals(List<StoreFile> candidates, boolean forcemajor,
long ... expected)
throws IOException {
- List<StoreFile> actual = store.compactSelection(candidates, forcemajor);
+ store.forceMajor = forcemajor;
+ List<StoreFile> actual = store.compactSelection(candidates);
+ store.forceMajor = false;
assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
}
@@ -187,7 +189,7 @@ public class TestCompactSelection extend
*/
// don't exceed max file compact threshold
assertEquals(maxFiles,
- store.compactSelection(sfCreate(7,6,5,4,3,2,1), false).size());
+ store.compactSelection(sfCreate(7,6,5,4,3,2,1)).size());
/* MAJOR COMPACTION */
// if a major compaction has been forced, then compact everything
@@ -197,8 +199,11 @@ public class TestCompactSelection extend
// even if one of those files is too big
compactEquals(sfCreate(tooBig, 12,12), true, tooBig, 12, 12);
// don't exceed max file compact threshold, even with major compaction
+ store.forceMajor = true;
assertEquals(maxFiles,
- store.compactSelection(sfCreate(7,6,5,4,3,2,1), true).size());
+ store.compactSelection(sfCreate(7,6,5,4,3,2,1)).size());
+ store.forceMajor = false;
+
// if we exceed maxCompactSize, downgrade to minor
// if not, it creates a 'snowball effect' when files >> maxCompactSize:
// the last file in compaction is the aggregate of all previous compactions
@@ -217,7 +222,7 @@ public class TestCompactSelection extend
compactEquals(sfCreate(true, tooBig, 12,12), tooBig, 12, 12);
// reference files should obey max file compact to avoid OOM
assertEquals(maxFiles,
- store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1), false).size());
+ store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1)).size());
// empty case
compactEquals(new ArrayList<StoreFile>() /* empty */);
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Tue May 10 23:20:45 2011
@@ -156,7 +156,7 @@ public class TestStore extends TestCase
assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
// after compact; check the lowest time stamp
- store.compact();
+ store.compact(store.requestCompaction());
lowestTimeStampFromStore = Store.getLowestTimestamp(store.getStorefiles());
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs,store.getStorefiles());
assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
@@ -688,7 +688,9 @@ public class TestStore extends TestCase
*/
public void testSplitWithEmptyColFam() throws IOException {
init(this.getName());
- assertNull(store.checkSplit(false));
- assertNull(store.checkSplit(true));
+ assertNull(store.checkSplit());
+ store.getHRegion().forceSplit(null);
+ assertNull(store.checkSplit());
+ store.getHRegion().clearSplit_TESTS_ONLY();
}
}