Author: tedyu
Date: Sat Feb 11 16:29:36 2012
New Revision: 1243086
URL: http://svn.apache.org/viewvc?rev=1243086&view=rev
Log:
HBASE-5199 Differential revision 1311 Delete out of TTL store files before
compaction selection (Liyin)
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1243086&r1=1243085&r2=1243086&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
Sat Feb 11 16:29:36 2012
@@ -54,9 +54,9 @@ import org.apache.hadoop.hbase.client.Sc
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
-import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
@@ -1271,6 +1271,18 @@ public class Store extends SchemaConfigu
boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
if (!forcemajor) {
+ // Delete the expired store files before the compaction selection.
+ if (conf.getBoolean("hbase.store.delete.expired.storefile", false)
+ && (ttl != Long.MAX_VALUE) && (this.scanInfo.minVersions == 0)) {
+ CompactSelection expiredSelection = compactSelection
+ .selectExpiredStoreFilesToCompact(
+ EnvironmentEdgeManager.currentTimeMillis() - this.ttl);
+
+ // If there are any expired store files, delete them by compaction.
+ if (expiredSelection != null) {
+ return expiredSelection;
+ }
+ }
// do not compact old files above a configurable threshold
// save all references. we MUST compact them
int pos = 0;
@@ -1650,7 +1662,7 @@ public class Store extends SchemaConfigu
/**
* @return the number of files in this store
*/
- public int getNumberOfstorefiles() {
+ public int getNumberOfStoreFiles() {
return this.storefiles.size();
}
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1243086&r1=1243085&r2=1243086&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
Sat Feb 11 16:29:36 2012
@@ -273,7 +273,7 @@ public class StoreFile extends SchemaCon
/**
* @return Path or null if this StoreFile was made with a Stream.
*/
- Path getPath() {
+ public Path getPath() {
return this.path;
}
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java?rev=1243086&r1=1243085&r2=1243086&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
Sat Feb 11 16:29:36 2012
@@ -19,7 +19,6 @@
*/
package org.apache.hadoop.hbase.regionserver.compactions;
-import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
@@ -82,9 +81,46 @@ public class CompactSelection {
}
/**
- * If the current hour falls in the off peak times and there are no
- * outstanding off peak compactions, the current compaction is
- * promoted to an off peak compaction. Currently only one off peak
+ * Select the expired store files to compact
+ *
+ * @param maxExpiredTimeStamp
+ * The store file will be marked as expired if its max time stamp is
+ * less than this maxExpiredTimeStamp.
+ * @return A CompactSelection containing the expired store files as
+ * filesToCompact, or null if there are no expired store files
+ */
+ public CompactSelection selectExpiredStoreFilesToCompact(
+     long maxExpiredTimeStamp) {
+   // Without any candidate files there is nothing that could have expired.
+   if (filesToCompact == null || filesToCompact.isEmpty()) {
+     return null;
+   }
+
+   // Created lazily on the first hit; a null list doubles as the
+   // "no expired store file found" marker.
+   ArrayList<StoreFile> expired = null;
+   for (StoreFile candidate : this.filesToCompact) {
+     long maxTimestamp = candidate.getReader().getMaxTimestamp();
+     if (maxTimestamp >= maxExpiredTimeStamp) {
+       // Max timestamp is not below the cut-off, so the file is kept.
+       continue;
+     }
+     LOG.info("Deleting the expired store file by compaction: "
+         + candidate.getPath() + " whose maxTimeStamp is "
+         + maxTimestamp
+         + " while the max expired timestamp is " + maxExpiredTimeStamp);
+     if (expired == null) {
+       expired = new ArrayList<StoreFile>();
+     }
+     expired.add(candidate);
+   }
+
+   // Callers receive null when no store file has expired.
+   return (expired == null) ? null : new CompactSelection(conf, expired);
+ }
+
+ /**
+ * If the current hour falls in the off peak times and there are no
+ * outstanding off peak compactions, the current compaction is
+ * promoted to an off peak compaction. Currently only one off peak
* compaction is present in the compaction queue.
*
* @param currentHour
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1243086&r1=1243085&r2=1243086&view=diff
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
(original)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
Sat Feb 11 16:29:36 2012
@@ -4521,11 +4521,11 @@ public class TestFromClientSide {
assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
// compact, net minus two blocks, two hits, no misses
System.out.println("Compacting");
- assertEquals(2, store.getNumberOfstorefiles());
+ assertEquals(2, store.getNumberOfStoreFiles());
store.triggerMajorCompaction();
region.compactStores();
waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
- assertEquals(1, store.getNumberOfstorefiles());
+ assertEquals(1, store.getNumberOfStoreFiles());
expectedBlockCount -= 2; // evicted two blocks, cached none
assertEquals(expectedBlockCount, cache.getBlockCount());
expectedBlockHits += 2;
@@ -4546,12 +4546,12 @@ public class TestFromClientSide {
throws InterruptedException {
long start = System.currentTimeMillis();
while (start + timeout > System.currentTimeMillis() &&
- store.getNumberOfstorefiles() != count) {
+ store.getNumberOfStoreFiles() != count) {
Thread.sleep(100);
}
System.out.println("start=" + start + ", now=" +
- System.currentTimeMillis() + ", cur=" + store.getNumberOfstorefiles());
- assertEquals(count, store.getNumberOfstorefiles());
+ System.currentTimeMillis() + ", cur=" + store.getNumberOfStoreFiles());
+ assertEquals(count, store.getNumberOfStoreFiles());
}
@Test
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1243086&r1=1243085&r2=1243086&view=diff
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
(original)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
Sat Feb 11 16:29:36 2012
@@ -43,10 +43,18 @@ import org.apache.hadoop.fs.FilterFileSy
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
@@ -116,14 +124,19 @@ public class TestStore extends TestCase
private void init(String methodName, Configuration conf)
throws IOException {
- //Setting up a Store
- Path basedir = new Path(DIR+methodName);
- Path logdir = new Path(DIR+methodName+"/logs");
- Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
HColumnDescriptor hcd = new HColumnDescriptor(family);
// some of the tests write 4 versions and then flush
// (with HBASE-4241, lower versions are collected on flush)
hcd.setMaxVersions(4);
+ init(methodName, conf, hcd);
+ }
+
+ private void init(String methodName, Configuration conf,
+ HColumnDescriptor hcd) throws IOException {
+ //Setting up a Store
+ Path basedir = new Path(DIR+methodName);
+ Path logdir = new Path(DIR+methodName+"/logs");
+ Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
FileSystem fs = FileSystem.get(conf);
fs.delete(logdir, true);
@@ -137,6 +150,51 @@ public class TestStore extends TestCase
store = new Store(basedir, region, hcd, fs, conf);
}
+ /**
+  * Checks that, with "hbase.store.delete.expired.storefile" enabled, every
+  * compaction request on a store with a TTL picks exactly one expired store
+  * file and that compacting the request removes that file from the store.
+  *
+  * NOTE(review): the test relies on wall-clock sleeps, so it is sensitive
+  * to slow flushes/compactions - confirm the timing margins are sufficient.
+  */
+ public void testDeleteExpiredStoreFiles() throws Exception {
+   int storeFileNum = 4;
+   int ttl = 1;
+
+   Configuration conf = HBaseConfiguration.create();
+   // Enable the expired store file deletion
+   conf.setBoolean("hbase.store.delete.expired.storefile", true);
+   HColumnDescriptor hcd = new HColumnDescriptor(family);
+   hcd.setTimeToLive(ttl);
+   init(getName(), conf, hcd);
+
+   long sleepTime = this.store.scanInfo.getTtl() / storeFileNum;
+   long timeStamp;
+   // Create storeFileNum store files, sleeping (store TTL / storeFileNum)
+   // after each flush so that the max timestamps of consecutive store
+   // files are staggered by roughly that amount.
+   for (int i = 1; i <= storeFileNum; i++) {
+     LOG.info("Adding some data for the store file #" + i);
+     timeStamp = EnvironmentEdgeManager.currentTimeMillis();
+     this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null));
+     this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null));
+     this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null));
+     flush(i);
+     Thread.sleep(sleepTime);
+   }
+
+   // Verify the total number of store files
+   assertEquals(storeFileNum, this.store.getStorefiles().size());
+
+   // Each compaction request is expected to find exactly one expired
+   // store file and to delete it by the compaction.
+   for (int i = 1; i <= storeFileNum; i++) {
+     // Verify that the selected store file has indeed expired.
+     CompactionRequest cr = this.store.requestCompaction();
+     assertEquals(1, cr.getFiles().size());
+     // Use the same clock that stamped the cells above, so the check stays
+     // consistent even when a custom environment edge is installed.
+     assertTrue(cr.getFiles().get(0).getReader().getMaxTimestamp() <
+         (EnvironmentEdgeManager.currentTimeMillis()
+             - this.store.scanInfo.getTtl()));
+     // Verify that the expired store file has been deleted.
+     this.store.compact(cr);
+     assertEquals(storeFileNum - i, this.store.getStorefiles().size());
+
+     // Let the next store file expire.
+     Thread.sleep(sleepTime);
+   }
+ }
+
+
public void testLowestModificationTime() throws Exception {
Configuration conf = HBaseConfiguration.create();
FileSystem fs = FileSystem.get(conf);