http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
index ab9bfc59..c67d7bf 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
@@ -17,31 +17,25 @@
*/
package org.apache.hadoop.hbase.util;
// this is deliberately not in the o.a.h.h.regionserver package
+
// in order to make sure all required classes/method are available
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
import java.util.Optional;
-import java.util.OptionalInt;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@@ -53,11 +47,12 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
@@ -73,7 +68,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-@Category({MiscTests.class, MediumTests.class})
+@Category({ MiscTests.class, MediumTests.class })
@RunWith(Parameterized.class)
public class TestCoprocessorScanPolicy {
protected final static HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
@@ -84,8 +79,7 @@ public class TestCoprocessorScanPolicy {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
- conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
- ScanObserver.class.getName());
+ conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
ScanObserver.class.getName());
TEST_UTIL.startMiniCluster();
}
@@ -106,49 +100,58 @@ public class TestCoprocessorScanPolicy {
@Test
public void testBaseCases() throws Exception {
- TableName tableName =
- TableName.valueOf("baseCases");
+ TableName tableName = TableName.valueOf("baseCases");
if (TEST_UTIL.getAdmin().tableExists(tableName)) {
TEST_UTIL.deleteTable(tableName);
}
- Table t = TEST_UTIL.createTable(tableName, F, 1);
- // set the version override to 2
- Put p = new Put(R);
- p.setAttribute("versions", new byte[]{});
- p.addColumn(F, tableName.getName(), Bytes.toBytes(2));
- t.put(p);
-
+ Table t = TEST_UTIL.createTable(tableName, F, 10);
+ // insert 3 versions
long now = EnvironmentEdgeManager.currentTime();
-
- // insert 2 versions
- p = new Put(R);
+ Put p = new Put(R);
p.addColumn(F, Q, now, Q);
t.put(p);
p = new Put(R);
p.addColumn(F, Q, now + 1, Q);
t.put(p);
+ p = new Put(R);
+ p.addColumn(F, Q, now + 2, Q);
+ t.put(p);
+
Get g = new Get(R);
- g.setMaxVersions(10);
+ g.readVersions(10);
Result r = t.get(g);
+ assertEquals(3, r.size());
+
+ TEST_UTIL.flush(tableName);
+ TEST_UTIL.compact(tableName, true);
+
+ // still visible after a flush/compaction
+ r = t.get(g);
+ assertEquals(3, r.size());
+
+ // set the version override to 2
+ p = new Put(R);
+ p.setAttribute("versions", new byte[] {});
+ p.addColumn(F, tableName.getName(), Bytes.toBytes(2));
+ t.put(p);
+
+ // only 2 versions now
+ r = t.get(g);
assertEquals(2, r.size());
TEST_UTIL.flush(tableName);
TEST_UTIL.compact(tableName, true);
- // both version are still visible even after a flush/compaction
- g = new Get(R);
- g.setMaxVersions(10);
+ // still 2 versions after a flush/compaction
r = t.get(g);
assertEquals(2, r.size());
- // insert a 3rd version
- p = new Put(R);
- p.addColumn(F, Q, now + 2, Q);
+ // insert a new version
+ p.addColumn(F, Q, now + 3, Q);
t.put(p);
- g = new Get(R);
- g.setMaxVersions(10);
+
+ // still 2 versions
r = t.get(g);
- // still only two version visible
assertEquals(2, r.size());
t.close();
@@ -156,41 +159,33 @@ public class TestCoprocessorScanPolicy {
@Test
public void testTTL() throws Exception {
- TableName tableName =
- TableName.valueOf("testTTL");
+ TableName tableName = TableName.valueOf("testTTL");
if (TEST_UTIL.getAdmin().tableExists(tableName)) {
TEST_UTIL.deleteTable(tableName);
}
- HTableDescriptor desc = new HTableDescriptor(tableName);
- HColumnDescriptor hcd = new HColumnDescriptor(F)
- .setMaxVersions(10)
- .setTimeToLive(1);
- desc.addFamily(hcd);
- TEST_UTIL.getAdmin().createTable(desc);
- Table t = TEST_UTIL.getConnection().getTable(tableName);
+ Table t = TEST_UTIL.createTable(tableName, F, 10);
long now = EnvironmentEdgeManager.currentTime();
ManualEnvironmentEdge me = new ManualEnvironmentEdge();
me.setValue(now);
EnvironmentEdgeManagerTestHelper.injectEdge(me);
// 2s in the past
long ts = now - 2000;
- // Set the TTL override to 3s
- Put p = new Put(R);
- p.setAttribute("ttl", new byte[]{});
- p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L));
- t.put(p);
- p = new Put(R);
+ Put p = new Put(R);
p.addColumn(F, Q, ts, Q);
t.put(p);
p = new Put(R);
p.addColumn(F, Q, ts + 1, Q);
t.put(p);
- // these two should be expired but for the override
- // (their ts was 2s in the past)
+ // Set the TTL override to 3s
+ p = new Put(R);
+ p.setAttribute("ttl", new byte[] {});
+ p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L));
+ t.put(p);
+ // these two should still be there
Get g = new Get(R);
- g.setMaxVersions(10);
+ g.readAllVersions();
Result r = t.get(g);
// still there?
assertEquals(2, r.size());
@@ -199,7 +194,7 @@ public class TestCoprocessorScanPolicy {
TEST_UTIL.compact(tableName, true);
g = new Get(R);
- g.setMaxVersions(10);
+ g.readAllVersions();
r = t.get(g);
// still there?
assertEquals(2, r.size());
@@ -208,7 +203,7 @@ public class TestCoprocessorScanPolicy {
me.setValue(now + 2000);
// now verify that data eventually does expire
g = new Get(R);
- g.setMaxVersions(10);
+ g.readAllVersions();
r = t.get(g);
// should be gone now
assertEquals(0, r.size());
@@ -217,8 +212,8 @@ public class TestCoprocessorScanPolicy {
}
public static class ScanObserver implements RegionCoprocessor,
RegionObserver {
- private Map<TableName, Long> ttls = new HashMap<>();
- private Map<TableName, Integer> versions = new HashMap<>();
+ private final ConcurrentMap<TableName, Long> ttls = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<TableName, Integer> versions = new
ConcurrentHashMap<>();
@Override
public Optional<RegionObserver> getRegionObserver() {
@@ -231,85 +226,130 @@ public class TestCoprocessorScanPolicy {
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
final Put put,
final WALEdit edit, final Durability durability) throws IOException {
if (put.getAttribute("ttl") != null) {
- Cell cell = put.getFamilyCellMap().values().iterator().next().get(0);
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- ttls.put(TableName.valueOf(
- Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(),
kv.getQualifierLength())),
- Bytes.toLong(CellUtil.cloneValue(kv)));
+ Cell cell =
put.getFamilyCellMap().values().stream().findFirst().get().get(0);
+ ttls.put(
+ TableName.valueOf(Bytes.toString(cell.getQualifierArray(),
cell.getQualifierOffset(),
+ cell.getQualifierLength())),
+ Bytes.toLong(cell.getValueArray(), cell.getValueOffset(),
cell.getValueLength()));
c.bypass();
} else if (put.getAttribute("versions") != null) {
- Cell cell = put.getFamilyCellMap().values().iterator().next().get(0);
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- versions.put(TableName.valueOf(
- Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(),
kv.getQualifierLength())),
- Bytes.toInt(CellUtil.cloneValue(kv)));
+ Cell cell =
put.getFamilyCellMap().values().stream().findFirst().get().get(0);
+ versions.put(
+ TableName.valueOf(Bytes.toString(cell.getQualifierArray(),
cell.getQualifierOffset(),
+ cell.getQualifierLength())),
+ Bytes.toInt(cell.getValueArray(), cell.getValueOffset(),
cell.getValueLength()));
c.bypass();
}
}
+ private InternalScanner wrap(Store store, InternalScanner scanner) {
+ Long ttl = this.ttls.get(store.getTableName());
+ Integer version = this.versions.get(store.getTableName());
+ return new DelegatingInternalScanner(scanner) {
+
+ private byte[] row;
+
+ private byte[] qualifier;
+
+ private int count;
+
+ private Predicate<Cell> checkTtl(long now, long ttl) {
+ return c -> now - c.getTimestamp() > ttl;
+ }
+
+ private Predicate<Cell> checkVersion(Cell firstCell, int version) {
+ if (version == 0) {
+ return c -> true;
+ } else {
+ if (row == null || !CellUtil.matchingRow(firstCell, row)) {
+ row = CellUtil.cloneRow(firstCell);
+ // reset qualifier as there is a row change
+ qualifier = null;
+ }
+ return c -> {
+ if (qualifier != null && CellUtil.matchingQualifier(c,
qualifier)) {
+ if (count >= version) {
+ return true;
+ }
+ count++;
+ return false;
+ } else { // qualifier switch
+ qualifier = CellUtil.cloneQualifier(c);
+ count = 1;
+ return false;
+ }
+ };
+ }
+ }
+
+ @Override
+ public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
+ boolean moreRows = scanner.next(result, scannerContext);
+ if (result.isEmpty()) {
+ return moreRows;
+ }
+ long now = EnvironmentEdgeManager.currentTime();
+ Predicate<Cell> predicate = null;
+ if (ttl != null) {
+ predicate = checkTtl(now, ttl);
+ }
+ if (version != null) {
+ Predicate<Cell> vp = checkVersion(result.get(0), version);
+ if (predicate != null) {
+ predicate = predicate.and(vp);
+ } else {
+ predicate = vp;
+ }
+ }
+ if (predicate != null) {
+ result.removeIf(predicate);
+ }
+ return moreRows;
+ }
+ };
+ }
+
@Override
- public InternalScanner preFlushScannerOpen(
- final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
- List<KeyValueScanner> scanners, InternalScanner s, long readPoint)
throws IOException {
- HStore hs = (HStore) store;
- Long newTtl = ttls.get(store.getTableName());
- if (newTtl != null) {
- System.out.println("PreFlush:" + newTtl);
- }
- Integer newVersions = versions.get(store.getTableName());
- ScanInfo oldSI = hs.getScanInfo();
- ColumnFamilyDescriptor family = store.getColumnFamilyDescriptor();
- ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
family.getName(),
- family.getMinVersions(), newVersions == null ?
family.getMaxVersions() : newVersions,
- newTtl == null ? oldSI.getTtl() : newTtl,
family.getKeepDeletedCells(),
- family.getBlocksize(), oldSI.getTimeToPurgeDeletes(),
oldSI.getComparator(), family.isNewVersionBehavior());
- return new StoreScanner(hs, scanInfo,
- newVersions == null ? OptionalInt.empty() :
OptionalInt.of(newVersions.intValue()),
- scanners, ScanType.COMPACT_RETAIN_DELETES,
store.getSmallestReadPoint(),
- HConstants.OLDEST_TIMESTAMP);
+ public InternalScanner
preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ InternalScanner scanner) throws IOException {
+ return wrap(store, scanner);
}
@Override
- public InternalScanner preCompactScannerOpen(
- final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
- List<? extends KeyValueScanner> scanners, ScanType scanType, long
earliestPutTs,
- InternalScanner s,CompactionLifeCycleTracker tracker,
CompactionRequest request,
- long readPoint) throws IOException {
- HStore hs = (HStore) store;
- Long newTtl = ttls.get(store.getTableName());
- Integer newVersions = versions.get(store.getTableName());
- ScanInfo oldSI = hs.getScanInfo();
- ColumnFamilyDescriptor family = store.getColumnFamilyDescriptor();
- ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
family.getName(),
- family.getMinVersions(), newVersions == null ?
family.getMaxVersions() : newVersions,
- newTtl == null ? oldSI.getTtl() : newTtl,
family.getKeepDeletedCells(),
- family.getBlocksize(), oldSI.getTimeToPurgeDeletes(),
oldSI.getComparator(),
- family.isNewVersionBehavior());
- return new StoreScanner(hs, scanInfo,
- newVersions == null ? OptionalInt.empty() :
OptionalInt.of(newVersions.intValue()),
- scanners, scanType, store.getSmallestReadPoint(), earliestPutTs);
+ public InternalScanner
preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker
tracker,
+ CompactionRequest request) throws IOException {
+ return wrap(store, scanner);
}
@Override
- public KeyValueScanner preStoreScannerOpen(
- final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
final Scan scan,
- final NavigableSet<byte[]> targetCols, KeyValueScanner s, long readPt)
throws IOException {
- TableName tn = store.getTableName();
- if (!tn.isSystemTable()) {
- HStore hs = (HStore) store;
- Long newTtl = ttls.get(store.getTableName());
- Integer newVersions = versions.get(store.getTableName());
- ScanInfo oldSI = hs.getScanInfo();
- ColumnFamilyDescriptor family = store.getColumnFamilyDescriptor();
- ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
family.getName(),
- family.getMinVersions(), newVersions == null ?
family.getMaxVersions() : newVersions,
- newTtl == null ? oldSI.getTtl() : newTtl,
family.getKeepDeletedCells(),
- family.getBlocksize(), oldSI.getTimeToPurgeDeletes(),
oldSI.getComparator(),
- family.isNewVersionBehavior());
- return new StoreScanner(hs, scanInfo, scan, targetCols, readPt);
- } else {
- return s;
+ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get
get,
+ List<Cell> result) throws IOException {
+ TableName tableName =
c.getEnvironment().getRegion().getTableDescriptor().getTableName();
+ Long ttl = this.ttls.get(tableName);
+ if (ttl != null) {
+ get.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl,
get.getTimeRange().getMax());
+ }
+ Integer version = this.versions.get(tableName);
+ if (version != null) {
+ get.readVersions(version);
+ }
+ }
+
+ @Override
+ public RegionScanner
preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
+ RegionScanner s) throws IOException {
+ Region region = c.getEnvironment().getRegion();
+ TableName tableName = region.getTableDescriptor().getTableName();
+ Long ttl = this.ttls.get(tableName);
+ if (ttl != null) {
+ scan.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl,
scan.getTimeRange().getMax());
+ }
+ Integer version = this.versions.get(tableName);
+ if (version != null) {
+ scan.readVersions(version);
}
+ return region.getScanner(scan);
}
}
}