Repository: phoenix Updated Branches: refs/heads/5.x-HBase-2.0 aeb33b9fb -> df98ad3f3
PHOENIX-4531 Delete on a table with a global mutable index can issue client-side deletes against the index(Vincent Poon) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/df98ad3f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/df98ad3f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/df98ad3f Branch: refs/heads/5.x-HBase-2.0 Commit: df98ad3f3ec2343749cc6e749a673bcba928aa79 Parents: aeb33b9 Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Authored: Tue Feb 20 17:11:00 2018 +0530 Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Committed: Tue Feb 20 17:11:00 2018 +0530 ---------------------------------------------------------------------- .../phoenix/end2end/index/BaseIndexIT.java | 20 ++++++ .../end2end/index/PartialIndexRebuilderIT.java | 48 ++++++++++++++- .../apache/phoenix/compile/DeleteCompiler.java | 65 +++++++++++++++----- .../apache/phoenix/optimize/QueryOptimizer.java | 13 ++-- .../phoenix/compile/QueryOptimizerTest.java | 41 ++++++++++++ 5 files changed, 165 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/df98ad3f/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java index 3fd6b3b..c2f00e7 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java @@ -37,6 +37,8 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Iterator; +import java.util.List; import java.util.Properties; import 
java.util.Random; @@ -51,6 +53,8 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; @@ -68,6 +72,7 @@ import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; @@ -202,6 +207,7 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { String dml = "DELETE from " + fullTableName + " WHERE long_col2 = 4"; assertEquals(1,conn.createStatement().executeUpdate(dml)); + assertNoClientSideIndexMutations(conn); conn.commit(); String query = "SELECT /*+ NO_INDEX */ long_pk FROM " + fullTableName; @@ -232,6 +238,19 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { } } + private void assertNoClientSideIndexMutations(Connection conn) throws SQLException { + if (mutable) { + Iterator<Pair<byte[],List<Cell>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn); + if (iterator.hasNext()) { + byte[] tableName = iterator.next().getFirst(); // skip data table mutations + PTable table = PhoenixRuntime.getTable(conn, Bytes.toString(tableName)); + assertTrue(table.getType() == PTableType.TABLE); // should be data table + boolean hasIndexData = iterator.hasNext(); + assertFalse(hasIndexData); // should have no index data + } + } + } + @Test public void testCreateIndexAfterUpsertStarted() throws Exception { testCreateIndexAfterUpsertStarted(false, 
@@ -367,6 +386,7 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { String dml = "DELETE from " + fullTableName + " WHERE long_col2 = 4"; assertEquals(1,conn.createStatement().executeUpdate(dml)); + assertNoClientSideIndexMutations(conn); conn.commit(); // query the data table http://git-wip-us.apache.org/repos/asf/phoenix/blob/df98ad3f/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java index 767bb0a..63d426f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.phoenix.coprocessor.MetaDataRegionObserver; import org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; +import org.apache.phoenix.exception.PhoenixIOException; import org.apache.phoenix.execute.CommitException; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; @@ -1028,7 +1029,52 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { assertTrue(MetaDataUtil.tableRegionsOnline(conf, table)); } } - + + //Tests that when we're updating an index from within the RS (e.g. 
UngroupedAggregateRegionObserver), + // if the index write fails the index gets disabled + @Test + public void testIndexFailureWithinRSDisablesIndex() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + try { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) DISABLE_INDEX_ON_WRITE_FAILURE = TRUE"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0', 't')"); + conn.commit(); + // Simulate write failure + TestUtil.addCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class); + conn.setAutoCommit(true); + try { + conn.createStatement().execute("DELETE FROM " + fullTableName); + fail(); + } catch (CommitException|PhoenixIOException e) { + // Expected + } + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.DISABLE, null)); + // reset the index state to ACTIVE + Table metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + IndexUtil.updateIndexState(fullIndexName, 0, metaTable, PIndexState.INACTIVE); + IndexUtil.updateIndexState(fullIndexName, 0, metaTable, PIndexState.ACTIVE); + TestUtil.removeCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0', 't')"); + TestUtil.addCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class); + try { + conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE v1='a'"); + fail(); + }
catch (CommitException|PhoenixIOException e) { + // Expected + } + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.DISABLE, null)); + } finally { + TestUtil.removeCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class); + } + } + } + public static class WriteFailingRegionObserver extends SimpleRegionObserver { @Override public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/df98ad3f/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index a635c69..6224570 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -106,6 +106,23 @@ public class DeleteCompiler { this.operation = operation; } + /** + * Handles client side deletion of rows for a DELETE statement. We determine the "best" plan to drive the query using + * our standard optimizer. The plan may be based on using an index, in which case we need to translate the index row + * key to get the data row key used to form the delete mutation. We always collect up the data table mutations, but we + * only collect and send the index mutations for global, immutable indexes. Local indexes and mutable indexes are always + * maintained on the server side. 
+ * @param context StatementContext for the scan being executed + * @param iterator ResultIterator for the scan being executed + * @param bestPlan QueryPlan used to produce the iterator + * @param projectedTableRef TableRef containing all indexed and covered columns across all indexes on the data table + * @param otherTableRefs other TableRefs needed to be maintained apart from the one over which the scan is executing. + * Might be other index tables (if we're driving off of the data table), the data table (if we're driving off of + * an index table), or a mix of the data table and additional index tables. + * @return MutationState representing the uncommitted data across the data table and indexes. Will be joined with the + * MutationState on the connection over which the delete is occurring. + * @throws SQLException + */ private static MutationState deleteRows(StatementContext context, ResultIterator iterator, QueryPlan bestPlan, TableRef projectedTableRef, List<TableRef> otherTableRefs) throws SQLException { RowProjector projector = bestPlan.getProjector(); TableRef tableRef = bestPlan.getTableRef(); @@ -123,13 +140,14 @@ public class DeleteCompiler { final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES); final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); MultiRowMutationState mutations = new MultiRowMutationState(batchSize); - List<MultiRowMutationState> indexMutations = null; - // If indexTableRef is set, we're deleting the rows from both the index table and - // the data table through a single query to save executing an additional one. + List<MultiRowMutationState> otherMutations = null; + // If otherTableRefs is not empty, we're deleting the rows from both the index table and + // the data table through a single query to save executing an additional one (since we + // can always get the data table row key from an index row key).
if (!otherTableRefs.isEmpty()) { - indexMutations = Lists.newArrayListWithExpectedSize(otherTableRefs.size()); + otherMutations = Lists.newArrayListWithExpectedSize(otherTableRefs.size()); for (int i = 0; i < otherTableRefs.size(); i++) { - indexMutations.add(new MultiRowMutationState(batchSize)); + otherMutations.add(new MultiRowMutationState(batchSize)); } } List<PColumn> pkColumns = table.getPKColumns(); @@ -207,22 +225,22 @@ public class DeleteCompiler { // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the // row key will already have its value. // Check for otherTableRefs being empty required when deleting directly from the index - if (otherTableRefs.isEmpty() || table.getIndexType() != IndexType.LOCAL) { + if (otherTableRefs.isEmpty() || (table.getIndexType() != IndexType.LOCAL && table.isImmutableRows())) { mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); } for (int i = 0; i < otherTableRefs.size(); i++) { PTable otherTable = otherTableRefs.get(i).getTable(); - ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map + ImmutableBytesPtr otherRowKeyPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map // Translate the data table row to the index table row if (table.getType() == PTableType.INDEX) { - indexPtr.set(scannedIndexMaintainer.buildDataRowKey(rowKeyPtr, viewConstants)); + otherRowKeyPtr.set(scannedIndexMaintainer.buildDataRowKey(rowKeyPtr, viewConstants)); if (otherTable.getType() == PTableType.INDEX) { - indexPtr.set(maintainers[i].buildRowKey(getter, indexPtr, null, null, HConstants.LATEST_TIMESTAMP)); + otherRowKeyPtr.set(maintainers[i].buildRowKey(getter, otherRowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP)); } } else { - indexPtr.set(maintainers[i].buildRowKey(getter, rowKeyPtr, null, null, 
HConstants.LATEST_TIMESTAMP)); + otherRowKeyPtr.set(maintainers[i].buildRowKey(getter, rowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP)); } - indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); + otherMutations.get(i).put(otherRowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); } if (mutations.size() > maxSize) { throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize); @@ -250,7 +268,7 @@ public class DeleteCompiler { int nCommittedRows = isAutoCommit ? (rowCount / batchSize * batchSize) : 0; MutationState state = new MutationState(tableRef, mutations, nCommittedRows, maxSize, maxSizeBytes, connection); for (int i = 0; i < otherTableRefs.size(); i++) { - MutationState indexState = new MutationState(otherTableRefs.get(i), indexMutations.get(i), 0, maxSize, maxSizeBytes, connection); + MutationState indexState = new MutationState(otherTableRefs.get(i), otherMutations.get(i), 0, maxSize, maxSizeBytes, connection); state.join(indexState); } return state; @@ -877,6 +895,8 @@ public class DeleteCompiler { public MutationState execute() throws SQLException { ResultIterator iterator = bestPlan.iterator(); try { + // If we're not doing any pre or post processing, we can produce the delete mutations directly + // in the parallel threads executed for the scan if (!hasPreOrPostProcessing) { Tuple tuple; long totalRowCount = 0; @@ -891,16 +911,29 @@ public class DeleteCompiler { } // Return total number of rows that have been deleted from the table. In the case of auto commit being off // the mutations will all be in the mutation state of the current connection. 
We need to divide by the - // total number of tables we updated as otherwise the client will get an unexpected result - MutationState state = new MutationState(maxSize, maxSizeBytes, connection, - totalRowCount / - ((bestPlan.getTableRef().getTable().getIndexType() == IndexType.LOCAL && !otherTableRefs.isEmpty() ? 0 : 1) + otherTableRefs.size())); + // total number of tables we updated as otherwise the client will get an inflated result. + int totalTablesUpdateClientSide = 1; // data table is always updated + PTable bestTable = bestPlan.getTableRef().getTable(); + // global immutable tables are also updated client side (but don't double count the data table) + if (bestPlan != dataPlan && bestTable.getIndexType() == IndexType.GLOBAL && bestTable.isImmutableRows()) { + totalTablesUpdateClientSide++; + } + for (TableRef otherTableRef : otherTableRefs) { + PTable otherTable = otherTableRef.getTable(); + // Don't double count the data table here (which morphs when it becomes a projected table, hence this check) + if (projectedTableRef != otherTableRef && otherTable.getIndexType() == IndexType.GLOBAL && otherTable.isImmutableRows()) { + totalTablesUpdateClientSide++; + } + } + MutationState state = new MutationState(maxSize, maxSizeBytes, connection, totalRowCount/totalTablesUpdateClientSide); // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed. state.setReadMetricQueue(context.getReadMetricsQueue()); return state; } else { + // Otherwise, we have to execute the query and produce the delete mutations in the single thread + // producing the query results. 
return deleteRows(context, iterator, bestPlan, projectedTableRef, otherTableRefs); } } finally { http://git-wip-us.apache.org/repos/asf/phoenix/blob/df98ad3f/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java index 1a57ecf..5cc415d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java @@ -394,7 +394,8 @@ public class QueryOptimizer { } } final int boundRanges = nViewConstants; - final int comparisonOfDataVersusIndexTable = select.getHint().hasHint(Hint.USE_DATA_OVER_INDEX_TABLE) ? -1 : 1; + final boolean useDataOverIndexHint = select.getHint().hasHint(Hint.USE_DATA_OVER_INDEX_TABLE); + final int comparisonOfDataVersusIndexTable = useDataOverIndexHint ? 
-1 : 1; Collections.sort(bestCandidates, new Comparator<QueryPlan>() { @Override @@ -415,8 +416,10 @@ public class QueryOptimizer { } } // Use smaller table (table with fewest kv columns) - c = (table1.getColumns().size() - table1.getPKColumns().size()) - (table2.getColumns().size() - table2.getPKColumns().size()); - if (c != 0) return c; + if (!useDataOverIndexHint || (table1.getType() == PTableType.INDEX && table2.getType() == PTableType.INDEX)) { + c = (table1.getColumns().size() - table1.getPKColumns().size()) - (table2.getColumns().size() - table2.getPKColumns().size()); + if (c != 0) return c; + } // If all things are equal, don't choose local index as it forces scan // on every region (unless there's no start/stop key) @@ -433,10 +436,10 @@ public class QueryOptimizer { // All things being equal, just use the table based on the Hint.USE_DATA_OVER_INDEX_TABLE if (table1.getType() == PTableType.INDEX && table2.getType() != PTableType.INDEX) { - return comparisonOfDataVersusIndexTable; + return -comparisonOfDataVersusIndexTable; } if (table2.getType() == PTableType.INDEX && table1.getType() != PTableType.INDEX) { - return -comparisonOfDataVersusIndexTable; + return comparisonOfDataVersusIndexTable; } return 0; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/df98ad3f/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java index e1dacb7..56fd178 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java @@ -45,6 +45,9 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.jdbc.PhoenixPreparedStatement; import org.apache.phoenix.jdbc.PhoenixResultSet; 
import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.SQLParser; +import org.apache.phoenix.parse.DeleteStatement; +import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.query.BaseConnectionlessQueryTest; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PColumn; @@ -338,6 +341,44 @@ public class QueryOptimizerTest extends BaseConnectionlessQueryTest { } @Test + public void testDataTableOverIndexHint() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE t (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + conn.createStatement().execute("CREATE INDEX idx ON t(v1,v2)"); + PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class); + QueryPlan plan = stmt.optimizeQuery("SELECT /*+ " + Hint.USE_DATA_OVER_INDEX_TABLE + " */ * FROM t"); + assertEquals("T", plan.getTableRef().getTable().getTableName().getString()); + // unhinted still uses index + plan = stmt.optimizeQuery("SELECT * FROM t"); + assertEquals("IDX", plan.getTableRef().getTable().getTableName().getString()); + // hinting with a WHERE clause still uses the index + plan = stmt.optimizeQuery("SELECT /*+ " + Hint.USE_DATA_OVER_INDEX_TABLE + " */ * FROM t WHERE v1 = 'foo'"); + assertEquals("IDX", plan.getTableRef().getTable().getTableName().getString()); + } + + // Tests that a DELETE without a WHERE clause uses the data table (for parallel deletion on server side) + // DELETE with a WHERE clause should use the index on the client side + @Test + public void testDelete() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE t (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + conn.createStatement().execute("CREATE INDEX idx ON t(v1,v2)"); + conn.setAutoCommit(true); + PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class); + 
SQLParser parser = new SQLParser("DELETE FROM t"); + DeleteStatement delete = (DeleteStatement) parser.parseStatement(); + DeleteCompiler compiler = new DeleteCompiler(stmt, null); + MutationPlan plan = compiler.compile(delete); + assertEquals("T", plan.getQueryPlan().getTableRef().getTable().getTableName().getString()); + assertTrue(plan.getClass().getName().contains("ServerSelectDeleteMutationPlan")); + parser = new SQLParser("DELETE FROM t WHERE v1 = 'foo'"); + delete = (DeleteStatement) parser.parseStatement(); + plan = compiler.compile(delete); + assertEquals("IDX", plan.getQueryPlan().getTableRef().getTable().getTableName().getString()); + assertTrue(plan.getClass().getName().contains("ClientSelectDeleteMutationPlan")); + } + + @Test public void testChooseSmallerTable() throws Exception { Connection conn = DriverManager.getConnection(getUrl()); conn.createStatement().execute("CREATE TABLE t (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true");