This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new cabab87 PHOENIX-5881 - Port PHOENIX-5645 (MaxLookbackAge) to 5.x
cabab87 is described below
commit cabab87e6c18db5eb2b69c24885c28806dc404e7
Author: Geoffrey Jacoby <[email protected]>
AuthorDate: Thu Aug 6 14:56:49 2020 -0700
PHOENIX-5881 - Port PHOENIX-5645 (MaxLookbackAge) to 5.x
---
.../end2end/IndexToolForNonTxGlobalIndexIT.java | 10 +-
.../org/apache/phoenix/end2end/MaxLookbackIT.java | 423 +++++++++++++++++++++
.../apache/phoenix/end2end/PointInTimeQueryIT.java | 4 +-
.../it/java/org/apache/phoenix/end2end/SCNIT.java | 129 +++++++
.../org/apache/phoenix/compile/QueryCompiler.java | 28 ++
.../coprocessor/BaseScannerRegionObserver.java | 23 +-
.../apache/phoenix/exception/SQLExceptionCode.java | 2 +
.../compat/hbase/HbaseCompatCapabilities.java | 32 ++
.../CompatBaseScannerRegionObserver.java | 80 ++++
.../compat/hbase/HbaseCompatCapabilities.java | 32 ++
.../CompatBaseScannerRegionObserver.java | 80 ++++
.../compat/hbase/HbaseCompatCapabilities.java | 31 ++
.../CompatBaseScannerRegionObserver.java | 197 ++++++++++
13 files changed, 1065 insertions(+), 6 deletions(-)
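
A note on the new config knob before the diff: the feature is keyed off the property "phoenix.max.lookback.age.seconds", defined in CompatBaseScannerRegionObserver below (default 0, meaning disabled). A minimal standalone sketch of setting and reading it, under the conversion rule this commit uses; the class name here is ours, not part of the commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class MaxLookbackConfigSketch {
        public static void main(String[] args) {
            Configuration conf = HBaseConfiguration.create();
            // value is in seconds; getMaxLookbackInMillis() below multiplies by 1000
            conf.setLong("phoenix.max.lookback.age.seconds", 15);
            long maxLookbackMillis = conf.getLong("phoenix.max.lookback.age.seconds", 0L) * 1000;
            System.out.println("max lookback = " + maxLookbackMillis + " ms");
        }
    }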
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
index e32d310..53ee745 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -1023,11 +1024,14 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT
customEdge.incrementValue(waitForUpsert);
return;
}
+        //In HBase 2.0-2.2, we can't see Puts behind Deletes even on lookback / SCN queries. Starting in 2.3 we can.
+        //That changes the counts we expect from index tool verification.
+        int putBehindDeleteMarkerCount = HbaseCompatCapabilities.isLookbackBeyondDeletesSupported() ? 1 : 0;
         // regular job without delete row
         it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                 null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t0),"-et", String.valueOf(t4));
-        verifyCounters(it, 2, 2);
+        verifyCounters(it, 2, 2 + putBehindDeleteMarkerCount);
customEdge.incrementValue(waitForUpsert);
// job with 2 rows
@@ -1039,13 +1043,13 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT
         // job with update on only one row
         it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                 null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1),"-et", String.valueOf(t3));
-        verifyCounters(it, 1, 1);
+        verifyCounters(it, 1, 1 + putBehindDeleteMarkerCount);
         customEdge.incrementValue(waitForUpsert);
         // job with update on only one row
         it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                 null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t2),"-et", String.valueOf(t4));
-        verifyCounters(it, 1, 1);
+        verifyCounters(it, 1, 1 + putBehindDeleteMarkerCount);
customEdge.incrementValue(waitForUpsert);
// job with update on only one row
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
new file mode 100644
index 0000000..957a7a4
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
+import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.AfterClass;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.assertRawCellCount;
+import static org.apache.phoenix.util.TestUtil.assertRawRowCount;
+import static org.apache.phoenix.util.TestUtil.assertRowExistsAtSCN;
+import static org.apache.phoenix.util.TestUtil.assertRowHasExpectedValueAtSCN;
+import static org.apache.phoenix.util.TestUtil.assertTableHasTtl;
+import static org.apache.phoenix.util.TestUtil.assertTableHasVersions;
+
+@NeedsOwnMiniClusterTest
+public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
+ private static final Log LOG = LogFactory.getLog(MaxLookbackIT.class);
+ private static final int MAX_LOOKBACK_AGE = 15;
+ private static final int ROWS_POPULATED = 2;
+ public static final int WAIT_AFTER_TABLE_CREATION_MILLIS = 1;
+ private String tableDDLOptions;
+ private StringBuilder optionBuilder;
+ ManualEnvironmentEdge injectEdge;
+ private int ttl;
+    //max lookback isn't supported in HBase 2.1 and 2.2 because of missing coprocessor
+    // interfaces; see HBASE-24321
+ private static boolean isMaxLookbackSupported =
+ HbaseCompatCapabilities.isMaxLookbackTimeSupported();
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ if (!isMaxLookbackSupported) {
+ return;
+ }
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
+        props.put(CompatBaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Before
+ public void beforeTest(){
+ if (!isMaxLookbackSupported) {
+ return;
+ }
+ EnvironmentEdgeManager.reset();
+ optionBuilder = new StringBuilder();
+ this.tableDDLOptions = optionBuilder.toString();
+ ttl = 0;
+ injectEdge = new ManualEnvironmentEdge();
+ injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
+ }
+
+ @After
+ public synchronized void afterClass() {
+ if (!isMaxLookbackSupported) {
+ return;
+ }
+ EnvironmentEdgeManager.reset();
+ }
+
+ @Test
+ public void testTooLowSCNWithMaxLookbackAge() throws Exception {
+ if (!isMaxLookbackSupported) {
+ return;
+ }
+ String dataTableName = generateUniqueName();
+ createTable(dataTableName);
+ injectEdge.setValue(System.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+        //increase long enough to make sure we can find the syscat row for the table
+        injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+ populateTable(dataTableName);
+ long populateTime = EnvironmentEdgeManager.currentTimeMillis();
+ injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);
+ Properties props = new Properties();
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+ Long.toString(populateTime));
+        try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
+            connscn.createStatement().executeQuery("select * from " + dataTableName);
+        } catch (SQLException se) {
+            SQLExceptionCode code =
+                SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE;
+            TestUtil.assertSqlExceptionCode(code, se);
+            return;
+        }
+        Assert.fail("We should have thrown an exception for the too-early SCN");
+ }
+
+ @Test(timeout=120000L)
+ public void testRecentlyDeletedRowsNotCompactedAway() throws Exception {
+ if (!isMaxLookbackSupported) {
+ return;
+ }
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ createTable(dataTableName);
+
+ TableName dataTable = TableName.valueOf(dataTableName);
+ populateTable(dataTableName);
+ createIndex(dataTableName, indexName, 1);
+ injectEdge.setValue(System.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ TableName indexTable = TableName.valueOf(indexName);
+ injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+ long beforeDeleteSCN = EnvironmentEdgeManager.currentTimeMillis();
+            injectEdge.incrementValue(10); //make sure we delete at a different ts
+            Statement stmt = conn.createStatement();
+            stmt.execute("DELETE FROM " + dataTableName + " WHERE " + " id = 'a'");
+ Assert.assertEquals(1, stmt.getUpdateCount());
+ conn.commit();
+ //select stmt to get row we deleted
+            String sql = String.format("SELECT * FROM %s WHERE id = 'a'", dataTableName);
+            String indexSql = String.format("SELECT * FROM %s WHERE val1 = 'ab'", dataTableName);
+ int rowsPlusDeleteMarker = ROWS_POPULATED;
+ assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);
+ assertExplainPlan(conn, indexSql, dataTableName, indexName);
+ assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true);
+ flush(dataTable);
+ flush(indexTable);
+ assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);
+ assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true);
+            long beforeFirstCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
+ injectEdge.incrementValue(1); //new ts for major compaction
+ majorCompact(dataTable);
+ majorCompact(indexTable);
+ assertRawRowCount(conn, dataTable, rowsPlusDeleteMarker);
+ assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
+            //wait for the lookback time. After this compactions should purge the deleted row
+            injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
+            long beforeSecondCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
+            String notDeletedRowSql =
+                String.format("SELECT * FROM %s WHERE id = 'b'", dataTableName);
+            String notDeletedIndexRowSql =
+                String.format("SELECT * FROM %s WHERE val1 = 'bc'", dataTableName);
+            assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true);
+            assertRowExistsAtSCN(getUrl(), notDeletedIndexRowSql, beforeSecondCompactSCN, true);
+ assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+ assertRawRowCount(conn, indexTable, ROWS_POPULATED);
+ conn.createStatement().execute("upsert into " + dataTableName +
+ " values ('c', 'cd', 'cde', 'cdef')");
+ conn.commit();
+ injectEdge.incrementValue(1L);
+ majorCompact(dataTable);
+ majorCompact(indexTable);
+            //should still be ROWS_POPULATED because we added one and deleted one
+ assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+ assertRawRowCount(conn, indexTable, ROWS_POPULATED);
+
+            //deleted row should be gone, but the not-deleted row should still be there.
+            assertRowExistsAtSCN(getUrl(), sql, beforeSecondCompactSCN, false);
+            assertRowExistsAtSCN(getUrl(), indexSql, beforeSecondCompactSCN, false);
+            assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true);
+            assertRowExistsAtSCN(getUrl(), notDeletedIndexRowSql, beforeSecondCompactSCN, true);
+
+ }
+ }
+
+ @Test(timeout=60000L)
+ public void testTTLAndMaxLookbackAge() throws Exception {
+ if (!isMaxLookbackSupported) {
+ return;
+ }
+ ttl = 20;
+ optionBuilder.append("TTL=" + ttl);
+ tableDDLOptions = optionBuilder.toString();
+ Configuration conf = getUtility().getConfiguration();
+ //disable automatic memstore flushes
+        long oldMemstoreFlushInterval = conf.getLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL,
+            HRegion.DEFAULT_CACHE_FLUSH_INTERVAL);
+ conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0L);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ createTable(dataTableName);
+ populateTable(dataTableName);
+ createIndex(dataTableName, indexName, 1);
+ injectEdge.setValue(System.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ injectEdge.incrementValue(1);
+            long afterFirstInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
+ TableName dataTable = TableName.valueOf(dataTableName);
+ TableName indexTable = TableName.valueOf(indexName);
+ assertTableHasTtl(conn, dataTable, ttl);
+ assertTableHasTtl(conn, indexTable, ttl);
+ //first make sure we inserted correctly
+            String sql = String.format("SELECT val2 FROM %s WHERE id = 'a'", dataTableName);
+            String indexSql = String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName);
+ assertRowExistsAtSCN(getUrl(),sql, afterFirstInsertSCN, true);
+ assertExplainPlan(conn, indexSql, dataTableName, indexName);
+ assertRowExistsAtSCN(getUrl(),indexSql, afterFirstInsertSCN, true);
+ int originalRowCount = 2;
+ assertRawRowCount(conn, dataTable, originalRowCount);
+ assertRawRowCount(conn, indexTable, originalRowCount);
+ //force a flush
+ flush(dataTable);
+ flush(indexTable);
+ //flush shouldn't have changed it
+ assertRawRowCount(conn, dataTable, originalRowCount);
+ assertRawRowCount(conn, indexTable, originalRowCount);
+ assertExplainPlan(conn, indexSql, dataTableName, indexName);
+            long timeToAdvance = (MAX_LOOKBACK_AGE * 1000) -
+                    (EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN);
+ if (timeToAdvance > 0) {
+ injectEdge.incrementValue(timeToAdvance);
+ }
+ //make sure it's still on disk
+ assertRawRowCount(conn, dataTable, originalRowCount);
+ assertRawRowCount(conn, indexTable, originalRowCount);
+ injectEdge.incrementValue(1); //get a new timestamp for compaction
+ majorCompact(dataTable);
+ majorCompact(indexTable);
+ //nothing should have been purged by this major compaction
+ assertRawRowCount(conn, dataTable, originalRowCount);
+ assertRawRowCount(conn, indexTable, originalRowCount);
+ //now wait the TTL
+            timeToAdvance = (ttl * 1000) -
+                    (EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN);
+ if (timeToAdvance > 0) {
+ injectEdge.incrementValue(timeToAdvance);
+ }
+ //make sure that we can compact away the now-expired rows
+ majorCompact(dataTable);
+ majorCompact(indexTable);
+ //note that before HBase 1.4, we don't have HBASE-17956
+ // and this will always return 0 whether it's still on-disk or not
+ assertRawRowCount(conn, dataTable, 0);
+ assertRawRowCount(conn, indexTable, 0);
+        } finally {
+            conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, oldMemstoreFlushInterval);
+ }
+ }
+
+ @Test(timeout=60000)
+ public void testRecentMaxVersionsNotCompactedAway() throws Exception {
+ if (!isMaxLookbackSupported) {
+ return;
+ }
+ int versions = 2;
+ optionBuilder.append("VERSIONS=" + versions);
+ tableDDLOptions = optionBuilder.toString();
+ String firstValue = "abc";
+ String secondValue = "def";
+ String thirdValue = "ghi";
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ createTable(dataTableName);
+ populateTable(dataTableName);
+ createIndex(dataTableName, indexName, versions);
+ injectEdge.setValue(System.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ //increment to make sure we don't "look back" past table creation
+ injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+            injectEdge.incrementValue(1); //increment by 1 so we can see our write
+ long afterInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
+ //make sure table and index metadata is set up right for versions
+ TableName dataTable = TableName.valueOf(dataTableName);
+ TableName indexTable = TableName.valueOf(indexName);
+ assertTableHasVersions(conn, dataTable, versions);
+ assertTableHasVersions(conn, indexTable, versions);
+ //check query optimizer is doing what we expect
+            String dataTableSelectSql =
+                String.format("SELECT val2 FROM %s WHERE id = 'a'", dataTableName);
+            String indexTableSelectSql =
+                String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName);
+            assertExplainPlan(conn, indexTableSelectSql, dataTableName, indexName);
+            //make sure the data was inserted correctly in the first place
+            assertRowHasExpectedValueAtSCN(getUrl(), dataTableSelectSql, afterInsertSCN, firstValue);
+            assertRowHasExpectedValueAtSCN(getUrl(), indexTableSelectSql, afterInsertSCN, firstValue);
+ //force first update to get a distinct ts
+ injectEdge.incrementValue(1);
+ updateColumn(conn, dataTableName, "id", "a", "val2", secondValue);
+ injectEdge.incrementValue(1); //now make update visible
+            long afterFirstUpdateSCN = EnvironmentEdgeManager.currentTimeMillis();
+ //force second update to get a distinct ts
+ injectEdge.incrementValue(1);
+ updateColumn(conn, dataTableName, "id", "a", "val2", thirdValue);
+ injectEdge.incrementValue(1);
+            long afterSecondUpdateSCN = EnvironmentEdgeManager.currentTimeMillis();
+ injectEdge.incrementValue(1);
+            //check to make sure we can see all three versions at the appropriate times
+            String[] allValues = {firstValue, secondValue, thirdValue};
+            long[] allSCNs = {afterInsertSCN, afterFirstUpdateSCN, afterSecondUpdateSCN};
+            assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
+            assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
+ flush(dataTable);
+ flush(indexTable);
+            //after flush, check to make sure we can see all three versions at the appropriate times
+            assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
+            assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
+ majorCompact(dataTable);
+ majorCompact(indexTable);
+            //after major compaction, check to make sure we can see all three versions
+            // at the appropriate times
+            assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
+            assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
+ injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
+            long afterLookbackAgeSCN = EnvironmentEdgeManager.currentTimeMillis();
+ majorCompact(dataTable);
+ majorCompact(indexTable);
+            //empty column, 1 version of val1, 3 versions of val2, 1 version of val3 = 6
+            assertRawCellCount(conn, dataTable, Bytes.toBytes("a"), 6);
+            //2 versions of empty column, 2 versions of val2,
+            // 2 versions of val3 (since we write whole rows to index) = 6
+            assertRawCellCount(conn, indexTable, Bytes.toBytes("ab\u0000a"), 6);
+            //empty column + 1 version each of val1,2 and 3 = 4
+            assertRawCellCount(conn, dataTable, Bytes.toBytes("b"), 4);
+            //1 version of empty column, 1 version of val2, 1 version of val3 = 3
+            assertRawCellCount(conn, indexTable, Bytes.toBytes("bc\u0000b"), 3);
+ }
+ }
+
+ private void flush(TableName table) throws IOException {
+ Admin admin = getUtility().getHBaseAdmin();
+ admin.flush(table);
+ }
+
+ private void majorCompact(TableName table) throws Exception {
+ TestUtil.majorCompact(getUtility(), table);
+ }
+
+ private void assertMultiVersionLookbacks(String dataTableSelectSql,
+ String[] values, long[] scns)
+ throws Exception {
+ //make sure we can still look back after updating
+ for (int k = 0; k < values.length; k++){
+            assertRowHasExpectedValueAtSCN(getUrl(), dataTableSelectSql, scns[k], values[k]);
+ }
+ }
+
+    private void updateColumn(Connection conn, String dataTableName,
+                              String idColumn, String id, String valueColumn, String value)
+            throws SQLException {
+        String upsertSql = String.format("UPSERT INTO %s (%s, %s) VALUES ('%s', '%s')",
+                dataTableName, idColumn, valueColumn, id, value);
+ conn.createStatement().execute(upsertSql);
+ conn.commit();
+ }
+
+ private void createTable(String tableName) throws SQLException {
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+ String createSql = "create table " + tableName +
+ " (id varchar(10) not null primary key, val1 varchar(10), " +
+ "val2 varchar(10), val3 varchar(10))" + tableDDLOptions;
+ conn.createStatement().execute(createSql);
+ conn.commit();
+ }
+ }
+ private void populateTable(String tableName) throws SQLException {
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + tableName + " values ('b', 'bc', 'bcd', 'bcde')");
+ conn.commit();
+ }
+ }
+
+    private void createIndex(String dataTableName, String indexTableName, int indexVersions)
+ throws SQLException {
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+ dataTableName + " (val1) include (val2, val3)" +
+ " VERSIONS=" + indexVersions);
+ conn.commit();
+ }
+ }
+
+    public static void assertExplainPlan(Connection conn, String selectSql,
+                                         String dataTableFullName, String indexTableFullName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+        String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+        IndexToolIT.assertExplainPlan(false, actualExplainPlan, dataTableFullName, indexTableFullName);
+ }
+
+}
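
The test above leans on Phoenix's injectable clock rather than real sleeps. A compressed sketch of that pattern, using the same utility classes the test imports (the class name here is ours):

    import org.apache.phoenix.util.EnvironmentEdgeManager;
    import org.apache.phoenix.util.ManualEnvironmentEdge;

    public class InjectedClockSketch {
        public static void main(String[] args) {
            ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
            injectEdge.setValue(System.currentTimeMillis());
            EnvironmentEdgeManager.injectEdge(injectEdge);
            // jump past a 15-second max lookback window instantly,
            // the way the compaction tests above do
            injectEdge.incrementValue(15 * 1000 + 1000);
            System.out.println(EnvironmentEdgeManager.currentTimeMillis());
            EnvironmentEdgeManager.reset(); // restore the real clock
        }
    }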
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java
index 51f4fe0..6d4d4d6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PointInTimeQueryIT.java
@@ -40,6 +40,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Properties;
+import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.junit.Test;
@@ -62,8 +63,7 @@ public class PointInTimeQueryIT extends BaseQueryIT {
public PointInTimeQueryIT(String idxDdl, boolean columnEncoded)
throws Exception {
- // These queries fail without KEEP_DELETED_CELLS=true
- super(idxDdl, columnEncoded, true);
+        super(idxDdl, columnEncoded, !HbaseCompatCapabilities.isLookbackBeyondDeletesSupported());
}
@Test
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java
new file mode 100644
index 0000000..4dcaea7
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
+import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SCNIT extends ParallelStatsDisabledIT {
+
+ @Test
+ public void testReadBeforeDelete() throws Exception {
+        //we don't support reading earlier than a delete in HBase 2.0-2.2, only in 1.4+ and 2.3+
+ if (!HbaseCompatCapabilities.isLookbackBeyondDeletesSupported()){
+ return;
+ }
+ String schemaName = generateUniqueName();
+ String tableName = generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ long timeBeforeDelete;
+ long timeAfterDelete;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','aa')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('c','cc')");
+ conn.commit();
+ timeBeforeDelete = EnvironmentEdgeManager.currentTime() + 1;
+ Thread.sleep(2);
+            conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE k = 'b'");
+ conn.commit();
+ timeAfterDelete = EnvironmentEdgeManager.currentTime() + 1;
+ }
+
+ Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timeBeforeDelete));
+        try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
+            ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ assertTrue(rs.next());
+ assertEquals("b", rs.getString(1));
+ assertTrue(rs.next());
+ assertEquals("c", rs.getString(1));
+ assertFalse(rs.next());
+ rs.close();
+ }
+ props.clear();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timeAfterDelete));
+        try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
+            ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ assertTrue(rs.next());
+ assertEquals("c", rs.getString(1));
+ assertFalse(rs.next());
+ rs.close();
+ }
+
+ }
+
+ @Test
+ public void testSCNWithTTL() throws Exception {
+ int ttl = 2;
+ String fullTableName = createTableWithTTL(ttl);
+ //sleep for one second longer than ttl
+ Thread.sleep(ttl * 1000 + 1000);
+ Properties props = new Properties();
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+ Long.toString(EnvironmentEdgeManager.currentTime() - 1000));
+        try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
+            ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName);
+ assertFalse(rs.next());
+ rs.close();
+ }
+ }
+
+    private String createTableWithTTL(int ttl) throws SQLException, InterruptedException {
+ String schemaName = generateUniqueName();
+ String tableName = generateUniqueName();
+ StringBuilder optionsBuilder = new StringBuilder();
+ if (ttl > 0){
+ optionsBuilder.append("TTL=");
+ optionsBuilder.append(ttl);
+ }
+ String ddlOptions = optionsBuilder.toString();
+ String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement()
+                    .execute(String.format("CREATE TABLE %s" +
+                            "(k VARCHAR PRIMARY KEY, f.v VARCHAR) %s", fullTableName, ddlOptions));
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','aa')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('c','cc')");
+ conn.commit();
+ }
+ return fullTableName;
+ }
+
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 3b57f7a..ebed26c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -31,11 +31,15 @@ import java.util.Set;
import com.google.common.base.Optional;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
+import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.JoinCompiler.JoinSpec;
import org.apache.phoenix.compile.JoinCompiler.JoinTable;
import org.apache.phoenix.compile.JoinCompiler.Table;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.BaseQueryPlan;
import org.apache.phoenix.execute.ClientAggregatePlan;
@@ -79,6 +83,7 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ScanUtil;
@@ -163,6 +168,7 @@ public class QueryCompiler {
     * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
*/
public QueryPlan compile() throws SQLException{
+ verifySCN();
QueryPlan plan;
if (select.isUnion()) {
plan = compileUnionAll(select);
@@ -172,6 +178,28 @@ public class QueryCompiler {
return plan;
}
+ private void verifySCN() throws SQLException {
+ if (!HbaseCompatCapabilities.isMaxLookbackTimeSupported()) {
+ return;
+ }
+ PhoenixConnection conn = statement.getConnection();
+ Long scn = conn.getSCN();
+ if (scn == null) {
+ return;
+ }
+ ColumnResolver resolver =
+ FromCompiler.getResolverForQuery(select, conn);
+        long maxLookBackAgeInMillis =
+            CompatBaseScannerRegionObserver.getMaxLookbackInMillis(conn.getQueryServices().getConfiguration());
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ if (maxLookBackAgeInMillis > 0 && now - maxLookBackAgeInMillis > scn){
+            throw new SQLExceptionInfo.Builder(
+                SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE)
+                .build().buildException();
+ }
+ }
+
    public QueryPlan compileUnionAll(SelectStatement select) throws SQLException {
List<SelectStatement> unionAllSelects = select.getSelects();
List<QueryPlan> plans = new ArrayList<QueryPlan>();
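
The arithmetic in verifySCN() reduces to one comparison: a query's SCN is rejected when it predates now minus the configured lookback window. A self-contained sketch of just that check (names here are illustrative, not from the commit):

    public final class ScnCheckSketch {
        static boolean isScnTooOld(long scn, long nowMillis, long maxLookbackMillis) {
            // mirrors the condition in verifySCN() above: lookback is enabled
            // and the SCN falls before (now - maxLookback)
            return maxLookbackMillis > 0 && nowMillis - maxLookbackMillis > scn;
        }

        public static void main(String[] args) {
            long now = 100_000L;
            long maxLookback = 15_000L; // 15s, matching MAX_LOOKBACK_AGE in MaxLookbackIT
            System.out.println(isScnTooOld(90_000L, now, maxLookback)); // false, inside window
            System.out.println(isScnTooOld(80_000L, now, maxLookback)); // true, beyond window
        }
    }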
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 9865e91..6fe5c95 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -29,13 +29,20 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
+import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
+import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.index.IndexMaintainer;
@@ -47,7 +54,7 @@ import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
-abstract public class BaseScannerRegionObserver implements RegionObserver {
+abstract public class BaseScannerRegionObserver extends CompatBaseScannerRegionObserver {
public static final String AGGREGATORS = "_Aggs";
    public static final String UNORDERED_GROUP_BY_EXPRESSIONS = "_UnorderedGroupByExpressions";
@@ -367,4 +374,18 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
            dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
}
+
+    /* We want to override the store scanner so that we can read "past" a delete
+    marker on an SCN / lookback query to see the underlying edit. This was possible
+    in HBase 1.x, but not possible after the interface changes in HBase 2.0. HBASE-24321 in
+    HBase 2.3 gave us this ability back, but we need to use it through a compatibility shim
+    so we can compile against 2.1 and 2.2. When 2.3 is the minimum supported HBase
+    version, the shim can be retired and the logic moved into the real coproc.
+
+    We also need to override the flush and compaction coproc hooks in order to implement max
+    lookback age to keep versions from being purged.
+
+    Because the required APIs aren't present in HBase 2.1 and 2.2, we override in the 2.3
+    version of CompatBaseScannerRegionObserver and no-op in the other versions. */
+
}
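
For readers unfamiliar with the compat-module layout, the comment above boils down to compile-time polymorphism: phoenix-core always extends one class name, and the build chooses which phoenix-hbase-compat-* jar supplies it. A toy sketch of the idea, with hypothetical names (the real class is CompatBaseScannerRegionObserver):

    class CompatScannerShim {            // stands in for the per-version compat class
        void preFlushScannerOpen() {
            // no-op in the 2.1/2.2 modules; real ScanOptions overrides in 2.3+
        }
    }

    class CoreScannerObserver extends CompatScannerShim {
        // phoenix-core logic lives here and inherits whichever hook
        // behavior the selected compat module provides
    }

    public class ShimSketch {
        public static void main(String[] args) {
            new CoreScannerObserver().preFlushScannerOpen();
        }
    }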
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 8c83c6f..8230cba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -190,6 +190,8 @@ public enum SQLExceptionCode {
     UNEQUAL_SCN_AND_BUILD_INDEX_AT(534, "42911", "If both specified, values of CURRENT_SCN and BUILD_INDEX_AT must be equal."),
     ONLY_INDEX_UPDATABLE_AT_SCN(535, "42912", "Only an index may be updated when the BUILD_INDEX_AT property is specified"),
     PARENT_TABLE_NOT_FOUND(536, "42913", "Can't drop the index because the parent table in the DROP statement is incorrect."),
+    CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE(538, "42915",
+        "Cannot use SCN to look further back in the past beyond the configured max lookback age"),
/**
* HBase and Phoenix specific implementation defined sub-classes.
diff --git a/phoenix-hbase-compat-2.1.6/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java b/phoenix-hbase-compat-2.1.6/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
new file mode 100644
index 0000000..629946b
--- /dev/null
+++ b/phoenix-hbase-compat-2.1.6/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.compat.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class HbaseCompatCapabilities {
+
+ public static boolean isMaxLookbackTimeSupported() {
+ return false;
+ }
+
+    //In HBase 2.1 and 2.2, a lookback query won't return any results if covered by a future delete
+ public static boolean isLookbackBeyondDeletesSupported() { return false; }
+
+}
diff --git a/phoenix-hbase-compat-2.1.6/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java b/phoenix-hbase-compat-2.1.6/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java
new file mode 100644
index 0000000..20aaf20
--- /dev/null
+++ b/phoenix-hbase-compat-2.1.6/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase.coprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+
+import java.io.IOException;
+
+public class CompatBaseScannerRegionObserver implements RegionObserver {
+
+ public static final String PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY =
+ "phoenix.max.lookback.age.seconds";
+ public static final int DEFAULT_PHOENIX_MAX_LOOKBACK_AGE = 0;
+
+    public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+                                      ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
+                                      CompactionRequest request) throws IOException {
+        //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't implement the "max
+        //lookback age" feature
+ }
+
+    public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+                                    ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
+        //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't implement the "max
+        //lookback age" feature
+ }
+
+ public void preMemStoreCompactionCompactScannerOpen(
+            ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
+            throws IOException {
+        //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't implement the "max
+        //lookback age" feature
+ }
+
+    public void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Store store,
+                                    ScanOptions options) throws IOException {
+        //no-op because HBASE-24321 isn't present in HBase 2.1.x, so we can't override the scan
+        //to "look behind" delete markers on SCN queries
+ }
+
+    public static long getMaxLookbackInMillis(Configuration conf){
+ //config param is in seconds, switch to millis
+ return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) * 1000;
+ }
+
+ //max lookback age isn't supported in HBase 2.1 or HBase 2.2
+ public static boolean isMaxLookbackTimeEnabled(Configuration conf){
+ return false;
+ }
+
+ public static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){
+ return false;
+ }
+
+}
diff --git a/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java b/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
new file mode 100644
index 0000000..629946b
--- /dev/null
+++ b/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.compat.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class HbaseCompatCapabilities {
+
+ public static boolean isMaxLookbackTimeSupported() {
+ return false;
+ }
+
+    //In HBase 2.1 and 2.2, a lookback query won't return any results if covered by a future delete
+ public static boolean isLookbackBeyondDeletesSupported() { return false; }
+
+}
diff --git a/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java b/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java
new file mode 100644
index 0000000..00c2501
--- /dev/null
+++ b/phoenix-hbase-compat-2.2.1/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase.coprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+
+import java.io.IOException;
+
+public class CompatBaseScannerRegionObserver implements RegionObserver {
+
+ public static final String PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY =
+ "phoenix.max.lookback.age.seconds";
+ public static final int DEFAULT_PHOENIX_MAX_LOOKBACK_AGE = 0;
+
+    public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+                                      ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
+                                      CompactionRequest request) throws IOException {
+        //no-op because HBASE-24321 isn't present in HBase 2.2.x, so we can't implement the "max
+        //lookback age" feature
+ }
+
+    public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+                                    ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
+        //no-op because HBASE-24321 isn't present in HBase 2.2.x, so we can't implement the "max
+        //lookback age" feature
+ }
+
+ public void preMemStoreCompactionCompactScannerOpen(
+            ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
+            throws IOException {
+        //no-op because HBASE-24321 isn't present in HBase 2.2.x, so we can't implement the "max
+        //lookback age" feature
+ }
+
+    public void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Store store,
+                                    ScanOptions options) throws IOException {
+        //no-op because HBASE-24321 isn't present in HBase 2.2.x, so we can't override the scan
+        //to "look behind" delete markers on SCN queries
+ }
+
+    public static long getMaxLookbackInMillis(Configuration conf){
+ //config param is in seconds, switch to millis
+ return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) * 1000;
+ }
+
+ //max lookback age isn't supported in HBase 2.1 or HBase 2.2
+ public static boolean isMaxLookbackTimeEnabled(Configuration conf){
+ return false;
+ }
+
+ public static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){
+ return false;
+ }
+
+}
diff --git a/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java b/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
new file mode 100644
index 0000000..9b83e6d
--- /dev/null
+++ b/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.compat.hbase;
+
+public class HbaseCompatCapabilities {
+
+ public static boolean isMaxLookbackTimeSupported() {
+ return true;
+ }
+
+    //In HBase 2.1 and 2.2, a lookback query won't return any results if covered by a future delete,
+    //but in 2.3 and later we have the preStoreScannerOpen hook that overrides that behavior
+ public static boolean isLookbackBeyondDeletesSupported() { return true; }
+
+}
diff --git a/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java b/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java
new file mode 100644
index 0000000..cd1a7f5
--- /dev/null
+++ b/phoenix-hbase-compat-2.3.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatBaseScannerRegionObserver.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase.coprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.MemoryCompactionPolicy;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import java.io.IOException;
+
+public class CompatBaseScannerRegionObserver implements RegionObserver {
+
+ public static final String PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY =
+ "phoenix.max.lookback.age.seconds";
+ public static final int DEFAULT_PHOENIX_MAX_LOOKBACK_AGE = 0;
+
+    public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+                                      ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
+                                      CompactionRequest request) throws IOException {
+        Configuration conf = c.getEnvironment().getConfiguration();
+        if (isMaxLookbackTimeEnabled(conf)) {
+            setScanOptionsForFlushesAndCompactions(conf, options, store, scanType);
+ }
+ }
+
+    public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+                                    ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
+        Configuration conf = c.getEnvironment().getConfiguration();
+        if (isMaxLookbackTimeEnabled(conf)) {
+            setScanOptionsForFlushesAndCompactions(conf, options, store, ScanType.COMPACT_RETAIN_DELETES);
+ }
+ }
+
+ public void preMemStoreCompactionCompactScannerOpen(
+            ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
+ throws IOException {
+ Configuration conf = c.getEnvironment().getConfiguration();
+ if (isMaxLookbackTimeEnabled(conf)) {
+ MemoryCompactionPolicy inMemPolicy =
+ store.getColumnFamilyDescriptor().getInMemoryCompaction();
+ ScanType scanType;
+            //the eager and adaptive in-memory compaction policies can purge versions; the others
+            // can't. (Eager always does; adaptive sometimes does)
+ if (inMemPolicy.equals(MemoryCompactionPolicy.EAGER) ||
+ inMemPolicy.equals(MemoryCompactionPolicy.ADAPTIVE)) {
+ scanType = ScanType.COMPACT_DROP_DELETES;
+ } else {
+ scanType = ScanType.COMPACT_RETAIN_DELETES;
+ }
+            setScanOptionsForFlushesAndCompactions(conf, options, store, scanType);
+ }
+ }
+
+    public void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Store store,
+                                    ScanOptions options) throws IOException {
+
+ if (!storeFileScanDoesntNeedAlteration(options)) {
+            //PHOENIX-4277 -- When doing a point-in-time (SCN) Scan, HBase by default will hide
+            // mutations that happen before a delete marker. This overrides that behavior.
+ options.setMinVersions(options.getMinVersions());
+ KeepDeletedCells keepDeletedCells = KeepDeletedCells.TRUE;
+            if (store.getColumnFamilyDescriptor().getTimeToLive() != HConstants.FOREVER) {
+ keepDeletedCells = KeepDeletedCells.TTL;
+ }
+ options.setKeepDeletedCells(keepDeletedCells);
+ }
+ }
+
+ private boolean storeFileScanDoesntNeedAlteration(ScanOptions options) {
+ Scan scan = options.getScan();
+ boolean isRaw = scan.isRaw();
+ //true if keep deleted cells is either TRUE or TTL
+        boolean keepDeletedCells = options.getKeepDeletedCells().equals(KeepDeletedCells.TRUE) ||
+            options.getKeepDeletedCells().equals(KeepDeletedCells.TTL);
+        boolean timeRangeIsLatest = scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP;
+ boolean timestampIsTransactional =
+ isTransactionalTimestamp(scan.getTimeRange().getMax());
+ return isRaw
+ || keepDeletedCells
+ || timeRangeIsLatest
+ || timestampIsTransactional;
+ }
+
+ private boolean isTransactionalTimestamp(long ts) {
+        //have to use the HBase edge manager because the Phoenix one is in phoenix-core
+ return ts > (long) (EnvironmentEdgeManager.currentTime() * 1.1);
+ }
+
+ /*
+     * If KeepDeletedCells.FALSE or KeepDeletedCells.TTL,
+ * let delete markers age once lookback age is done.
+ */
+    public KeepDeletedCells getKeepDeletedCells(ScanOptions options, ScanType scanType) {
+        //if we're doing a minor compaction or flush, always set keep deleted cells
+        //to true. Otherwise, if keep deleted cells is false or TTL, use KeepDeletedCells TTL,
+        //where the value of the ttl might be overridden to the max lookback age elsewhere
+ return (options.getKeepDeletedCells() == KeepDeletedCells.TRUE
+ || scanType.equals(ScanType.COMPACT_RETAIN_DELETES)) ?
+ KeepDeletedCells.TRUE : KeepDeletedCells.TTL;
+ }
+
+ /*
+     * if the user set a TTL we should leave MIN_VERSIONS at the default (0 in most of the cases).
+     * Otherwise the data (1st version) will not be removed after the TTL. If no TTL, we want
+ * Math.max(maxVersions, minVersions, 1)
+ */
+    public int getMinVersions(ScanOptions options, ColumnFamilyDescriptor cfDescriptor) {
+        return cfDescriptor.getTimeToLive() != HConstants.FOREVER ? options.getMinVersions()
+ : Math.max(Math.max(options.getMinVersions(),
+ cfDescriptor.getMaxVersions()),1);
+ }
+
+ /**
+ *
+ * @param conf HBase Configuration
+     * @param columnDescriptor ColumnFamilyDescriptor for the store being compacted
+     * @param options ScanOptions of overrides to the compaction scan
+     * @return Time to live in milliseconds, based on both HBase TTL and Phoenix max lookback age
+ */
+    public long getTimeToLiveForCompactions(Configuration conf,
+                                            ColumnFamilyDescriptor columnDescriptor,
+                                            ScanOptions options) {
+ long ttlConfigured = columnDescriptor.getTimeToLive();
+ long ttlInMillis = ttlConfigured * 1000;
+ long maxLookbackTtl = getMaxLookbackInMillis(conf);
+ if (isMaxLookbackTimeEnabled(maxLookbackTtl)) {
+ if (ttlConfigured == HConstants.FOREVER
+                    && columnDescriptor.getKeepDeletedCells() != KeepDeletedCells.TRUE) {
+                // If user configured default TTL(FOREVER) and keep deleted cells to false or
+                // TTL then to remove unwanted delete markers we should change ttl to max lookback age
+ ttlInMillis = maxLookbackTtl;
+ } else {
+ //if there is a TTL, use TTL instead of max lookback age.
+ // Max lookback age should be more recent or equal to TTL
+ ttlInMillis = Math.max(ttlInMillis, maxLookbackTtl);
+ }
+ }
+
+ return ttlInMillis;
+ }
+
+    public void setScanOptionsForFlushesAndCompactions(Configuration conf,
+                                                       ScanOptions options,
+                                                       final Store store,
+                                                       ScanType type) {
+        ColumnFamilyDescriptor cfDescriptor = store.getColumnFamilyDescriptor();
+        options.setTTL(getTimeToLiveForCompactions(conf, cfDescriptor, options));
+ options.setKeepDeletedCells(getKeepDeletedCells(options, type));
+ options.setMaxVersions(Integer.MAX_VALUE);
+ options.setMinVersions(getMinVersions(options, cfDescriptor));
+ }
+
+ public static long getMaxLookbackInMillis(Configuration conf){
+ //config param is in seconds, switch to millis
+ return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) * 1000;
+ }
+
+ public static boolean isMaxLookbackTimeEnabled(Configuration conf){
+        return isMaxLookbackTimeEnabled(conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ DEFAULT_PHOENIX_MAX_LOOKBACK_AGE));
+ }
+
+ public static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){
+ return maxLookbackTime > 0L;
+ }
+
+}
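
To summarize the retention rules getTimeToLiveForCompactions() implements: with lookback enabled, a default TTL (FOREVER) combined with KEEP_DELETED_CELLS other than TRUE collapses to the lookback age, while an explicit TTL keeps whichever window is larger. A standalone sketch of that decision table (FOREVER stands in for HConstants.FOREVER, which is Integer.MAX_VALUE; the class name is ours):

    public class CompactionTtlSketch {
        static final long FOREVER = Integer.MAX_VALUE;

        static long effectiveTtlMillis(long ttlSeconds, boolean keepDeletedCellsTrue,
                                       long maxLookbackMillis) {
            long ttlMillis = ttlSeconds * 1000;
            if (maxLookbackMillis > 0) {                       // lookback enabled
                if (ttlSeconds == FOREVER && !keepDeletedCellsTrue) {
                    return maxLookbackMillis;                  // default TTL: lookback bounds retention
                }
                return Math.max(ttlMillis, maxLookbackMillis); // explicit TTL: larger window wins
            }
            return ttlMillis;
        }

        public static void main(String[] args) {
            System.out.println(effectiveTtlMillis(20, false, 15_000)); // 20000: TTL wins
            System.out.println(effectiveTtlMillis(10, false, 15_000)); // 15000: lookback wins
        }
    }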