This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new cabab87  PHOENIX-5881 - Port PHOENIX-5645 (MaxLookbackAge) to 5.x
cabab87 is described below

commit cabab87e6c18db5eb2b69c24885c28806dc404e7
Author: Geoffrey Jacoby <[email protected]>
AuthorDate: Thu Aug 6 14:56:49 2020 -0700

    PHOENIX-5881 - Port PHOENIX-5645 (MaxLookbackAge) to 5.x
---
 .../end2end/IndexToolForNonTxGlobalIndexIT.java    |  10 +-
 .../org/apache/phoenix/end2end/MaxLookbackIT.java  | 423 +++++++++++++++++++++
 .../apache/phoenix/end2end/PointInTimeQueryIT.java |   4 +-
 .../it/java/org/apache/phoenix/end2end/SCNIT.java  | 129 +++++++
 .../org/apache/phoenix/compile/QueryCompiler.java  |  28 ++
 .../coprocessor/BaseScannerRegionObserver.java     |  23 +-
 .../apache/phoenix/exception/SQLExceptionCode.java |   2 +
 .../compat/hbase/HbaseCompatCapabilities.java      |  32 ++
 .../CompatBaseScannerRegionObserver.java           |  80 ++++
 .../compat/hbase/HbaseCompatCapabilities.java      |  32 ++
 .../CompatBaseScannerRegionObserver.java           |  80 ++++
 .../compat/hbase/HbaseCompatCapabilities.java      |  31 ++
 .../CompatBaseScannerRegionObserver.java           | 197 ++++++++++
 13 files changed, 1065 insertions(+), 6 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
index e32d310..53ee745 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
@@ -42,6 +42,7 @@ import 
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
 import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -1023,11 +1024,14 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseUniqueNamesOwnClusterIT
                 customEdge.incrementValue(waitForUpsert);
                 return;
             }
+            //In HBase 2.0-2.2, we can't see Puts behind Deletes even on 
lookback / SCN queries. Starting in 2.3 we can
+            //That changes the counts we expect from index tool verification
+            int putBehindDeleteMarkerCount = 
HbaseCompatCapabilities.isLookbackBeyondDeletesSupported() ? 1 :0;
 
             // regular job without delete row
             it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, 
dataTableName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.ONLY, "-st", 
String.valueOf(t0),"-et", String.valueOf(t4));
-            verifyCounters(it, 2, 2);
+            verifyCounters(it, 2, 2 + putBehindDeleteMarkerCount);
             customEdge.incrementValue(waitForUpsert);
 
             // job with 2 rows
@@ -1039,13 +1043,13 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseUniqueNamesOwnClusterIT
             // job with update on only one row
             it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, 
dataTableName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.ONLY, "-st", 
String.valueOf(t1),"-et", String.valueOf(t3));
-            verifyCounters(it, 1, 1);
+            verifyCounters(it, 1, 1 + putBehindDeleteMarkerCount);
             customEdge.incrementValue(waitForUpsert);
 
             // job with update on only one row
             it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, 
