Repository: hbase
Updated Branches:
  refs/heads/master 5368fd5bf -> e804dd0b6


http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
index ab9bfc59..c67d7bf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
@@ -17,31 +17,25 @@
  */
 package org.apache.hadoop.hbase.util;
 // this is deliberately not in the o.a.h.h.regionserver package
+
 // in order to make sure all required classes/method are available
 
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
 import java.util.Optional;
-import java.util.OptionalInt;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -53,11 +47,12 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
@@ -73,7 +68,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-@Category({MiscTests.class, MediumTests.class})
+@Category({ MiscTests.class, MediumTests.class })
 @RunWith(Parameterized.class)
 public class TestCoprocessorScanPolicy {
   protected final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
@@ -84,8 +79,7 @@ public class TestCoprocessorScanPolicy {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
-    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
-        ScanObserver.class.getName());
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 
ScanObserver.class.getName());
     TEST_UTIL.startMiniCluster();
   }
 
@@ -106,49 +100,58 @@ public class TestCoprocessorScanPolicy {
 
   @Test
   public void testBaseCases() throws Exception {
-    TableName tableName =
-        TableName.valueOf("baseCases");
+    TableName tableName = TableName.valueOf("baseCases");
     if (TEST_UTIL.getAdmin().tableExists(tableName)) {
       TEST_UTIL.deleteTable(tableName);
     }
-    Table t = TEST_UTIL.createTable(tableName, F, 1);
-    // set the version override to 2
-    Put p = new Put(R);
-    p.setAttribute("versions", new byte[]{});
-    p.addColumn(F, tableName.getName(), Bytes.toBytes(2));
-    t.put(p);
-
+    Table t = TEST_UTIL.createTable(tableName, F, 10);
+    // insert 3 versions
     long now = EnvironmentEdgeManager.currentTime();
-
-    // insert 2 versions
-    p = new Put(R);
+    Put p = new Put(R);
     p.addColumn(F, Q, now, Q);
     t.put(p);
     p = new Put(R);
     p.addColumn(F, Q, now + 1, Q);
     t.put(p);
+    p = new Put(R);
+    p.addColumn(F, Q, now + 2, Q);
+    t.put(p);
+
     Get g = new Get(R);
-    g.setMaxVersions(10);
+    g.readVersions(10);
     Result r = t.get(g);
+    assertEquals(3, r.size());
+
+    TEST_UTIL.flush(tableName);
+    TEST_UTIL.compact(tableName, true);
+
+    // still visible after a flush/compaction
+    r = t.get(g);
+    assertEquals(3, r.size());
+
+    // set the version override to 2
+    p = new Put(R);
+    p.setAttribute("versions", new byte[] {});
+    p.addColumn(F, tableName.getName(), Bytes.toBytes(2));
+    t.put(p);
+
+    // only 2 versions now
+    r = t.get(g);
     assertEquals(2, r.size());
 
     TEST_UTIL.flush(tableName);
     TEST_UTIL.compact(tableName, true);
 
-    // both version are still visible even after a flush/compaction
-    g = new Get(R);
-    g.setMaxVersions(10);
+    // still 2 versions after a flush/compaction
     r = t.get(g);
     assertEquals(2, r.size());
 
-    // insert a 3rd version
-    p = new Put(R);
-    p.addColumn(F, Q, now + 2, Q);
+    // insert a new version
+    p.addColumn(F, Q, now + 3, Q);
     t.put(p);
-    g = new Get(R);
-    g.setMaxVersions(10);
+
+    // still 2 versions
     r = t.get(g);
-    // still only two version visible
     assertEquals(2, r.size());
 
     t.close();
@@ -156,41 +159,33 @@ public class TestCoprocessorScanPolicy {
 
   @Test
   public void testTTL() throws Exception {
-    TableName tableName =
-        TableName.valueOf("testTTL");
+    TableName tableName = TableName.valueOf("testTTL");
     if (TEST_UTIL.getAdmin().tableExists(tableName)) {
       TEST_UTIL.deleteTable(tableName);
     }
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    HColumnDescriptor hcd = new HColumnDescriptor(F)
-    .setMaxVersions(10)
-    .setTimeToLive(1);
-    desc.addFamily(hcd);
-    TEST_UTIL.getAdmin().createTable(desc);
-    Table t = TEST_UTIL.getConnection().getTable(tableName);
+    Table t = TEST_UTIL.createTable(tableName, F, 10);
     long now = EnvironmentEdgeManager.currentTime();
     ManualEnvironmentEdge me = new ManualEnvironmentEdge();
     me.setValue(now);
     EnvironmentEdgeManagerTestHelper.injectEdge(me);
     // 2s in the past
     long ts = now - 2000;
-    // Set the TTL override to 3s
-    Put p = new Put(R);
-    p.setAttribute("ttl", new byte[]{});
-    p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L));
-    t.put(p);
 
-    p = new Put(R);
+    Put p = new Put(R);
     p.addColumn(F, Q, ts, Q);
     t.put(p);
     p = new Put(R);
     p.addColumn(F, Q, ts + 1, Q);
     t.put(p);
 
-    // these two should be expired but for the override
-    // (their ts was 2s in the past)
+    // Set the TTL override to 3s
+    p = new Put(R);
+    p.setAttribute("ttl", new byte[] {});
+    p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L));
+    t.put(p);
+    // these two should still be there
     Get g = new Get(R);
