Repository: hbase Updated Branches: refs/heads/master 5368fd5bf -> e804dd0b6
http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java index ab9bfc59..c67d7bf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java @@ -17,31 +17,25 @@ */ package org.apache.hadoop.hbase.util; // this is deliberately not in the o.a.h.h.regionserver package + // in order to make sure all required classes/method are available import static org.junit.Assert.assertEquals; import java.io.IOException; import java.util.Collection; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.NavigableSet; import java.util.Optional; -import java.util.OptionalInt; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Predicate; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -53,11 +47,12 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.KeyValueScanner; -import org.apache.hadoop.hbase.regionserver.ScanInfo; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; @@ -73,7 +68,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -@Category({MiscTests.class, MediumTests.class}) +@Category({ MiscTests.class, MediumTests.class }) @RunWith(Parameterized.class) public class TestCoprocessorScanPolicy { protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -84,8 +79,7 @@ public class TestCoprocessorScanPolicy { @BeforeClass public static void setUpBeforeClass() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); - conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, - ScanObserver.class.getName()); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ScanObserver.class.getName()); TEST_UTIL.startMiniCluster(); } @@ -106,49 +100,58 @@ public class TestCoprocessorScanPolicy { @Test public void testBaseCases() throws Exception { - TableName tableName = - TableName.valueOf("baseCases"); + TableName tableName = TableName.valueOf("baseCases"); if (TEST_UTIL.getAdmin().tableExists(tableName)) { TEST_UTIL.deleteTable(tableName); } - Table t = TEST_UTIL.createTable(tableName, F, 1); - // set the version override to 2 - Put p = new Put(R); - p.setAttribute("versions", new byte[]{}); - p.addColumn(F, tableName.getName(), Bytes.toBytes(2)); - t.put(p); - + Table t = TEST_UTIL.createTable(tableName, F, 10); + // insert 3 versions long now = EnvironmentEdgeManager.currentTime(); - - // insert 2 versions - p = new Put(R); + Put p = new Put(R); p.addColumn(F, Q, now, Q); t.put(p); p = new Put(R); p.addColumn(F, Q, now + 1, Q); t.put(p); + p = new Put(R); + p.addColumn(F, Q, now + 2, Q); + t.put(p); + Get g = new Get(R); - g.setMaxVersions(10); + g.readVersions(10); Result r = t.get(g); + assertEquals(3, r.size()); + + TEST_UTIL.flush(tableName); + TEST_UTIL.compact(tableName, true); + + // still visible after a flush/compaction + r = t.get(g); + assertEquals(3, r.size()); + + // set the version override to 2 + p = new Put(R); + p.setAttribute("versions", new byte[] {}); + p.addColumn(F, tableName.getName(), Bytes.toBytes(2)); + t.put(p); + + // only 2 versions now + r = t.get(g); assertEquals(2, r.size()); TEST_UTIL.flush(tableName); TEST_UTIL.compact(tableName, true); - // both version are still visible even after a flush/compaction - g = new Get(R); - g.setMaxVersions(10); + // still 2 versions after a flush/compaction r = t.get(g); assertEquals(2, r.size()); - // insert a 3rd version - p = new Put(R); - p.addColumn(F, Q, now + 2, Q); + // insert a new version + p.addColumn(F, Q, now + 3, Q); t.put(p); - g = new Get(R); - g.setMaxVersions(10); + + // still 2 versions r = t.get(g); - // still only two version visible assertEquals(2, r.size()); t.close(); @@ -156,41 +159,33 @@ public class TestCoprocessorScanPolicy { @Test public void testTTL() throws Exception { - TableName tableName = - TableName.valueOf("testTTL"); + TableName tableName = TableName.valueOf("testTTL"); if (TEST_UTIL.getAdmin().tableExists(tableName)) { TEST_UTIL.deleteTable(tableName); } - HTableDescriptor desc = new HTableDescriptor(tableName); - HColumnDescriptor hcd = new HColumnDescriptor(F) - .setMaxVersions(10) - .setTimeToLive(1); - desc.addFamily(hcd); - TEST_UTIL.getAdmin().createTable(desc); - Table t = TEST_UTIL.getConnection().getTable(tableName); + Table t = TEST_UTIL.createTable(tableName, F, 10); long now = EnvironmentEdgeManager.currentTime(); ManualEnvironmentEdge me = new ManualEnvironmentEdge(); me.setValue(now); EnvironmentEdgeManagerTestHelper.injectEdge(me); // 2s in the past long ts = now - 2000; - // Set the TTL override to 3s - Put p = new Put(R); - p.setAttribute("ttl", new byte[]{}); - p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L)); - t.put(p); - p = new Put(R); + Put p = new Put(R); p.addColumn(F, Q, ts, Q); t.put(p); p = new Put(R); p.addColumn(F, Q, ts + 1, Q); t.put(p); - // these two should be expired but for the override - // (their ts was 2s in the past) + // Set the TTL override to 3s + p = new Put(R); + p.setAttribute("ttl", new byte[] {}); + p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L)); + t.put(p); + // these two should still be there Get g = new Get(R); - g.setMaxVersions(10); + g.readAllVersions(); Result r = t.get(g); // still there? assertEquals(2, r.size()); @@ -199,7 +194,7 @@ public class TestCoprocessorScanPolicy { TEST_UTIL.compact(tableName, true); g = new Get(R); - g.setMaxVersions(10); + g.readAllVersions(); r = t.get(g); // still there? assertEquals(2, r.size()); @@ -208,7 +203,7 @@ public class TestCoprocessorScanPolicy { me.setValue(now + 2000); // now verify that data eventually does expire g = new Get(R); - g.setMaxVersions(10); + g.readAllVersions(); r = t.get(g); // should be gone now assertEquals(0, r.size()); @@ -217,8 +212,8 @@ public class TestCoprocessorScanPolicy { } public static class ScanObserver implements RegionCoprocessor, RegionObserver { - private Map<TableName, Long> ttls = new HashMap<>(); - private Map<TableName, Integer> versions = new HashMap<>(); + private final ConcurrentMap<TableName, Long> ttls = new ConcurrentHashMap<>(); + private final ConcurrentMap<TableName, Integer> versions = new ConcurrentHashMap<>(); @Override public Optional<RegionObserver> getRegionObserver() { @@ -231,85 +226,130 @@ public class TestCoprocessorScanPolicy { public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put, final WALEdit edit, final Durability durability) throws IOException { if (put.getAttribute("ttl") != null) { - Cell cell = put.getFamilyCellMap().values().iterator().next().get(0); - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - ttls.put(TableName.valueOf( - Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())), - Bytes.toLong(CellUtil.cloneValue(kv))); + Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0); + ttls.put( + TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength())), + Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); c.bypass(); } else if (put.getAttribute("versions") != null) { - Cell cell = put.getFamilyCellMap().values().iterator().next().get(0); - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - versions.put(TableName.valueOf( - Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())), - Bytes.toInt(CellUtil.cloneValue(kv))); + Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0); + versions.put( + TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength())), + Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); c.bypass(); } } + private InternalScanner wrap(Store store, InternalScanner scanner) { + Long ttl = this.ttls.get(store.getTableName()); + Integer version = this.versions.get(store.getTableName()); + return new DelegatingInternalScanner(scanner) { + + private byte[] row; + + private byte[] qualifier; + + private int count; + + private Predicate<Cell> checkTtl(long now, long ttl) { + return c -> now - c.getTimestamp() > ttl; + } + + private Predicate<Cell> checkVersion(Cell firstCell, int version) { + if (version == 0) { + return c -> true; + } else { + if (row == null || !CellUtil.matchingRow(firstCell, row)) { + row = CellUtil.cloneRow(firstCell); + // reset qualifier as there is a row change + qualifier = null; + } + return c -> { + if (qualifier != null && CellUtil.matchingQualifier(c, qualifier)) { + if (count >= version) { + return true; + } + count++; + return false; + } else { // qualifier switch + qualifier = CellUtil.cloneQualifier(c); + count = 1; + return false; + } + }; + } + } + + @Override + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { + boolean moreRows = scanner.next(result, scannerContext); + if (result.isEmpty()) { + return moreRows; + } + long now = EnvironmentEdgeManager.currentTime(); + Predicate<Cell> predicate = null; + if (ttl != null) { + predicate = checkTtl(now, ttl); + } + if (version != null) { + Predicate<Cell> vp = checkVersion(result.get(0), version); + if (predicate != null) { + predicate = predicate.and(vp); + } else { + predicate = vp; + } + } + if (predicate != null) { + result.removeIf(predicate); + } + return moreRows; + } + }; + } + @Override - public InternalScanner preFlushScannerOpen( - final ObserverContext<RegionCoprocessorEnvironment> c, Store store, - List<KeyValueScanner> scanners, InternalScanner s, long readPoint) throws IOException { - HStore hs = (HStore) store; - Long newTtl = ttls.get(store.getTableName()); - if (newTtl != null) { - System.out.println("PreFlush:" + newTtl); - } - Integer newVersions = versions.get(store.getTableName()); - ScanInfo oldSI = hs.getScanInfo(); - ColumnFamilyDescriptor family = store.getColumnFamilyDescriptor(); - ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(), - family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions, - newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), - family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator(), family.isNewVersionBehavior()); - return new StoreScanner(hs, scanInfo, - newVersions == null ? OptionalInt.empty() : OptionalInt.of(newVersions.intValue()), - scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), - HConstants.OLDEST_TIMESTAMP); + public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, + InternalScanner scanner) throws IOException { + return wrap(store, scanner); } @Override - public InternalScanner preCompactScannerOpen( - final ObserverContext<RegionCoprocessorEnvironment> c, Store store, - List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, - InternalScanner s,CompactionLifeCycleTracker tracker, CompactionRequest request, - long readPoint) throws IOException { - HStore hs = (HStore) store; - Long newTtl = ttls.get(store.getTableName()); - Integer newVersions = versions.get(store.getTableName()); - ScanInfo oldSI = hs.getScanInfo(); - ColumnFamilyDescriptor family = store.getColumnFamilyDescriptor(); - ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(), - family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions, - newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), - family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator(), - family.isNewVersionBehavior()); - return new StoreScanner(hs, scanInfo, - newVersions == null ? OptionalInt.empty() : OptionalInt.of(newVersions.intValue()), - scanners, scanType, store.getSmallestReadPoint(), earliestPutTs); + public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, + InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { + return wrap(store, scanner); } @Override - public KeyValueScanner preStoreScannerOpen( - final ObserverContext<RegionCoprocessorEnvironment> c, Store store, final Scan scan, - final NavigableSet<byte[]> targetCols, KeyValueScanner s, long readPt) throws IOException { - TableName tn = store.getTableName(); - if (!tn.isSystemTable()) { - HStore hs = (HStore) store; - Long newTtl = ttls.get(store.getTableName()); - Integer newVersions = versions.get(store.getTableName()); - ScanInfo oldSI = hs.getScanInfo(); - ColumnFamilyDescriptor family = store.getColumnFamilyDescriptor(); - ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(), - family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions, - newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), - family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator(), - family.isNewVersionBehavior()); - return new StoreScanner(hs, scanInfo, scan, targetCols, readPt); - } else { - return s; + public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, + List<Cell> result) throws IOException { + TableName tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName(); + Long ttl = this.ttls.get(tableName); + if (ttl != null) { + get.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, get.getTimeRange().getMax()); + } + Integer version = this.versions.get(tableName); + if (version != null) { + get.readVersions(version); + } + } + + @Override + public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, + RegionScanner s) throws IOException { + Region region = c.getEnvironment().getRegion(); + TableName tableName = region.getTableDescriptor().getTableName(); + Long ttl = this.ttls.get(tableName); + if (ttl != null) { + scan.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, scan.getTimeRange().getMax()); + } + Integer version = this.versions.get(tableName); + if (version != null) { + scan.readVersions(version); } + return region.getScanner(scan); } } }