dataTableName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.ONLY, "-st", 
String.valueOf(t2),"-et", String.valueOf(t4));
-            verifyCounters(it, 1, 1);
+            verifyCounters(it, 1, 1 + putBehindDeleteMarkerCount);
             customEdge.incrementValue(waitForUpsert);
 
             // job with update on only one row
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
new file mode 100644
index 0000000..957a7a4
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
+import 
org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.AfterClass;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.assertRawCellCount;
+import static org.apache.phoenix.util.TestUtil.assertRawRowCount;
+import static org.apache.phoenix.util.TestUtil.assertRowExistsAtSCN;
+import static org.apache.phoenix.util.TestUtil.assertRowHasExpectedValueAtSCN;
+import static org.apache.phoenix.util.TestUtil.assertTableHasTtl;
+import static org.apache.phoenix.util.TestUtil.assertTableHasVersions;
+
+@NeedsOwnMiniClusterTest
+public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
+    private static final Log LOG = LogFactory.getLog(MaxLookbackIT.class);
+    private static final int MAX_LOOKBACK_AGE = 15;
+    private static final int ROWS_POPULATED = 2;
+    public static final int WAIT_AFTER_TABLE_CREATION_MILLIS = 1;
+    private String tableDDLOptions;
+    private StringBuilder optionBuilder;
+    ManualEnvironmentEdge injectEdge;
+    private int ttl;
+    //max lookback isn't supported in HBase 2.1 and 2.2 because of missing 
coprocessor
+    // interfaces; see HBASE-24321
+    private static boolean isMaxLookbackSupported =
+            HbaseCompatCapabilities.isMaxLookbackTimeSupported();
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        if (!isMaxLookbackSupported) {
+            return;
+        }
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 
Long.toString(0));
+        
props.put(CompatBaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, 
Integer.toString(MAX_LOOKBACK_AGE));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Before
+    public void beforeTest(){
+        if (!isMaxLookbackSupported) {
+            return;
+        }
+        EnvironmentEdgeManager.reset();
+        optionBuilder = new StringBuilder();
+        this.tableDDLOptions = optionBuilder.toString();
+        ttl = 0;
+        injectEdge = new ManualEnvironmentEdge();
+        injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
+    }
+
+    @After
+    public synchronized void afterClass() {
+        if (!isMaxLookbackSupported) {
+            return;
+        }
+        EnvironmentEdgeManager.reset();
+    }
+
+    @Test
+    public void testTooLowSCNWithMaxLookbackAge() throws Exception {
+        if (!isMaxLookbackSupported) {
+            return;
+        }
+        String dataTableName = generateUniqueName();
+        createTable(dataTableName);
+        injectEdge.setValue(System.currentTimeMillis());
+        EnvironmentEdgeManager.injectEdge(injectEdge);
+        //increase long enough to make sure we can find the syscat row for the 
table
+        injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+        populateTable(dataTableName);
+        long populateTime = EnvironmentEdgeManager.currentTimeMillis();
+        injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+            Long.toString(populateTime));
+        try (Connection connscn = DriverManager.getConnection(getUrl(), 
props)) {
+            connscn.createStatement().executeQuery("select * from " + 
dataTableName);
+        } catch (SQLException se) {
+            SQLExceptionCode code =
+                
SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE;
+            TestUtil.assertSqlExceptionCode(code, se);
+            return;
+        }
+        Assert.fail("We should have thrown an exception for the too-early 
SCN");
+    }
+
+    @Test(timeout=120000L)
+    public void testRecentlyDeletedRowsNotCompactedAway() throws Exception {
+        if (!isMaxLookbackSupported) {
+            return;
+        }
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            String indexName = generateUniqueName();
+            createTable(dataTableName);
+
+            TableName dataTable = TableName.valueOf(dataTableName);
+            populateTable(dataTableName);
+            createIndex(dataTableName, indexName, 1);
+            injectEdge.setValue(System.currentTimeMillis());
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            TableName indexTable = TableName.valueOf(indexName);
+            injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+            long beforeDeleteSCN = EnvironmentEdgeManager.currentTimeMillis();
+            injectEdge.incrementValue(10); //make sure we delete at a 
different ts
+            Statement stmt = conn.createStatement();
+            stmt.execute("DELETE FROM " + dataTableName + " WHERE " + " id = 
'a'");
+            Assert.assertEquals(1, stmt.getUpdateCount());
+            conn.commit();
+            //select stmt to get row we deleted
+            String sql = String.format("SELECT * FROM %s WHERE id = 'a'", 
dataTableName);
+            String indexSql = String.format("SELECT * FROM %s WHERE val1 = 
'ab'", dataTableName);
+            int rowsPlusDeleteMarker = ROWS_POPULATED;
+            assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);
+            assertExplainPlan(conn, indexSql, dataTableName, indexName);
+            assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true);
+            flush(dataTable);
+            flush(indexTable);
+            assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);
+            assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true);
+            long beforeFirstCompactSCN = 
EnvironmentEdgeManager.currentTimeMillis();
+            injectEdge.incrementValue(1); //new ts for major compaction
+            majorCompact(dataTable);
+            majorCompact(indexTable);
+            assertRawRowCount(conn, dataTable, rowsPlusDeleteMarker);
+            assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
+            //wait for the lookback time. After this compactions should purge 
the deleted row
+            injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
+            long beforeSecondCompactSCN = 
EnvironmentEdgeManager.currentTimeMillis();
+            String notDeletedRowSql =
+                String.format("SELECT * FROM %s WHERE id = 'b'", 
dataTableName);
+            String notDeletedIndexRowSql =
+                String.format("SELECT * FROM %s WHERE val1 = 'bc'", 
dataTableName);
+            assertRowExistsAtSCN(getUrl(), notDeletedRowSql, 
beforeSecondCompactSCN, true);
+            assertRowExistsAtSCN(getUrl(), notDeletedIndexRowSql, 
beforeSecondCompactSCN, true);
+            assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+            assertRawRowCount(conn, indexTable, ROWS_POPULATED);
+            conn.createStatement().execute("upsert into " + dataTableName +
+                " values ('c', 'cd', 'cde', 'cdef')");
+            conn.commit();
+            injectEdge.incrementValue(1L);
+            majorCompact(dataTable);
+            majorCompact(indexTable);
+            //should still be ROWS_POPULATED because we added one and deleted 
one
+            assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+            assertRawRowCount(conn, indexTable, ROWS_POPULATED);
+
+            //deleted row should be gone, but not deleted row should still be 
there.
+            assertRowExistsAtSCN(getUrl(), sql, beforeSecondCompactSCN, false);
+            assertRowExistsAtSCN(getUrl(), indexSql, beforeSecondCompactSCN, 
false);
+            assertRowExistsAtSCN(getUrl(), notDeletedRowSql, 
beforeSecondCompactSCN, true);
+            assertRowExistsAtSCN(getUrl(), notDeletedIndexRowSql, 
beforeSecondCompactSCN, true);
+
+        }
+    }
+
+    @Test(timeout=60000L)
+    public void testTTLAndMaxLookbackAge() throws Exception {
+        if (!isMaxLookbackSupported) {
+            return;
+        }
+        ttl = 20;
+        optionBuilder.append("TTL=" + ttl);
+        tableDDLOptions = optionBuilder.toString();
+        Configuration conf = getUtility().getConfiguration();
+        //disable automatic memstore flushes
+        long oldMemstoreFlushInterval = 
conf.getLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL,
+            HRegion.DEFAULT_CACHE_FLUSH_INTERVAL);
+        conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0L);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            String indexName = generateUniqueName();
+            createTable(dataTableName);
+            populateTable(dataTableName);
+            createIndex(dataTableName, indexName, 1);
+            injectEdge.setValue(System.currentTimeMillis());
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            injectEdge.incrementValue(1);
+            long afterFirstInsertSCN = 
EnvironmentEdgeManager.currentTimeMillis();
+            TableName dataTable = TableName.valueOf(dataTableName);
+            TableName indexTable = TableName.valueOf(indexName);
+            assertTableHasTtl(conn, dataTable, ttl);
+            assertTableHasTtl(conn, indexTable, ttl);
+            //first make sure we inserted correctly
+            String sql = String.format("SELECT val2 FROM %s WHERE id = 'a'", 
dataTableName);
+            String indexSql = String.format("SELECT val2 FROM %s WHERE val1 = 
'ab'", dataTableName);
+            assertRowExistsAtSCN(getUrl(),sql, afterFirstInsertSCN, true);
+            assertExplainPlan(conn, indexSql, dataTableName, indexName);
+            assertRowExistsAtSCN(getUrl(),indexSql, afterFirstInsertSCN, true);
+            int originalRowCount = 2;
+            assertRawRowCount(conn, dataTable, originalRowCount);
+            assertRawRowCount(conn, indexTable, originalRowCount);
+            //force a flush
+            flush(dataTable);
+            flush(indexTable);
+            //flush shouldn't have changed it
+            assertRawRowCount(conn, dataTable, originalRowCount);
+            assertRawRowCount(conn, indexTable, originalRowCount);
+            assertExplainPlan(conn, indexSql, dataTableName, indexName);
+            long timeToAdvance = (MAX_LOOKBACK_AGE * 1000) -
+                (EnvironmentEdgeManager.currentTimeMillis() - 
afterFirstInsertSCN);
+            if (timeToAdvance > 0) {
+                injectEdge.incrementValue(timeToAdvance);
+            }
+            //make sure it's still on disk
+            assertRawRowCount(conn, dataTable, originalRowCount);
+            assertRawRowCount(conn, indexTable, originalRowCount);
+            injectEdge.incrementValue(1); //get a new timestamp for compaction
+            majorCompact(dataTable);
+            majorCompact(indexTable);
+            //nothing should have been purged by this major compaction
+            assertRawRowCount(conn, dataTable, originalRowCount);
+            assertRawRowCount(conn, indexTable, originalRowCount);
+            //now wait the TTL
+            timeToAdvance = (ttl * 1000) -
+                (EnvironmentEdgeManager.currentTimeMillis() - 
afterFirstInsertSCN);
+            if (timeToAdvance > 0) {
+                injectEdge.incrementValue(timeToAdvance);
+            }
+            //make sure that we can compact away the now-expired rows
+            majorCompact(dataTable);
+            majorCompact(indexTable);
+            //note that before HBase 1.4, we don't have HBASE-17956
+            // and this will always return 0 whether it's still on-disk or not
+            assertRawRowCount(conn, dataTable, 0);
+            assertRawRowCount(conn, indexTable, 0);
+        } finally{
+            conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 
oldMemstoreFlushInterval);
+        }
+    }
+
+    @Test(timeout=60000)
+    public void testRecentMaxVersionsNotCompactedAway() throws Exception {
+        if (!isMaxLookbackSupported) {
+            return;
+        }
+        int versions = 2;
+        optionBuilder.append("VERSIONS=" + versions);
+        tableDDLOptions = optionBuilder.toString();
+        String firstValue = "abc";
+        String secondValue = "def";
+        String thirdValue = "ghi";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            String indexName = generateUniqueName();
+            createTable(dataTableName);
+            populateTable(dataTableName);
+            createIndex(dataTableName, indexName, versions);
+            injectEdge.setValue(System.currentTimeMillis());
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            //increment to make sure we don't "look back" past table creation
+            injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+            injectEdge.incrementValue(1); //increment by 1 so we can see our 
write
+            long afterInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
+            //make sure table and index metadata is set up right for versions
+            TableName dataTable = TableName.valueOf(dataTableName);
+            TableName indexTable = TableName.valueOf(indexName);
+            assertTableHasVersions(conn, dataTable, versions);
+            assertTableHasVersions(conn, indexTable, versions);
+            //check query optimizer is doing what we expect
+            String dataTableSelectSql =
+                String.format("SELECT val2 FROM %s WHERE id = 'a'", 
dataTableName);
+            String indexTableSelectSql =
+                String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", 
dataTableName);
+            assertExplainPlan(conn, indexTableSelectSql, dataTableName, 
indexName);
+            //make sure the data was inserted correctly in the first place
+            assertRowHasExpectedValueAtSCN(getUrl(), dataTableSelectSql, 
afterInsertSCN, firstValue);
+            assertRowHasExpectedValueAtSCN(getUrl(), indexTableSelectSql, 
afterInsertSCN, firstValue);
+            //force first update to get a distinct ts
+            injectEdge.incrementValue(1);
+            updateColumn(conn, dataTableName, "id", "a", "val2", secondValue);
+            injectEdge.incrementValue(1); //now make update visible
+            long afterFirstUpdateSCN = 
EnvironmentEdgeManager.currentTimeMillis();
+            //force second update to get a distinct ts
+            injectEdge.incrementValue(1);
+            updateColumn(conn, dataTableName, "id", "a", "val2", thirdValue);
+            injectEdge.incrementValue(1);
+            long afterSecondUpdateSCN = 
EnvironmentEdgeManager.currentTimeMillis();
+            injectEdge.incrementValue(1);
+            //check to make sure we can see all three versions at the 
appropriate times
+            String[] allValues = {firstValue, secondValue, thirdValue};
+            long[] allSCNs = {afterInsertSCN, afterFirstUpdateSCN, 
afterSecondUpdateSCN};
+            assertMultiVersionLookbacks(dataTableSelectSql, allValues, 
allSCNs);
+            assertMultiVersionLookbacks(indexTableSelectSql, allValues, 
allSCNs);
+            flush(dataTable);
+            flush(indexTable);
+            //after flush, check to make sure we can see all three versions at 
the appropriate times
+            assertMultiVersionLookbacks(dataTableSelectSql, allValues, 
allSCNs);
+            assertMultiVersionLookbacks(indexTableSelectSql, allValues, 
allSCNs);
+            majorCompact(dataTable);
+            majorCompact(indexTable);
+            //after major compaction, check to make sure we can see all three 
versions
+            // at the appropriate times
+            assertMultiVersionLookbacks(dataTableSelectSql, allValues, 
allSCNs);
+            assertMultiVersionLookbacks(indexTableSelectSql, allValues, 
allSCNs);
+            injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
+            long afterLookbackAgeSCN = 
EnvironmentEdgeManager.currentTimeMillis();
+            majorCompact(dataTable);
+            majorCompact(indexTable);
+            //empty column, 1 version of val 1, 3 versions of val2, 1 version 
of val3 = 6
+            assertRawCellCount(conn, dataTable, Bytes.toBytes("a"), 6);
+            //2 versions of empty column, 2 versions of val2,
+            // 2 versions of val3 (since we write whole rows to index) = 6
+            assertRawCellCount(conn, indexTable, Bytes.toBytes("ab\u0000a"), 
6);
+            //empty column + 1 version each of val1,2 and 3 = 4
+            assertRawCellCount(conn, dataTable, Bytes.toBytes("b"), 4);
+            //1 version of empty column, 1 version of val2, 1 version of val3 
= 3
+            assertRawCellCount(conn, indexTable, Bytes.toBytes("bc\u0000b"), 
3);
+        }
+    }
+
+    private void flush(TableName table) throws IOException {
+        Admin admin = getUtility().getHBaseAdmin();
+        admin.flush(table);
+    }
+
+    private void majorCompact(TableName table) throws Exception {
+        TestUtil.majorCompact(getUtility(), table);
+    }
+
+    private void assertMultiVersionLookbacks(String dataTableSelectSql,
+                                             String[] values, long[] scns)
+        throws Exception {
+        //make sure we can still look back after updating
+        for (int k = 0; k < values.length; k++){
+            assertRowHasExpectedValueAtSCN(getUrl(), dataTableSelectSql, 
scns[k], values[k]);
+        }
+    }
+
+    private void updateColumn(Connection conn, String dataTableName,
+                              String idColumn, String id, String valueColumn, 
String value)
+        throws SQLException {
+        String upsertSql = String.format("UPSERT INTO %s (%s, %s) VALUES 
('%s', '%s')",
+            dataTableName, idColumn, valueColumn, id, value);
+        conn.createStatement().execute(upsertSql);
+        conn.commit();
+    }
+
+    private void createTable(String tableName) throws SQLException {
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            String createSql = "create table " + tableName +
+                " (id varchar(10) not null primary key, val1 varchar(10), " +
+                "val2 varchar(10), val3 varchar(10))" + tableDDLOptions;
+            conn.createStatement().execute(createSql);
+            conn.commit();
+        }
+    }
+    private void populateTable(String tableName) throws SQLException {
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("upsert into " + tableName + " 
values ('a', 'ab', 'abc', 'abcd')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + tableName + " 
values ('b', 'bc', 'bcd', 'bcde')");
+            conn.commit();
+        }
+    }
+
+    private void createIndex(String dataTableName, String indexTableName, int 
indexVersions)
+        throws SQLException {
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + 
" on " +
+                dataTableName + " (val1) include (val2, val3)" +
+                " VERSIONS=" + indexVersions);
+            conn.commit();
+        }
+    }
+
+    public static void assertExplainPlan(Connection conn, String selectSql,
+                                         String dataTableFullName, String 
indexTableFullName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + 
selectSql);
+        String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+        IndexToolIT.assertExplainPlan(false, actualExplainPlan, 
dataTableFullName, indexTableFullName);
+    }
+
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java
index 51f4fe0..6d4d4d6 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java
@@ -40,6 +40,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.Test;
@@ -62,8 +63,7 @@ public class PointInTimeQueryIT extends BaseQueryIT {
     
     public PointInTimeQueryIT(String idxDdl, boolean columnEncoded)
             throws Exception {
-        // These queries fail without KEEP_DELETED_CELLS=true
-        super(idxDdl, columnEncoded, true);
+        super(idxDdl, columnEncoded, 
!HbaseCompatCapabilities.isLookbackBeyondDeletesSupported());
     }
 
     @Test
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java
new file mode 100644
index 0000000..4dcaea7
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
+import 
org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SCNIT extends ParallelStatsDisabledIT {
+
+    @Test
+    public void testReadBeforeDelete() throws Exception {
+        //we don't support reading earlier than a delete in HBase 2.0-2.2, 
only in 1.4+ and 2.3+
+        if (!HbaseCompatCapabilities.isLookbackBeyondDeletesSupported()){
+            return;
+        }
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        long timeBeforeDelete;
+        long timeAfterDelete;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v VARCHAR)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','aa')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('b','bb')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('c','cc')");
+            conn.commit();
+            timeBeforeDelete = EnvironmentEdgeManager.currentTime() + 1;
+            Thread.sleep(2);
+            conn.createStatement().execute("DELETE FROM " + fullTableName + " 
WHERE k = 'b'");
+            conn.commit();
+            timeAfterDelete = EnvironmentEdgeManager.currentTime() + 1;
+        }
+
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(timeBeforeDelete));
+        try (Connection connscn = DriverManager.getConnection(getUrl(), 
props)) {
+            ResultSet rs = connscn.createStatement().executeQuery("select * 
from " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("b", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("c", rs.getString(1));
+            assertFalse(rs.next());
+            rs.close();
+        }
+        props.clear();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(timeAfterDelete));
+        try (Connection connscn = DriverManager.getConnection(getUrl(), 
props)) {
+            ResultSet rs = connscn.createStatement().executeQuery("select * 
from " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("c", rs.getString(1));
+            assertFalse(rs.next());
+            rs.close();
+        }
+
+    }
+
+    @Test
+    public void testSCNWithTTL() throws Exception {
+        int ttl = 2;
+        String fullTableName = createTableWithTTL(ttl);
+        //sleep for one second longer than ttl
+        Thread.sleep(ttl * 1000 + 1000);
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+                Long.toString(EnvironmentEdgeManager.currentTime() - 1000));
+        try (Connection connscn = DriverManager.getConnection(getUrl(), 
props)) {
+            ResultSet rs = connscn.createStatement().executeQuery("select * 
from " + fullTableName);
+            assertFalse(rs.next());
+            rs.close();
+        }
+    }
+
+    private String createTableWithTTL(int ttl) throws SQLException, 
InterruptedException {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        StringBuilder optionsBuilder = new StringBuilder();
+        if (ttl > 0){
+            optionsBuilder.append("TTL=");
+            optionsBuilder.append(ttl);
+        }
+        String ddlOptions = optionsBuilder.toString();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement()
+                    .execute(String.format("CREATE TABLE %s" +
+                            "(k VARCHAR PRIMARY KEY, f.v VARCHAR) %s", 
fullTableName, ddlOptions));
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','aa')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('b','bb')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('c','cc')");
+            conn.commit();
+        }
+        return fullTableName;
+    }
+
+}
\ No newline at end of file
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 3b57f7a..ebed26c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -31,11 +31,15 @@ import java.util.Set;
 import com.google.common.base.Optional;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
