PHOENIX-4070 Delete row should mask upserts at same timestamp
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8bc58328 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8bc58328 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8bc58328 Branch: refs/heads/4.x-HBase-1.1 Commit: 8bc58328098bebc46e9f670ce20ec67669be3157 Parents: c538918 Author: James Taylor <[email protected]> Authored: Mon Aug 7 11:41:15 2017 -0700 Committer: James Taylor <[email protected]> Committed: Mon Aug 7 23:05:00 2017 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/OutOfOrderMutationsIT.java | 159 ++++++-- .../end2end/index/MutableIndexFailureIT.java | 27 +- .../end2end/index/PartialIndexRebuilderIT.java | 366 +++++++++++++++++++ .../coprocessor/MetaDataRegionObserver.java | 58 +-- .../apache/phoenix/hbase/index/ValueGetter.java | 10 +- .../hbase/index/covered/LocalTableState.java | 10 +- .../index/covered/data/LazyValueGetter.java | 19 +- .../example/CoveredColumnIndexCodec.java | 5 +- .../filter/ApplyAndFilterDeletesFilter.java | 7 +- .../hbase/index/scanner/EmptyScanner.java | 16 +- .../hbase/index/scanner/ScannerBuilder.java | 23 +- .../hbase/index/util/IndexManagementUtil.java | 4 +- .../apache/phoenix/index/IndexMaintainer.java | 32 +- .../index/PhoenixIndexFailurePolicy.java | 57 +-- .../index/PhoenixTransactionalIndexer.java | 2 +- .../phoenix/schema/tuple/ValueGetterTuple.java | 12 +- .../java/org/apache/phoenix/util/IndexUtil.java | 45 ++- .../index/covered/TestLocalTableState.java | 9 +- .../phoenix/index/IndexMaintainerTest.java | 5 +- .../java/org/apache/phoenix/util/TestUtil.java | 140 +++++++ 20 files changed, 811 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java index 3cf7336..0e038e2 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java @@ -91,13 +91,9 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); conn = DriverManager.getConnection(getUrl(), props); - long count1 = getRowCount(conn, tableName); - long count2 = getRowCount(conn, indexName); - assertEquals(0, count1); - assertEquals(0, count2); - conn.close(); - + TestUtil.scutinizeIndex(conn, tableName, indexName); assertNoTimeStampAt(conn, indexName, 1030); + conn.close(); /** * @@ -179,11 +175,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); conn = DriverManager.getConnection(getUrl(), props); - long count1 = getRowCount(conn, tableName); - long count2 = getRowCount(conn, indexName); - assertEquals("Table should have 1 row", 1, count1); - assertEquals("Index should have 1 row", 1, count2); - conn.close(); + TestUtil.scutinizeIndex(conn, tableName, indexName); ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts FROM " + tableName); assertTrue(rs.next()); @@ -196,6 +188,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { assertFalse(rs.next()); assertNoTimeStampAt(conn, indexName, 1030); + conn.close(); /** * @@ -272,11 +265,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); - long count1 = getRowCount(conn, tableName); - long count2 = getRowCount(conn, indexName); - assertEquals("Table should have 1 row", 1, count1); - assertEquals("Index should have 1 row", 1, count2); - conn.close(); + TestUtil.scutinizeIndex(conn, tableName, indexName); ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts FROM " + tableName); assertTrue(rs.next()); @@ -287,6 +276,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { assertTrue(rs.next()); assertEquals(expectedTimestamp, rs.getTimestamp(1)); assertFalse(rs.next()); + conn.close(); } @Test @@ -341,11 +331,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); - long count1 = getRowCount(conn, tableName); - long count2 = getRowCount(conn, indexName); - assertEquals("Table should have 1 row", 1, count1); - assertEquals("Index should have 1 row", 1, count2); - conn.close(); + TestUtil.scutinizeIndex(conn, tableName, indexName); ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts FROM " + tableName); assertTrue(rs.next()); @@ -356,6 +342,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { assertTrue(rs.next()); assertEquals(expectedTimestamp, rs.getTimestamp(1)); assertFalse(rs.next()); + conn.close(); } @Test @@ -406,11 +393,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); - long count1 = getRowCount(conn, tableName); - long count2 = getRowCount(conn, indexName); - assertEquals("Table should have 1 row", 1, count1); - assertEquals("Index should have 1 row", 1, count2); - conn.close(); + TestUtil.scutinizeIndex(conn, tableName, indexName); ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts,v FROM " + tableName); assertTrue(rs.next()); @@ -423,6 +406,8 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { assertEquals(expectedTimestamp, rs.getTimestamp(1)); assertEquals(null, rs.getString(2)); assertFalse(rs.next()); + + conn.close(); } @Test @@ -473,11 +458,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); - long count1 = getRowCount(conn, tableName); - long count2 = getRowCount(conn, indexName); - assertEquals("Table should have 1 row", 1, count1); - assertEquals("Index should have 1 row", 1, count2); - conn.close(); + TestUtil.scutinizeIndex(conn, tableName, indexName); ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts,v FROM " + tableName); assertTrue(rs.next()); @@ -490,6 +471,8 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { assertEquals(expectedTimestamp, rs.getTimestamp(1)); assertEquals(null, rs.getString(2)); assertFalse(rs.next()); + + conn.close(); } @Test @@ -540,11 +523,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); - long count1 = getRowCount(conn, tableName); - long count2 = getRowCount(conn, indexName); - assertEquals("Table should have 1 row", 1, count1); - assertEquals("Index should have 1 row", 1, count2); - conn.close(); + TestUtil.scutinizeIndex(conn, tableName, indexName); ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts,v FROM " + tableName); assertTrue(rs.next()); @@ -557,5 +536,113 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT { assertEquals(expectedTimestamp, rs.getTimestamp(1)); assertEquals(null, rs.getString(2)); assertFalse(rs.next()); + + conn.close(); + } + + @Test + public void testDeleteRowAndUpsertValueAtSameTS1() throws Exception { + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + long ts = 1000; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, A.V VARCHAR, B.V2 VARCHAR, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.close(); + + ts = 1010; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(k2,k1,ts) INCLUDE (V, v2)"); + conn.close(); + + ts = 1020; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?, '0','1')"); + stmt.setTimestamp(1, new Timestamp(1000L)); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + Timestamp expectedTimestamp; + ts = 1040; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + stmt = conn.prepareStatement("DELETE FROM " + tableName + " WHERE (K1,K2) = ('aa','aa')"); + stmt.executeUpdate(); + conn.commit(); + expectedTimestamp = new Timestamp(3000L); + stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?, null,'3')"); + stmt.setTimestamp(1, expectedTimestamp); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + ts = 1050; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); + + long rowCount = TestUtil.scutinizeIndex(conn, tableName, indexName); + assertEquals(0,rowCount); + + conn.close(); + } + + @Test + public void testDeleteRowAndUpsertValueAtSameTS2() throws Exception { + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + long ts = 1000; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 CHAR(2) NOT NULL, k2 CHAR(2) NOT NULL, ts TIMESTAMP, V VARCHAR, V2 VARCHAR, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.close(); + + ts = 1010; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(k2,k1,ts) INCLUDE (V, v2)"); + conn.close(); + + ts = 1020; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?, '0')"); + stmt.setTimestamp(1, new Timestamp(1000L)); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + Timestamp expectedTimestamp; + ts = 1040; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + expectedTimestamp = new Timestamp(3000L); + stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES('aa','aa',?, null)"); + stmt.setTimestamp(1, expectedTimestamp); + stmt.executeUpdate(); + conn.commit(); + stmt = conn.prepareStatement("DELETE FROM " + tableName + " WHERE (K1,K2) = ('aa','aa')"); + stmt.executeUpdate(); + conn.commit(); + conn.close(); + + ts = 1050; + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName))); + + long rowCount = TestUtil.scutinizeIndex(conn, tableName, indexName); + assertEquals(0,rowCount); + + conn.close(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index c07abb4..855bd75 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -48,7 +48,6 @@ import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.execute.CommitException; import org.apache.phoenix.hbase.index.write.IndexWriterUtils; import org.apache.phoenix.index.PhoenixIndexFailurePolicy; -import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; @@ -63,6 +62,7 @@ import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.StringUtil; +import org.apache.phoenix.util.TestUtil; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; @@ -326,31 +326,12 @@ public class MutableIndexFailureIT extends BaseTest { } private void waitForIndexRebuild(Connection conn, String index, PIndexState expectedIndexState) throws InterruptedException, SQLException { - boolean isActive = false; if (!transactional) { - int maxTries = 12, nTries = 0; - do { - Thread.sleep(5 * 1000); // sleep 5 secs - String query = "SELECT CAST(" + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " AS BIGINT) FROM " + - PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE (" + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + PhoenixDatabaseMetaData.TABLE_NAME - + ") = (" + "'" + schema + "','" + index + "') " - + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL" - + " AND " + PhoenixDatabaseMetaData.INDEX_STATE + " = '" + expectedIndexState.getSerializedValue() + "'"; - ResultSet rs = conn.createStatement().executeQuery(query); - assertTrue(rs.next()); - if (expectedIndexState == PIndexState.ACTIVE) { - if (rs.getLong(1) == 0 && !rs.wasNull()) { - isActive = true; - break; - } - } - } while (++nTries < maxTries); - if (expectedIndexState == PIndexState.ACTIVE) { - assertTrue(isActive); - } + String fullIndexName = SchemaUtil.getTableName(schema, index); + TestUtil.waitForIndexRebuild(conn, fullIndexName, expectedIndexState); } } - + private void initializeTable(Connection conn, String tableName) throws SQLException { PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)"); stmt.setString(1, "a"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java new file mode 100644 index 0000000..ef9ae1b --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString()); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000"); + serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "30000"); // give up rebuilding after 30 seconds + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS); + } + + + @Test + public void testRowCountIndexScrutiny() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); + conn.commit(); + + int count = conn.createStatement().executeUpdate("DELETE FROM " + fullIndexName + " WHERE \":K\"='a' AND \"0:V\"='ccc'"); + assertEquals(1,count); + conn.commit(); + try { + TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName); + fail(); + } catch (AssertionError e) { + assertEquals(e.getMessage(),"Expected data table row count to match expected:<1> but was:<2>"); + } + } + } + @Test + public void testExtraRowIndexScrutiny() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb','0')"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')"); + conn.commit(); + + conn.createStatement().executeUpdate("UPSERT INTO " + fullIndexName + " VALUES ('bbb','x','0')"); + conn.commit(); + try { + TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName); + fail(); + } catch (AssertionError e) { + assertEquals(e.getMessage(),"Expected to find PK in data table: ('x')"); + } + } + } + + @Test + public void testValuetIndexScrutiny() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb','0')"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')"); + conn.commit(); + + conn.createStatement().executeUpdate("UPSERT INTO " + fullIndexName + " VALUES ('ccc','a','2')"); + conn.commit(); + try { + TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName); + fail(); + } catch (AssertionError e) { + assertEquals(e.getMessage(),"Expected equality for V2, but '2'!='1'"); + } + } + } + + @Test + public void testMultiVersionsAfterFailure() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','bb')"); + conn.commit(); + long disableTS = EnvironmentEdgeManager.currentTimeMillis(); + HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); + conn.commit(); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','dddd')"); + conn.commit(); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee')"); + conn.commit(); + TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName))); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName))); + + TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName); + } + } + + @Test + public void testUpsertNullAfterFailure() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a')"); + conn.commit(); + long disableTS = EnvironmentEdgeManager.currentTimeMillis(); + HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)"); + conn.commit(); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','bb')"); + conn.commit(); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); + conn.commit(); + TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName))); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName))); + + TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName); + } + } + + @Test + public void testUpsertNullTwiceAfterFailure() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)"); + conn.commit(); + long disableTS = EnvironmentEdgeManager.currentTimeMillis(); + HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','bb')"); + conn.commit(); + conn.createStatement().execute("DELETE FROM " + fullTableName); + conn.commit(); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)"); + conn.commit(); + TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName))); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName))); + + TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName); + } + } + + @Test + public void testDeleteAfterFailure() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)"); + conn.commit(); + long disableTS = EnvironmentEdgeManager.currentTimeMillis(); + HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','b')"); + conn.commit(); + conn.createStatement().execute("DELETE FROM " + fullTableName); + conn.commit(); + TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName))); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName))); + + TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName); + } + } + + @Test + public void testDeleteBeforeFailure() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)"); + conn.commit(); + conn.createStatement().execute("DELETE FROM " + fullTableName); + conn.commit(); + long disableTS = EnvironmentEdgeManager.currentTimeMillis(); + HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','b')"); + conn.commit(); + TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName))); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName))); + + TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName); + } + } + + @Test + public void testMultiValuesAtSameTS() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a')"); + conn.commit(); + long disableTS = EnvironmentEdgeManager.currentTimeMillis(); + HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(disableTS)); + try (Connection conn2 = DriverManager.getConnection(getUrl(), props)) { + conn2.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','bb')"); + conn2.commit(); + conn2.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); + conn2.commit(); + } + TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName))); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName))); + + TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName); + } + } + + @Test + public void testDeleteAndUpsertValuesAtSameTS1() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a')"); + conn.commit(); + long disableTS = EnvironmentEdgeManager.currentTimeMillis(); + HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(disableTS)); + try (Connection conn2 = DriverManager.getConnection(getUrl(), props)) { + conn2.createStatement().execute("DELETE FROM " + fullTableName + " WHERE k='a'"); + conn2.commit(); + conn2.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); + conn2.commit(); + } + TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName))); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName))); + + TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName); + } + } + + @Test + public void testDeleteAndUpsertValuesAtSameTS2() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a')"); + conn.commit(); + long disableTS = EnvironmentEdgeManager.currentTimeMillis(); + HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(disableTS)); + try (Connection conn2 = DriverManager.getConnection(getUrl(), props)) { + conn2.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')"); + conn2.commit(); + conn2.createStatement().execute("DELETE FROM " + fullTableName + " WHERE k='a'"); + conn2.commit(); + } + TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName))); + TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName))); + + TestUtil.scutinizeIndex(conn, fullTableName, fullIndexName); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index 5cfacfc..9b68cd4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -59,10 +58,6 @@ import org.apache.log4j.Logger; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.compile.MutationPlan; import org.apache.phoenix.compile.PostDDLCompiler; -import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; -import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; -import org.apache.phoenix.exception.SQLExceptionCode; -import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.index.IndexMaintainer; @@ -77,11 +72,10 @@ import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.SortOrder; -import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.schema.types.PChar; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; @@ -90,10 +84,8 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.UpgradeUtil; -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.protobuf.ServiceException; /** @@ -326,8 +318,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { * rebuild task won't pick up this index again for rebuild. */ try { - updateIndexState(conn, indexTableFullName, env, state, - PIndexState.DISABLE, 0l); + IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.DISABLE, 0l); LOG.error("Unable to rebuild index " + indexTableFullName + ". Won't attempt again since index disable timestamp is older than current time by " + indexDisableTimestampThreshold @@ -342,7 +333,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { // Allow index to begin incremental maintenance as index is back online and we // cannot transition directly from DISABLED -> ACTIVE if (Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexState) == 0) { - updateIndexState(conn, indexTableFullName, env, PIndexState.DISABLE, PIndexState.INACTIVE, null); + IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.INACTIVE, null); } List<PTable> indexesToPartiallyRebuild = dataTableToIndexesMap.get(dataPTable); if (indexesToPartiallyRebuild == null) { @@ -409,7 +400,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver { long scanEndTime = getTimestampForBatch(timeStamp, batchExecutedPerTableMap.get(dataPTable.getName())); - + // We can't allow partial results + dataTableScan.setAllowPartialResults(false); dataTableScan.setTimeRange(timeStamp, scanEndTime); dataTableScan.setCacheBlocks(false); dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES); @@ -436,8 +428,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { indexPTable.getSchemaName().getString(), indexPTable.getTableName().getString()); if (scanEndTime == HConstants.LATEST_TIMESTAMP) { - updateIndexState(conn, indexTableFullName, env, PIndexState.INACTIVE, - PIndexState.ACTIVE, 0l); + IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.ACTIVE, 0l); batchExecutedPerTableMap.remove(dataPTable.getName()); LOG.info("Making Index:" + indexPTable.getTableName() + " active after rebuilding"); } else { @@ -499,43 +490,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver { } } - private static void updateIndexState(PhoenixConnection conn, String indexTableName, - RegionCoprocessorEnvironment env, PIndexState oldState, PIndexState newState, Long indexDisableTimestamp) - throws ServiceException, Throwable { - if (newState == PIndexState.ACTIVE) { - Preconditions.checkArgument(indexDisableTimestamp == 0, - "Index disable timestamp has to be 0 when marking an index as active"); - } - byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); - String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName); - String indexName = SchemaUtil.getTableNameFromFullName(indexTableName); - // Mimic the Put that gets generated by the client on an update of the - // index state - Put put = new Put(indexTableKey); - put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, - newState.getSerializedBytes()); - if (indexDisableTimestamp != null) { - put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, - PLong.INSTANCE.toBytes(indexDisableTimestamp)); - } - if (newState == PIndexState.ACTIVE) { - put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0)); - } - final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put); - MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata, null); - MutationCode code = result.getMutationCode(); - if (code == MutationCode.TABLE_NOT_FOUND) { - throw new TableNotFoundException(schemaName, indexName); - } - if (code == MutationCode.UNALLOWED_TABLE_MUTATION) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION) - .setMessage(" currentState=" + oldState + ". requestedState=" + newState).setSchemaName(schemaName) - .setTableName(indexName).build().buildException(); - } - } - private static void updateDisableTimestamp(PhoenixConnection conn, String indexTableName, RegionCoprocessorEnvironment env, long disabledTimestamp, HTableInterface metaTable) throws IOException { byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java index af847b7..8c75424 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java @@ -23,16 +23,18 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; public interface ValueGetter { - + public static final ImmutableBytesWritable HIDDEN_BY_DELETE = new ImmutableBytesWritable(new byte[0]); /** * Get the most recent (largest timestamp) for the given column reference * @param ref to match against an underlying key value. Uses the passed object to match the * keyValue via {@link ColumnReference#matches} - * @return the stored value for the given {@link ColumnReference}, or <tt>null</tt> if no value is - * present. + * @param ts time stamp at which mutations will be issued + * @return the stored value for the given {@link ColumnReference}, <tt>null</tt> if no value is + * present, or {@link ValueGetter#HIDDEN_BY_DELETE} if no value is present and the ref + * will be shadowed by a delete marker. * @throws IOException if there is an error accessing the underlying data storage */ - public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException; + public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException; public byte[] getRowKey(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java index acbf1ab..0f5a9f9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java @@ -30,8 +30,8 @@ import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; -import org.apache.phoenix.hbase.index.scanner.Scanner; import org.apache.phoenix.hbase.index.scanner.ScannerBuilder; +import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; /** @@ -163,7 +163,7 @@ public class LocalTableState implements TableState { * {@link IndexUpdate}. * @throws IOException */ - public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState( + public Pair<CoveredDeleteScanner, IndexUpdate> getIndexedColumnsTableState( Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean returnNullScannerIfRowNotFound, IndexMetaData indexMetaData) throws IOException { ensureLocalStateInitialized(indexedColumns, ignoreNewerMutations, indexMetaData); // filter out things with a newer timestamp and track the column references to which it applies @@ -175,9 +175,9 @@ public class LocalTableState implements TableState { } } - Scanner scanner = this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts, returnNullScannerIfRowNotFound); + CoveredDeleteScanner scanner = this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts, returnNullScannerIfRowNotFound); - return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker)); + return new Pair<CoveredDeleteScanner, IndexUpdate>(scanner, new IndexUpdate(tracker)); } /** @@ -266,7 +266,7 @@ public class LocalTableState implements TableState { @Override public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean returnNullScannerIfRowNotFound, IndexMetaData indexMetaData) throws IOException { - Pair<Scanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns, ignoreNewerMutations, returnNullScannerIfRowNotFound, indexMetaData); + Pair<CoveredDeleteScanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns, ignoreNewerMutations, returnNullScannerIfRowNotFound, indexMetaData); ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey()); return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond()); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java index 52076a2..bafefce 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java @@ -24,11 +24,12 @@ import java.util.Map; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.hbase.index.ValueGetter; +import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter.DeleteTracker; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.scanner.Scanner; +import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; /** @@ -37,7 +38,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; */ public class LazyValueGetter implements ValueGetter { - private Scanner scan; + private CoveredDeleteScanner scan; private volatile Map<ColumnReference, ImmutableBytesWritable> values; private byte[] row; @@ -46,13 +47,13 @@ public class LazyValueGetter implements ValueGetter { * @param scan backing scanner * @param currentRow row key for the row to seek in the scanner */ - public LazyValueGetter(Scanner scan, byte[] currentRow) { + public LazyValueGetter(CoveredDeleteScanner scan, byte[] currentRow) { this.scan = scan; this.row = currentRow; } @Override - public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException { + public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException { // ensure we have a backing map if (values == null) { synchronized (this) { @@ -64,6 +65,14 @@ public class LazyValueGetter implements ValueGetter { ImmutableBytesWritable value = values.get(ref); if (value == null) { value = get(ref); + DeleteTracker deleteTracker = scan.getDeleteTracker(); + if (value == null) { + // Delete family is used for row deletion. Family won't necessarily match as we'll be at + // the delete family marker on the last column family if there is one. + if (deleteTracker.deleteFamily != null && deleteTracker.deleteFamily.getTimestamp() == ts) { + value = HIDDEN_BY_DELETE; + } + } values.put(ref, value); } @@ -81,7 +90,7 @@ public class LazyValueGetter implements ValueGetter { } // there is a next value - we only care about the current value, so we can just snag that Cell next = scan.next(); - if (ref.matches(KeyValueUtil.ensureKeyValue(next))) { + if (ref.matches(next)) { return new ImmutableBytesPtr(next.getValueArray(), next.getValueOffset(), next.getValueLength()); } return null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java index 1392906..c24d730 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java @@ -30,6 +30,7 @@ import org.apache.phoenix.hbase.index.covered.IndexUpdate; import org.apache.phoenix.hbase.index.covered.LocalTableState; import org.apache.phoenix.hbase.index.covered.TableState; import org.apache.phoenix.hbase.index.scanner.Scanner; +import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner; import com.google.common.collect.Lists; @@ -77,7 +78,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec { private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state, IndexMetaData indexMetaData) { List<CoveredColumn> refs = group.getColumns(); try { - Pair<Scanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs, false, false, indexMetaData); + Pair<CoveredDeleteScanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs, false, false, indexMetaData); Scanner kvs = stateInfo.getFirst(); Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs, state.getCurrentRowKey()); // make sure we close the scanner @@ -132,7 +133,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec { private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state, IndexMetaData indexMetaData) { List<CoveredColumn> refs = group.getColumns(); try { - Pair<Scanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs, false, false, indexMetaData); + Pair<CoveredDeleteScanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs, false, false, indexMetaData); Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey()); // make sure we close the scanner reference kvs.getFirst().close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java index 1f66e7c..a1f01ed 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java @@ -70,6 +70,9 @@ public class ApplyAndFilterDeletesFilter extends FilterBase { Collections.sort(this.families); } + public DeleteTracker getDeleteTracker() { + return coveringDelete; + } private ImmutableBytesPtr getNextFamily(ImmutableBytesPtr family) { int index = Collections.binarySearch(families, family); @@ -209,7 +212,7 @@ public class ApplyAndFilterDeletesFilter extends FilterBase { } } - class DeleteTracker { + public static class DeleteTracker { public KeyValue deleteFamily; public KeyValue deleteColumn; @@ -283,7 +286,7 @@ public class ApplyAndFilterDeletesFilter extends FilterBase { return true; } // clear the point delete since the TS must not be matching - coveringDelete.pointDelete = null; + pointDelete = null; } return false; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java index 884cca6..1c36ebb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java @@ -21,13 +21,20 @@ package org.apache.phoenix.hbase.index.scanner; import java.io.IOException; import org.apache.hadoop.hbase.Cell; +import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter.DeleteTracker; +import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner; /** * {@link Scanner} that has no underlying data */ -public class EmptyScanner implements Scanner { - +public class EmptyScanner implements CoveredDeleteScanner { + private final DeleteTracker deleteTracker; + + public EmptyScanner (DeleteTracker deleteTracker) { + this.deleteTracker = deleteTracker; + } + @Override public Cell next() throws IOException { return null; @@ -47,4 +54,9 @@ public class EmptyScanner implements Scanner { public void close() throws IOException { // noop } + + @Override + public DeleteTracker getDeleteTracker() { + return deleteTracker; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java index 301929c..5547958 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.hbase.index.covered.KeyValueStore; import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter; +import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter.DeleteTracker; import org.apache.phoenix.hbase.index.covered.filter.ColumnTrackingNextLargestTimestampFilter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; @@ -57,7 +58,7 @@ public class ScannerBuilder { this.update = update; } - public Scanner buildIndexedColumnScanner(Collection<? extends ColumnReference> indexedColumns, ColumnTracker tracker, long ts, boolean returnNullIfRowNotFound) { + public CoveredDeleteScanner buildIndexedColumnScanner(Collection<? extends ColumnReference> indexedColumns, ColumnTracker tracker, long ts, boolean returnNullIfRowNotFound) { Filter columnFilters = getColumnFilters(indexedColumns); FilterList filters = new FilterList(Lists.newArrayList(columnFilters)); @@ -68,10 +69,11 @@ public class ScannerBuilder { filters.addFilter(new ColumnTrackingNextLargestTimestampFilter(ts, tracker)); // filter out kvs based on deletes - filters.addFilter(new ApplyAndFilterDeletesFilter(getAllFamilies(indexedColumns))); + ApplyAndFilterDeletesFilter deleteFilter = new ApplyAndFilterDeletesFilter(getAllFamilies(indexedColumns)); + filters.addFilter(deleteFilter); // combine the family filters and the rest of the filters as a - return getFilteredScanner(filters, returnNullIfRowNotFound); + return getFilteredScanner(filters, returnNullIfRowNotFound, deleteFilter.getDeleteTracker()); } /** @@ -108,14 +110,18 @@ public class ScannerBuilder { return families; } - private Scanner getFilteredScanner(Filter filters, boolean returnNullIfRowNotFound) { + public static interface CoveredDeleteScanner extends Scanner { + public DeleteTracker getDeleteTracker(); + } + + private CoveredDeleteScanner getFilteredScanner(Filter filters, boolean returnNullIfRowNotFound, final DeleteTracker deleteTracker) { // create a scanner and wrap it as an iterator, meaning you can only go forward final FilteredKeyValueScanner kvScanner = new FilteredKeyValueScanner(filters, memstore); // seek the scanner to initialize it KeyValue start = KeyValueUtil.createFirstOnRow(update.getRow()); try { if (!kvScanner.seek(start)) { - return returnNullIfRowNotFound ? null : new EmptyScanner(); + return returnNullIfRowNotFound ? null : new EmptyScanner(deleteTracker); } } catch (IOException e) { // This should never happen - everything should explode if so. @@ -124,7 +130,7 @@ public class ScannerBuilder { } // we have some info in the scanner, so wrap it in an iterator and return. - return new Scanner() { + return new CoveredDeleteScanner() { @Override public Cell next() { @@ -162,6 +168,11 @@ public class ScannerBuilder { public void close() throws IOException { kvScanner.close(); } + + @Override + public DeleteTracker getDeleteTracker() { + return deleteTracker; + } }; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java index a60adef..6582c8a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java @@ -32,7 +32,7 @@ import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.builder.IndexBuildingFailureException; import org.apache.phoenix.hbase.index.covered.data.LazyValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; -import org.apache.phoenix.hbase.index.scanner.Scanner; +import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner; /** * Utility class to help manage indexes @@ -97,7 +97,7 @@ public class IndexManagementUtil { } - public static ValueGetter createGetterFromScanner(Scanner scanner, byte[] currentRow) { + public static ValueGetter createGetterFromScanner(CoveredDeleteScanner scanner, byte[] currentRow) { return scanner!=null ? new LazyValueGetter(scanner, currentRow) : null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 3b4faa9..840d535 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -548,7 +548,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { initCachedState(); } - public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey) { + public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, long ts) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); boolean prependRegionStartKey = isLocalIndex && regionStartKey != null; boolean isIndexSalted = !isLocalIndex && nIndexSaltBuckets > 0; @@ -620,7 +620,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { dataColumnType = expression.getDataType(); dataSortOrder = expression.getSortOrder(); isNullable = expression.isNullable(); - expression.evaluate(new ValueGetterTuple(valueGetter), ptr); + expression.evaluate(new ValueGetterTuple(valueGetter, ts), ptr); } else { Field field = dataRowKeySchema.getField(dataPkPosition[i]); @@ -932,10 +932,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException { - byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey); + byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey, ts); Put put = null; // New row being inserted: add the empty key value - if (valueGetter==null || valueGetter.getLatestValue(dataEmptyKeyValueRef) == null) { + ImmutableBytesWritable latestValue = null; + if (valueGetter==null || (latestValue = valueGetter.getLatestValue(dataEmptyKeyValueRef, ts)) == null || latestValue == ValueGetter.HIDDEN_BY_DELETE) { + // We need to track whether or not our empty key value is hidden by a Delete Family marker at the same timestamp. + // If it is, these Puts will be masked so should not be emitted. + if (latestValue == ValueGetter.HIDDEN_BY_DELETE) { + return null; + } put = new Put(indexRowKey); // add the keyvalue for the empty row put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey), @@ -997,7 +1003,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } }, dataColRef.getFamily(), dataColRef.getQualifier(), encodingScheme); ImmutableBytesPtr ptr = new ImmutableBytesPtr(); - expression.evaluate(new ValueGetterTuple(valueGetter), ptr); + expression.evaluate(new ValueGetterTuple(valueGetter, ts), ptr); byte[] value = ptr.copyBytesIfNecessary(); if (value != null) { int indexArrayPos = encodingScheme.decode(indexColRef.getQualifier())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1; @@ -1023,8 +1029,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { ColumnReference indexColRef = this.coveredColumnsMap.get(ref); ImmutableBytesPtr cq = indexColRef.getQualifierWritable(); ImmutableBytesPtr cf = indexColRef.getFamilyWritable(); - ImmutableBytesWritable value = valueGetter.getLatestValue(ref); - if (value != null) { + ImmutableBytesWritable value = valueGetter.getLatestValue(ref, ts); + if (value != null && value != ValueGetter.HIDDEN_BY_DELETE) { if (put == null) { put = new Put(indexRowKey); put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); @@ -1068,7 +1074,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return getDeleteTypeOrNull(pendingUpdates) != null; } - private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<KeyValue> pendingUpdates) throws IOException { + private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<KeyValue> pendingUpdates, long ts) throws IOException { if (pendingUpdates.isEmpty()) { return false; } @@ -1079,8 +1085,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { for (ColumnReference ref : indexedColumns) { Cell newValue = newState.get(ref); if (newValue != null) { // Indexed column has potentially changed - ImmutableBytesWritable oldValue = oldState.getLatestValue(ref); - boolean newValueSetAsNull = (newValue.getTypeByte() == Type.DeleteColumn.getCode() || newValue.getTypeByte() == Type.Delete.getCode() || CellUtil.matchingValue(newValue, HConstants.EMPTY_BYTE_ARRAY)); + ImmutableBytesWritable oldValue = oldState.getLatestValue(ref, ts); + boolean newValueSetAsNull = (newValue.getTypeByte() == Type.DeleteColumn.getCode() || newValue.getTypeByte() == Type.Delete.getCode() || CellUtil.matchingValue(newValue, HConstants.EMPTY_BYTE_ARRAY)); boolean oldValueSetAsNull = oldValue == null || oldValue.getLength() == 0; //If the new column value has to be set as null and the older value is null too, //then just skip to the next indexed column. @@ -1109,10 +1115,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException { - byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey); + byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey, ts); // Delete the entire row if any of the indexed columns changed DeleteType deleteType = null; - if (oldState == null || (deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row + if (oldState == null || (deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || hasIndexedColumnChanged(oldState, pendingUpdates, ts)) { // Deleting the entire row byte[] emptyCF = emptyKeyValueCFPtr.copyBytesIfNecessary(); Delete delete = new Delete(indexRowKey); @@ -1774,7 +1780,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } return new ValueGetter() { @Override - public ImmutableBytesWritable getLatestValue(ColumnReference ref) { + public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) { if(ref.equals(dataEmptyKeyValueRef)) return null; return valueMap.get(ref); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index 82360f7..c91e36e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -216,35 +216,36 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { minTimeStamp *= -1; } // Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor. - HTableInterface systemTable = env.getTable(SchemaUtil - .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration())); - MetaDataMutationResult result = IndexUtil.setIndexDisableTimeStamp(indexTableName, minTimeStamp, - systemTable, newState); - if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) { - LOG.info("Index " + indexTableName + " has been dropped. Ignore uncommitted mutations"); - continue; - } - if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) { - if (leaveIndexActive) { - LOG.warn("Attempt to update INDEX_DISABLE_TIMESTAMP " + " failed with code = " - + result.getMutationCode()); - // If we're not disabling the index, then we don't want to throw as throwing - // will lead to the RS being shutdown. - if (blockDataTableWritesOnFailure) { - throw new DoNotRetryIOException("Attempt to update INDEX_DISABLE_TIMESTAMP failed."); - } - } else { - LOG.warn("Attempt to disable index " + indexTableName + " failed with code = " - + result.getMutationCode() + ". Will use default failure policy instead."); - throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed."); - } + try (HTableInterface systemTable = env.getTable(SchemaUtil + .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()))) { + MetaDataMutationResult result = IndexUtil.updateIndexState(indexTableName, minTimeStamp, + systemTable, newState); + if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) { + LOG.info("Index " + indexTableName + " has been dropped. Ignore uncommitted mutations"); + continue; + } + if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) { + if (leaveIndexActive) { + LOG.warn("Attempt to update INDEX_DISABLE_TIMESTAMP " + " failed with code = " + + result.getMutationCode()); + // If we're not disabling the index, then we don't want to throw as throwing + // will lead to the RS being shutdown. + if (blockDataTableWritesOnFailure) { + throw new DoNotRetryIOException("Attempt to update INDEX_DISABLE_TIMESTAMP failed."); + } + } else { + LOG.warn("Attempt to disable index " + indexTableName + " failed with code = " + + result.getMutationCode() + ". Will use default failure policy instead."); + throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed."); + } + } + if (leaveIndexActive) + LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + indexTableName + " due to an exception while writing updates.", + cause); + else + LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.", + cause); } - if (leaveIndexActive) - LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + indexTableName + " due to an exception while writing updates.", - cause); - else - LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.", - cause); } // Return the cell time stamp (note they should all be the same) return timestamp; http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index d486a74..4b267a2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -615,7 +615,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { ValueGetter getter = new ValueGetter() { @Override - public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException { + public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException { return valueMap.get(ref); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java index 1f271f4..728b1e0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java @@ -32,13 +32,17 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference; * Class used to construct a {@link Tuple} in order to evaluate an {@link Expression} */ public class ValueGetterTuple extends BaseTuple { - private ValueGetter valueGetter; + private final ValueGetter valueGetter; + private final long ts; - public ValueGetterTuple(ValueGetter valueGetter) { + public ValueGetterTuple(ValueGetter valueGetter, long ts) { this.valueGetter = valueGetter; + this.ts = ts; } public ValueGetterTuple() { + this.valueGetter = null; + this.ts = HConstants.LATEST_TIMESTAMP; } @Override @@ -55,7 +59,7 @@ public class ValueGetterTuple extends BaseTuple { public KeyValue getValue(byte[] family, byte[] qualifier) { ImmutableBytesWritable value = null; try { - value = valueGetter.getLatestValue(new ColumnReference(family, qualifier)); + value = valueGetter.getLatestValue(new ColumnReference(family, qualifier), ts); } catch (IOException e) { throw new RuntimeException(e); } @@ -92,7 +96,7 @@ public class ValueGetterTuple extends BaseTuple { KeyValue kv = getValue(family, qualifier); if (kv == null) return false; - ptr.set(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()); + ptr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()); return true; }