-    g.setMaxVersions(10);
+    g.readAllVersions();
     Result r = t.get(g);
     // still there?
     assertEquals(2, r.size());
@@ -199,7 +194,7 @@ public class TestCoprocessorScanPolicy {
     TEST_UTIL.compact(tableName, true);
 
     g = new Get(R);
-    g.setMaxVersions(10);
+    g.readAllVersions();
     r = t.get(g);
     // still there?
     assertEquals(2, r.size());
@@ -208,7 +203,7 @@ public class TestCoprocessorScanPolicy {
     me.setValue(now + 2000);
     // now verify that data eventually does expire
     g = new Get(R);
-    g.setMaxVersions(10);
+    g.readAllVersions();
     r = t.get(g);
     // should be gone now
     assertEquals(0, r.size());
@@ -217,8 +212,8 @@ public class TestCoprocessorScanPolicy {
   }
 
   public static class ScanObserver implements RegionCoprocessor, 
RegionObserver {
-    private Map<TableName, Long> ttls = new HashMap<>();
-    private Map<TableName, Integer> versions = new HashMap<>();
+    private final ConcurrentMap<TableName, Long> ttls = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<TableName, Integer> versions = new 
ConcurrentHashMap<>();
 
     @Override
     public Optional<RegionObserver> getRegionObserver() {
@@ -231,85 +226,130 @@ public class TestCoprocessorScanPolicy {
     public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, 
final Put put,
         final WALEdit edit, final Durability durability) throws IOException {
       if (put.getAttribute("ttl") != null) {
-        Cell cell = put.getFamilyCellMap().values().iterator().next().get(0);
-        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-        ttls.put(TableName.valueOf(
-          Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(), 
kv.getQualifierLength())),
-          Bytes.toLong(CellUtil.cloneValue(kv)));
+        Cell cell = 
put.getFamilyCellMap().values().stream().findFirst().get().get(0);
+        ttls.put(
+          TableName.valueOf(Bytes.toString(cell.getQualifierArray(), 
cell.getQualifierOffset(),
+            cell.getQualifierLength())),
+          Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueLength()));
         c.bypass();
       } else if (put.getAttribute("versions") != null) {
-        Cell cell = put.getFamilyCellMap().values().iterator().next().get(0);
-        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-        versions.put(TableName.valueOf(
-          Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(), 
kv.getQualifierLength())),
-          Bytes.toInt(CellUtil.cloneValue(kv)));
+        Cell cell = 
put.getFamilyCellMap().values().stream().findFirst().get().get(0);
+        versions.put(
+          TableName.valueOf(Bytes.toString(cell.getQualifierArray(), 
cell.getQualifierOffset(),
+            cell.getQualifierLength())),
+          Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueLength()));
         c.bypass();
       }
     }
 
+    private InternalScanner wrap(Store store, InternalScanner scanner) {
+      Long ttl = this.ttls.get(store.getTableName());
+      Integer version = this.versions.get(store.getTableName());
+      return new DelegatingInternalScanner(scanner) {
+
+        private byte[] row;
+
+        private byte[] qualifier;
+
+        private int count;
+
+        private Predicate<Cell> checkTtl(long now, long ttl) {
+          return c -> now - c.getTimestamp() > ttl;
+        }
+
+        private Predicate<Cell> checkVersion(Cell firstCell, int version) {
+          if (version == 0) {
+            return c -> true;
+          } else {
+            if (row == null || !CellUtil.matchingRow(firstCell, row)) {
+              row = CellUtil.cloneRow(firstCell);
+              // reset qualifier as there is a row change
+              qualifier = null;
+            }
+            return c -> {
+              if (qualifier != null && CellUtil.matchingQualifier(c, 
qualifier)) {
+                if (count >= version) {
+                  return true;
+                }
+                count++;
+                return false;
+              } else { // qualifier switch
+                qualifier = CellUtil.cloneQualifier(c);
+                count = 1;
+                return false;
+              }
+            };
+          }
+        }
+
+        @Override
+        public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
+          boolean moreRows = scanner.next(result, scannerContext);
+          if (result.isEmpty()) {
+            return moreRows;
+          }
+          long now = EnvironmentEdgeManager.currentTime();
+          Predicate<Cell> predicate = null;
+          if (ttl != null) {
+            predicate = checkTtl(now, ttl);
+          }
+          if (version != null) {
+            Predicate<Cell> vp = checkVersion(result.get(0), version);
+            if (predicate != null) {
+              predicate = predicate.and(vp);
+            } else {
+              predicate = vp;
+            }
+          }
+          if (predicate != null) {
+            result.removeIf(predicate);
+          }
+          return moreRows;
+        }
+      };
+    }
+
     @Override