+import 
org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.JoinCompiler.JoinSpec;
 import org.apache.phoenix.compile.JoinCompiler.JoinTable;
 import org.apache.phoenix.compile.JoinCompiler.Table;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.ClientAggregatePlan;
@@ -79,6 +83,7 @@ import org.apache.phoenix.schema.PTable;
 import 
org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
 
@@ -163,6 +168,7 @@ public class QueryCompiler {
      * @throws AmbiguousColumnException if an unaliased column name is 
ambiguous across multiple tables
      */
     public QueryPlan compile() throws SQLException{
+        verifySCN();
         QueryPlan plan;
         if (select.isUnion()) {
             plan = compileUnionAll(select);
@@ -172,6 +178,28 @@ public class QueryCompiler {
         return plan;
     }
 
+    private void verifySCN() throws SQLException {
+        if (!HbaseCompatCapabilities.isMaxLookbackTimeSupported()) {
+            return;
+        }
+        PhoenixConnection conn = statement.getConnection();
+        Long scn = conn.getSCN();
+        if (scn == null) {
+            return;
+        }
+        ColumnResolver resolver =
+            FromCompiler.getResolverForQuery(select, conn);
+        long maxLookBackAgeInMillis =
+            
CompatBaseScannerRegionObserver.getMaxLookbackInMillis(conn.getQueryServices().
+            getConfiguration());
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        if (maxLookBackAgeInMillis > 0 && now - maxLookBackAgeInMillis > scn){
+            throw new SQLExceptionInfo.Builder(
+                
SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE)
+                .build().buildException();
+        }
+    }
+
     public QueryPlan compileUnionAll(SelectStatement select) throws 
SQLException { 
         List<SelectStatement> unionAllSelects = select.getSelects();
         List<QueryPlan> plans = new ArrayList<QueryPlan>();
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 9865e91..6fe5c95 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -29,13 +29,20 @@ import 
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
+import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
+import org.apache.hadoop.hbase.regionserver.Store;
+import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
+import 
org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -47,7 +54,7 @@ import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 
-abstract public class BaseScannerRegionObserver implements RegionObserver {
+abstract public class BaseScannerRegionObserver extends 
CompatBaseScannerRegionObserver {
 
     public static final String AGGREGATORS = "_Aggs";
     public static final String UNORDERED_GROUP_BY_EXPRESSIONS = 
"_UnorderedGroupByExpressions";
@@ -367,4 +374,18 @@ abstract public class BaseScannerRegionObserver implements 
RegionObserver {
                 dataRegion, indexMaintainer, null, viewConstants, null, null, 
projector, ptr, useQualiferAsListIndex);
     }
 
+
+   /* We want to override the store scanner so that we can read "past" a delete
+    marker on an SCN / lookback query to see the underlying edit. This was 
possible
+    in HBase 1.x, but not possible after the interface changes in HBase 2.0. 
HBASE-24321 in
+     HBase 2.3 gave us this ability back, but we need to use it through a 
compatibility shim
+     so we can compile against 2.1 and 2.2. When 2.3 is the minimum supported 
HBase
+     version, the shim can be retired and the logic moved into the real coproc.
+
+    We also need to override the flush compaction coproc hooks in order to 
implement max lookback
+     age to keep versions from being purged.
+
+    Because the required APIs aren't present in HBase 2.1 and 2.2, we override 
in the 2.3
+     version of CompatBaseScannerRegionObserver and no-op in the other 
versions. */
+
 }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 8c83c6f..8230cba 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -190,6 +190,8 @@ public enum SQLExceptionCode {
      UNEQUAL_SCN_AND_BUILD_INDEX_AT(534, "42911", "If both specified, values 
of CURRENT_SCN and BUILD_INDEX_AT must be equal."),
      ONLY_INDEX_UPDATABLE_AT_SCN(535, "42912", "Only an index may be updated 
when the BUILD_INDEX_AT property is specified"),
      PARENT_TABLE_NOT_FOUND(536, "42913", "Can't drop the index because the 
parent table in the DROP statement is incorrect."),
+    CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE(538, "42915",
+        "Cannot use SCN to look further back in the past beyond the configured 
max lookback age"),
 
      /**
      * HBase and Phoenix specific implementation defined sub-classes.
diff --git 
a/phoenix-hbase-compat-2.1.6/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
 
b/phoenix-hbase-compat-2.1.6/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
new file mode 100644
index 0000000..629946b
--- /dev/null
+++ 
b/phoenix-hbase-compat-2.1.6/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.compat.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class HbaseCompatCapabilities {
+
+    public static boolean isMaxLookbackTimeSupported() {
+        return false;
+    }
+
+    //In HBase 2.1 and 2.2, a lookback query won't return any results if 
covered by a future delete
+    public static boolean isLookbackBeyondDeletesSupported() { return false; }
+
+}
diff --git 
a/phoenix-hbase-compat-2.1.6/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java
 
b/phoenix-hbase-compat-2.1.6/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java
new file mode 100644
index 0000000..20aaf20
--- /dev/null
+++ 
b/phoenix-hbase-compat-2.1.6/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase.coprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+
+import java.io.IOException;
+
+public class CompatBaseScannerRegionObserver implements RegionObserver {
+
+    public static final String PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY =
+        "phoenix.max.lookback.age.seconds";
+    public static final int DEFAULT_PHOENIX_MAX_LOOKBACK_AGE = 0;
+
+    public void 
preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store 
store,
+                                             ScanType scanType, ScanOptions 
options, CompactionLifeCycleTracker tracker,
+                                             CompactionRequest request) throws 
IOException {
+        //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't 
implement the "max
+        //lookback age" feature
+    }
+
+    public void 
preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store 
store,
+                                           ScanOptions options, 
FlushLifeCycleTracker tracker) throws IOException {
+        //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't 
implement the "max
+        //lookback age" feature
+    }
+
+    public void preMemStoreCompactionCompactScannerOpen(
+        ObserverContext<RegionCoprocessorEnvironment> c, Store store, 
ScanOptions options)
+        throws IOException {
+        //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't 
implement the "max
+        //lookback age" feature
+    }
+
+    public void 
preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Store 
store,
+                                           ScanOptions options) throws 
IOException {
+        //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't 
override the scan
+        //to "look behind" delete markers on SCN queries
+    }
+
+    public long getMaxLookbackInMillis(Configuration conf){
+        //config param is in seconds, switch to millis
+        return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+            DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) * 1000;
+    }
+
+    //max lookback age isn't supported in HBase 2.1 or HBase 2.2
+    public static boolean isMaxLookbackTimeEnabled(Configuration conf){
+        return false;
+    }
+
+    public static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){
+        return false;
+    }
+
+}
diff --git 
a/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
 
b/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
new file mode 100644
index 0000000..629946b
--- /dev/null
+++ 
b/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.compat.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class HbaseCompatCapabilities {
+
+    public static boolean isMaxLookbackTimeSupported() {
+        return false;
+    }
+
+    //In HBase 2.1 and 2.2, a lookback query won't return any results if 
covered by a future delete
+    public static boolean isLookbackBeyondDeletesSupported() { return false; }
+
+}
diff --git 
a/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java
 
b/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java
new file mode 100644
index 0000000..00c2501
--- /dev/null
+++ 
b/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase.coprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+
+import java.io.IOException;
+
+public class CompatBaseScannerRegionObserver implements RegionObserver {
+
+    public static final String PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY =
+            "phoenix.max.lookback.age.seconds";
+    public static final int DEFAULT_PHOENIX_MAX_LOOKBACK_AGE = 0;
+
+    public void 
preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store 
store,
+                                      ScanType scanType, ScanOptions options, 
CompactionLifeCycleTracker tracker,
+                                      CompactionRequest request) throws 
IOException {
+        //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't 
implement the "max
+        //lookback age" feature
+    }
+
+    public void 
preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store 
store,
+                                    ScanOptions options, FlushLifeCycleTracker 
tracker) throws IOException {
+        //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't 
implement the "max
+        //lookback age" feature
+    }
+
+    public void preMemStoreCompactionCompactScannerOpen(
+            ObserverContext<RegionCoprocessorEnvironment> c, Store store, 
ScanOptions options)
+            throws IOException {
+        //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't 
implement the "max
+        //lookback age" feature
+    }
+
+    public void 
preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Store 
store,
+                                    ScanOptions options) throws IOException {
+        //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't 
override the scan
+        //to "look behind" delete markers on SCN queries
+    }
+
+    public long getMaxLookbackInMillis(Configuration conf){
+        //config param is in seconds, switch to millis
+        return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+                DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) * 1000;
+    }
+
+    //max lookback age isn't supported in HBase 2.1 or HBase 2.2
+    public static boolean isMaxLookbackTimeEnabled(Configuration conf){
+        return false;
+    }
+
+    public static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){
+        return false;
+    }
+
+}
diff --git 
a/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
 
b/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
new file mode 100644
index 0000000..9b83e6d
--- /dev/null
+++ 
b/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.compat.hbase;
+
+public class HbaseCompatCapabilities {
+
+    public static boolean isMaxLookbackTimeSupported() {
+        return true;
+    }
+
+    //In HBase 2.1 and 2.2, a lookback query won't return any results if 
covered by a future delete,
+    //but in 2.3 and later we have the preSoreScannerOpen hook that overrides 
that behavior
+    public static boolean isLookbackBeyondDeletesSupported() { return true; }
+
+}
diff --git 
a/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java
 
b/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java
new file mode 100644
index 0000000..cd1a7f5
--- /dev/null
+++ 
b/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase.coprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.MemoryCompactionPolicy;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import java.io.IOException;
+
+public class CompatBaseScannerRegionObserver implements RegionObserver {
+
+    public static final String PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY =
+        "phoenix.max.lookback.age.seconds";
+    public static final int DEFAULT_PHOENIX_MAX_LOOKBACK_AGE = 0;
+
+    public void 
preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store 
store,
+                                             ScanType scanType, ScanOptions 
options, CompactionLifeCycleTracker tracker,
+                                             CompactionRequest request) throws 
IOException {
+        Configuration conf = c.getEnvironment().getConfiguration();
+        if (isMaxLookbackTimeEnabled(conf)) {
+            setScanOptionsForFlushesAndCompactions(conf, options, store, 
scanType);
+        }
+    }
+
+    public void 
preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store 
store,
+                                                ScanOptions options, 
FlushLifeCycleTracker tracker) throws IOException {
+        Configuration conf = c.getEnvironment().getConfiguration();
+        if (isMaxLookbackTimeEnabled(conf)) {
+            setScanOptionsForFlushesAndCompactions(conf, options, store, 
ScanType.COMPACT_RETAIN_DELETES);
+        }
+    }
+
+    public void preMemStoreCompactionCompactScannerOpen(
+        ObserverContext<RegionCoprocessorEnvironment> c, Store store, 
ScanOptions options)
+        throws IOException {
+        Configuration conf = c.getEnvironment().getConfiguration();
+        if (isMaxLookbackTimeEnabled(conf)) {
+            MemoryCompactionPolicy inMemPolicy =
+                store.getColumnFamilyDescriptor().getInMemoryCompaction();
+            ScanType scanType;
+            //the eager and adaptive in-memory compaction policies can purge 
versions; the others
+            // can't. (Eager always does; adaptive sometimes does)
+            if (inMemPolicy.equals(MemoryCompactionPolicy.EAGER) ||
+                inMemPolicy.equals(MemoryCompactionPolicy.ADAPTIVE)) {
+                scanType = ScanType.COMPACT_DROP_DELETES;
+            } else {
+                scanType = ScanType.COMPACT_RETAIN_DELETES;
+            }
+            setScanOptionsForFlushesAndCompactions(conf, options, store, 
scanType);
+        }
+    }
+
+    public void 
preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Store 
store,
+                                           ScanOptions options) throws 
IOException {
+
+        if (!storeFileScanDoesntNeedAlteration(options)) {
+            //PHOENIX-4277 -- When doing a point-in-time (SCN) Scan, HBase by 
default will hide
+            // mutations that happen before a delete marker. This overrides 
that behavior.
+            options.setMinVersions(options.getMinVersions());
+            KeepDeletedCells keepDeletedCells = KeepDeletedCells.TRUE;
+            if (store.getColumnFamilyDescriptor().getTimeToLive() != 
HConstants.FOREVER) {
+                keepDeletedCells = KeepDeletedCells.TTL;
+            }
+            options.setKeepDeletedCells(keepDeletedCells);
+        }
+    }
+
+    private boolean storeFileScanDoesntNeedAlteration(ScanOptions options) {
+        Scan scan = options.getScan();
+        boolean isRaw = scan.isRaw();
+        //true if keep deleted cells is either TRUE or TTL
+        boolean keepDeletedCells = 
options.getKeepDeletedCells().equals(KeepDeletedCells.TRUE) ||
+            options.getKeepDeletedCells().equals(KeepDeletedCells.TTL);
+        boolean timeRangeIsLatest = scan.getTimeRange().getMax() == 
HConstants.LATEST_TIMESTAMP;
+        boolean timestampIsTransactional =
+            isTransactionalTimestamp(scan.getTimeRange().getMax());
+        return isRaw
+            || keepDeletedCells
+            || timeRangeIsLatest
+            || timestampIsTransactional;
+    }
+
+    private boolean isTransactionalTimestamp(long ts) {
+        //have to use the HBase edge manager because the Phoenix one is in 
phoenix-core
+        return ts > (long) (EnvironmentEdgeManager.currentTime() * 1.1);
+    }
+
+    /*
+     * If KeepDeletedCells.FALSE, KeepDeletedCells.TTL ,
+     * let delete markers age once lookback age is done.
+     */
+    public KeepDeletedCells getKeepDeletedCells(ScanOptions options, ScanType 
scanType) {
+        //if we're doing a minor compaction or flush, always set keep deleted 
cells
+        //to true. Otherwise, if keep deleted cells is false or TTL, use 
KeepDeletedCells TTL,
+        //where the value of the ttl might be overriden to the max lookback 
age elsewhere
+        return (options.getKeepDeletedCells() == KeepDeletedCells.TRUE
+                || scanType.equals(ScanType.COMPACT_RETAIN_DELETES)) ?
+                KeepDeletedCells.TRUE : KeepDeletedCells.TTL;
+    }
+
+    /*
+     * if the user set a TTL we should leave MIN_VERSIONS at the default (0 in 
most of the cases).
+     * Otherwise the data (1st version) will not be removed after the TTL. If 
no TTL, we want
+     * Math.max(maxVersions, minVersions, 1)
+     */
+    public int getMinVersions(ScanOptions options, ColumnFamilyDescriptor 
cfDescriptor) {
+        return cfDescriptor.getTimeToLive() != HConstants.FOREVER ? 
options.getMinVersions()
+            : Math.max(Math.max(options.getMinVersions(),
+            cfDescriptor.getMaxVersions()),1);
+    }
+
+    /**
+     *
+     * @param conf HBase Configuration
+     * @param columnDescriptor ColumnFamilyDescriptor for the store being 
compacted
+     * @param options ScanOptions of overrides to the compaction scan
+     * @return Time to live in milliseconds, based on both HBase TTL and 
Phoenix max lookback age
+     */
+    public long getTimeToLiveForCompactions(Configuration conf,
+                                                   ColumnFamilyDescriptor 
columnDescriptor,
+                                                   ScanOptions options) {
+        long ttlConfigured = columnDescriptor.getTimeToLive();
+        long ttlInMillis = ttlConfigured * 1000;
+        long maxLookbackTtl = getMaxLookbackInMillis(conf);
+        if (isMaxLookbackTimeEnabled(maxLookbackTtl)) {
+            if (ttlConfigured == HConstants.FOREVER
+                && columnDescriptor.getKeepDeletedCells() != 
KeepDeletedCells.TRUE) {
+                // If user configured default TTL(FOREVER) and keep deleted 
cells to false or
+                // TTL then to remove unwanted delete markers we should change 
ttl to max lookback age
+                ttlInMillis = maxLookbackTtl;
+            } else {
+                //if there is a TTL, use TTL instead of max lookback age.
+                // Max lookback age should be more recent or equal to TTL
+                ttlInMillis = Math.max(ttlInMillis, maxLookbackTtl);
+            }
+        }
+
+        return ttlInMillis;
+    }
+
+    public void setScanOptionsForFlushesAndCompactions(Configuration conf,
+                                                               ScanOptions 
options,
+                                                               final Store 
store,
+                                                               ScanType type) {
+        ColumnFamilyDescriptor cfDescriptor = 
store.getColumnFamilyDescriptor();
+        options.setTTL(getTimeToLiveForCompactions(conf, cfDescriptor,
+            options));
+        options.setKeepDeletedCells(getKeepDeletedCells(options, type));
+        options.setMaxVersions(Integer.MAX_VALUE);
+        options.setMinVersions(getMinVersions(options, cfDescriptor));
+    }
+
+    public static long getMaxLookbackInMillis(Configuration conf){
+        //config param is in seconds, switch to millis
+        return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+            DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) * 1000;
+    }
+
+    public static boolean isMaxLookbackTimeEnabled(Configuration conf){
+        return 
isMaxLookbackTimeEnabled(conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+            DEFAULT_PHOENIX_MAX_LOOKBACK_AGE));
+    }
+
+    public static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){
+        return maxLookbackTime > 0L;
+    }
+
+}

Reply via email to