This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push:
     new 49479d6  PHOENIX-5540 Full row index write at the last write phase for immutable global indexes
49479d6 is described below

commit 49479d6f60d66fd7c62ed7bb3b210c6bdbcf434e
Author: Gokcen Iskender <gisken...@salesforce.com>
AuthorDate: Wed Oct 23 11:12:39 2019 -0700

    PHOENIX-5540 Full row index write at the last write phase for immutable global indexes

    Signed-off-by: Kadir <kozde...@salesforce.com>
---
 .../phoenix/end2end/index/ImmutableIndexIT.java    | 46 ++++++++++++++++++++++
 .../org/apache/phoenix/execute/MutationState.java  | 16 +++++---
 .../phoenix/hbase/index/IndexRegionObserver.java   |  2 +-
 3 files changed, 57 insertions(+), 7 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index 1877e9e..bc8fcce 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
@@ -345,6 +347,9 @@ public class ImmutableIndexIT extends
BaseUniqueNamesOwnClusterIT { assertEquals(numRows, rs.getInt(1)); assertEquals(true, verifyRowsForEmptyColValue(conn, fullIndexName, IndexRegionObserver.VERIFIED_BYTES)); + rs = conn.createStatement().executeQuery("SELECT * FROM " + fullIndexName); + assertTrue(rs.next()); + assertEquals("1", rs.getString(1)); // Now try to fail Phase1 and observe that index state is not DISABLED try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) { @@ -422,6 +427,39 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT { } } + @Test + public void testGlobalImmutableIndexUnverifiedOnlyInPhase1() throws Exception { + if (localIndex || transactionProvider != null) { + return; + } + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String tableName = "TBL_" + generateUniqueName(); + String indexName = "IND_" + generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); + String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); + TABLE_NAME = fullTableName; + try (Connection conn = DriverManager.getConnection(getUrl(), props); + Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) { + conn.setAutoCommit(true); + createAndPopulateTableAndIndexForConsistentIndex(conn, fullTableName, fullIndexName, 0, null); + + // Now force fail + TestUtil.removeCoprocessor(conn, fullTableName, IndexRegionObserver.class); + TestUtil.addCoprocessor(conn, fullTableName, UpsertFailingRegionObserver.class); + try { + upsertRows(conn, fullTableName, 1); + } catch (Exception e) { + // ignore this since we force the fail + } + + ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME); + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1)); + + GlobalIndexCheckerIT.checkUnverifiedCellCount(conn, fullIndexName); + } + } + public static class DeleteFailingRegionObserver extends 
SimpleRegionObserver { @Override public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws @@ -430,6 +468,14 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT { } } + public static class UpsertFailingRegionObserver extends SimpleRegionObserver { + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws + IOException { + throw new DoNotRetryIOException(); + } + } + public static boolean verifyRowsForEmptyColValue(Connection conn, String tableName, byte[] valueBytes) throws IOException, SQLException { PTable table = PhoenixRuntime.getTable(conn, tableName); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 434d1f7..6cde447 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -1240,15 +1240,19 @@ public class MutationState implements SQLCloseable { } else if (m instanceof Put) { long timestamp = IndexRegionObserver.getMaxTimestamp(m); - // Phase 1 index mutations are set to unverified - ((Put) m).addColumn(emptyCF, emptyCQ, timestamp, IndexRegionObserver.UNVERIFIED_BYTES); - addToMap(unverifiedIndexMutations, tableInfo, m); + // Phase 1 index mutations are set to unverified. 
+ // Just send empty with Unverified + Put unverifiedPut = new Put(m.getRow()); + unverifiedPut.addColumn(emptyCF, emptyCQ, timestamp, IndexRegionObserver.UNVERIFIED_BYTES); + addToMap(unverifiedIndexMutations, tableInfo, unverifiedPut); // Phase 3 mutations are verified - Put verifiedPut = new Put(m.getRow()); - verifiedPut.addColumn(emptyCF, emptyCQ, timestamp, + // Send entire mutation with verified + // Remove the empty column prepared by Index codec as we need to change its value + IndexRegionObserver.removeEmptyColumn(m, emptyCF, emptyCQ); + ((Put) m).addColumn(emptyCF, emptyCQ, timestamp, IndexRegionObserver.VERIFIED_BYTES); - addToMap(verifiedOrDeletedIndexMutations, tableInfo, verifiedPut); + addToMap(verifiedOrDeletedIndexMutations, tableInfo, m); } else { addToMap(unverifiedIndexMutations, tableInfo, m); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index a41e729..053eb54 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -500,7 +500,7 @@ public class IndexRegionObserver extends BaseRegionObserver { return mutations; } - public void removeEmptyColumn(Mutation m, byte[] emptyCF, byte[] emptyCQ) { + public static void removeEmptyColumn(Mutation m, byte[] emptyCF, byte[] emptyCQ) { List<Cell> cellList = m.getFamilyCellMap().get(emptyCF); if (cellList == null) { return;