-    public InternalScanner preFlushScannerOpen(
-        final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-        List<KeyValueScanner> scanners, InternalScanner s, long readPoint) 
throws IOException {
-      HStore hs = (HStore) store;
-      Long newTtl = ttls.get(store.getTableName());
-      if (newTtl != null) {
-        System.out.println("PreFlush:" + newTtl);
-      }
-      Integer newVersions = versions.get(store.getTableName());
-      ScanInfo oldSI = hs.getScanInfo();
-      ColumnFamilyDescriptor family = store.getColumnFamilyDescriptor();
-      ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), 
family.getName(),
-          family.getMinVersions(), newVersions == null ? 
family.getMaxVersions() : newVersions,
-          newTtl == null ? oldSI.getTtl() : newTtl, 
family.getKeepDeletedCells(),
-          family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), 
oldSI.getComparator(), family.isNewVersionBehavior());
-      return new StoreScanner(hs, scanInfo,
-          newVersions == null ? OptionalInt.empty() : 
OptionalInt.of(newVersions.intValue()),
-          scanners, ScanType.COMPACT_RETAIN_DELETES, 
store.getSmallestReadPoint(),
-          HConstants.OLDEST_TIMESTAMP);
+    public InternalScanner 
preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+        InternalScanner scanner) throws IOException {
+      return wrap(store, scanner);
     }
 
     @Override
-    public InternalScanner preCompactScannerOpen(
-        final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-        List<? extends KeyValueScanner> scanners, ScanType scanType, long 
earliestPutTs,
-        InternalScanner s,CompactionLifeCycleTracker tracker, 
CompactionRequest request,
-        long readPoint) throws IOException {
-      HStore hs = (HStore) store;
-      Long newTtl = ttls.get(store.getTableName());
-      Integer newVersions = versions.get(store.getTableName());
-      ScanInfo oldSI = hs.getScanInfo();
-      ColumnFamilyDescriptor family = store.getColumnFamilyDescriptor();
-      ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), 
family.getName(),
-          family.getMinVersions(), newVersions == null ? 
family.getMaxVersions() : newVersions,
-          newTtl == null ? oldSI.getTtl() : newTtl, 
family.getKeepDeletedCells(),
-          family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), 
oldSI.getComparator(),
-          family.isNewVersionBehavior());
-      return new StoreScanner(hs, scanInfo,
-          newVersions == null ? OptionalInt.empty() : 
OptionalInt.of(newVersions.intValue()),
-          scanners, scanType, store.getSmallestReadPoint(), earliestPutTs);
+    public InternalScanner 
preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker 
tracker,
+        CompactionRequest request) throws IOException {
+      return wrap(store, scanner);
     }
 
     @Override
-    public KeyValueScanner preStoreScannerOpen(
-        final ObserverContext<RegionCoprocessorEnvironment> c, Store store, 
final Scan scan,
-        final NavigableSet<byte[]> targetCols, KeyValueScanner s, long readPt) 
throws IOException {
-      TableName tn = store.getTableName();
-      if (!tn.isSystemTable()) {
-        HStore hs = (HStore) store;
-        Long newTtl = ttls.get(store.getTableName());
-        Integer newVersions = versions.get(store.getTableName());
-        ScanInfo oldSI = hs.getScanInfo();
-        ColumnFamilyDescriptor family = store.getColumnFamilyDescriptor();
-        ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), 
family.getName(),
-            family.getMinVersions(), newVersions == null ? 
family.getMaxVersions() : newVersions,
-            newTtl == null ? oldSI.getTtl() : newTtl, 
family.getKeepDeletedCells(),
-            family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), 
oldSI.getComparator(),
-            family.isNewVersionBehavior());
-        return new StoreScanner(hs, scanInfo, scan, targetCols, readPt);
-      } else {
-        return s;
+    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get 
get,
+        List<Cell> result) throws IOException {
+      TableName tableName = 
c.getEnvironment().getRegion().getTableDescriptor().getTableName();
+      Long ttl = this.ttls.get(tableName);
+      if (ttl != null) {
+        get.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, 
get.getTimeRange().getMax());
+      }
+      Integer version = this.versions.get(tableName);
+      if (version != null) {
+        get.readVersions(version);
+      }
+    }
+
+    @Override
+    public RegionScanner 
preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
+        RegionScanner s) throws IOException {
+      Region region = c.getEnvironment().getRegion();
+      TableName tableName = region.getTableDescriptor().getTableName();
+      Long ttl = this.ttls.get(tableName);
+      if (ttl != null) {
+        scan.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, 
scan.getTimeRange().getMax());
+      }
+      Integer version = this.versions.get(tableName);
+      if (version != null) {
+        scan.readVersions(version);
       }
+      return region.getScanner(scan);
     }
   }
 }

Reply via email to