Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.1 b2b1762a4 -> 27bf5086a
PHOENIX-3056 Rows cannot be deleted from immutable table when in building state (James Taylor) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/27bf5086 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/27bf5086 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/27bf5086 Branch: refs/heads/4.x-HBase-1.1 Commit: 27bf5086a0dbfe38c2c4ee812d5daf302d9139f5 Parents: b2b1762 Author: Thomas D'Silva <[email protected]> Authored: Fri Jul 8 13:43:46 2016 -0700 Committer: Thomas D'Silva <[email protected]> Committed: Fri Jul 8 14:52:00 2016 -0700 ---------------------------------------------------------------------- .../end2end/index/AsyncImmutableIndexIT.java | 114 +++++++++++++++++++ .../apache/phoenix/compile/DeleteCompiler.java | 82 +++++++++---- 2 files changed, 174 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/27bf5086/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/AsyncImmutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/AsyncImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/AsyncImmutableIndexIT.java new file mode 100644 index 0000000..8c90b6e --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/AsyncImmutableIndexIT.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT; +import org.apache.phoenix.end2end.IndexToolIT; +import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class AsyncImmutableIndexIT extends BaseOwnClusterHBaseManagedTimeIT { + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1); + serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, + QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + setUpRealDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), + ReadOnlyProps.EMPTY_PROPS); + } + + @Test + public void testDeleteFromImmutable() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("CREATE TABLE TEST_TABLE (\n" + + " pk1 VARCHAR NOT NULL,\n" + + " pk2 VARCHAR NOT NULL,\n" + + " pk3 VARCHAR\n" + + " CONSTRAINT PK PRIMARY KEY \n" + + " (\n" + + " pk1,\n" + + " pk2,\n" + + " pk3\n" + + " )\n" + + " ) IMMUTABLE_ROWS=true"); + conn.createStatement().execute("upsert into TEST_TABLE (pk1, pk2, pk3) values ('a', '1', '1')"); + conn.createStatement().execute("upsert into TEST_TABLE (pk1, pk2, pk3) values ('b', '2', '2')"); + conn.commit(); + conn.createStatement().execute("CREATE INDEX TEST_INDEX ON TEST_TABLE (pk3, pk2) ASYNC"); + + // this delete will be issued at a timestamp later than the above timestamp of the index table + conn.createStatement().execute("delete from TEST_TABLE where pk1 = 'a'"); + conn.commit(); + + // run the index MR job + final IndexTool indexingTool = new IndexTool(); + indexingTool.setConf(new Configuration(getUtility().getConfiguration())); + final String[] cmdArgs = + IndexToolIT.getArgValues(null, "TEST_TABLE", "TEST_INDEX", true); + int status = indexingTool.run(cmdArgs); + assertEquals(0, status); + + // upsert two more rows + conn.createStatement().execute( + "upsert into TEST_TABLE (pk1, pk2, pk3) values ('a', '3', '3')"); + conn.createStatement().execute( + "upsert into TEST_TABLE (pk1, pk2, pk3) values ('b', '4', '4')"); + conn.commit(); + + // validate that delete markers were issued correctly and only ('a', '1', 'value1') was + // deleted + String query = "SELECT pk3 from TEST_TABLE ORDER BY pk3"; + ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query); + String expectedPlan = + "CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER TEST_INDEX\n" + + " SERVER FILTER BY FIRST KEY ONLY"; + assertEquals("Wrong plan ", expectedPlan, QueryUtil.getExplainPlan(rs)); + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("2", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("3", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("4", rs.getString(1)); + assertFalse(rs.next()); + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/27bf5086/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 504f994..42efd68 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -21,6 +21,7 @@ import static org.apache.phoenix.execute.MutationState.RowTimestampColInfo.NULL_ import java.sql.ParameterMetaData; import java.sql.SQLException; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -80,13 +81,13 @@ import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.ScanUtil; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.sun.istack.NotNull; public class DeleteCompiler { @@ -100,7 +101,7 @@ public class DeleteCompiler { this.operation = operation; } - private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException { + private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, List<TableRef> indexTableRefs, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException { PTable table = targetTableRef.getTable(); PhoenixStatement statement = childContext.getStatement(); PhoenixConnection connection = statement.getConnection(); @@ -114,11 +115,14 @@ public class DeleteCompiler { final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize); - Map<ImmutableBytesPtr,RowMutationState> indexMutations = null; + List<Map<ImmutableBytesPtr,RowMutationState>> indexMutations = null; // If indexTableRef is set, we're deleting the rows from both the index table and // the data table through a single query to save executing an additional one. - if (indexTableRef != null) { - indexMutations = Maps.newHashMapWithExpectedSize(batchSize); + if (!indexTableRefs.isEmpty()) { + indexMutations = Lists.newArrayListWithExpectedSize(indexTableRefs.size()); + for (int i = 0; i < indexTableRefs.size(); i++) { + indexMutations.add(Maps.<ImmutableBytesPtr,RowMutationState>newHashMapWithExpectedSize(batchSize)); + } } List<PColumn> pkColumns = table.getPKColumns(); boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null; @@ -156,10 +160,10 @@ public class DeleteCompiler { // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the // row key will already have its value. mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO)); - if (indexTableRef != null) { + for (int i = 0; i < indexTableRefs.size(); i++) { ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map rs.getCurrentRow().getKey(indexPtr); - indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO)); + indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO)); } if (mutations.size() > maxSize) { throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize); @@ -169,8 +173,8 @@ public class DeleteCompiler { if (isAutoCommit && rowCount % batchSize == 0) { MutationState state = new MutationState(targetTableRef, mutations, 0, maxSize, connection); connection.getMutationState().join(state); - if (indexTableRef != null) { - MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection); + for (int i = 0; i < indexTableRefs.size(); i++) { + MutationState indexState = new MutationState(indexTableRefs.get(i), indexMutations.get(i), 0, maxSize, connection); connection.getMutationState().join(indexState); } connection.getMutationState().send(); @@ -184,9 +188,9 @@ public class DeleteCompiler { // If auto commit is true, this last batch will be committed upon return int nCommittedRows = isAutoCommit ? (rowCount / batchSize * batchSize) : 0; MutationState state = new MutationState(targetTableRef, mutations, nCommittedRows, maxSize, connection); - if (indexTableRef != null) { + for (int i = 0; i < indexTableRefs.size(); i++) { // To prevent the counting of these index rows, we have a negative for remainingRows. - MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection); + MutationState indexState = new MutationState(indexTableRefs.get(i), indexMutations.get(i), 0, maxSize, connection); state.join(indexState); } return state; @@ -196,7 +200,7 @@ public class DeleteCompiler { private static class DeletingParallelIteratorFactory extends MutatingParallelIteratorFactory { private RowProjector projector; private TableRef targetTableRef; - private TableRef indexTableRef; + private List<TableRef> indexTableRefs; private TableRef sourceTableRef; private DeletingParallelIteratorFactory(PhoenixConnection connection) { @@ -212,7 +216,7 @@ public class DeleteCompiler { * iterator being used for reading rows out. */ StatementContext ctx = new StatementContext(statement, false); - MutationState state = deleteRows(ctx, targetTableRef, indexTableRef, iterator, projector, sourceTableRef); + MutationState state = deleteRows(ctx, targetTableRef, indexTableRefs, iterator, projector, sourceTableRef); return state; } @@ -228,8 +232,8 @@ public class DeleteCompiler { this.projector = projector; } - public void setIndexTargetTableRef(TableRef indexTableRef) { - this.indexTableRef = indexTableRef; + public void setIndexTargetTableRefs(List<TableRef> indexTableRefs) { + this.indexTableRefs = indexTableRefs; } } @@ -298,6 +302,17 @@ public class DeleteCompiler { } } + private static boolean hasNonPKIndexedColumns(Collection<PTable> immutableIndexes) { + for (PTable index : immutableIndexes) { + for (PColumn column : index.getPKColumns()) { + if (!IndexUtil.isDataPKColumn(column)) { + return true; + } + } + } + return false; + } + public MutationPlan compile(DeleteStatement delete) throws SQLException { final PhoenixConnection connection = statement.getConnection(); final boolean isAutoCommit = connection.getAutoCommit(); @@ -393,8 +408,17 @@ public class DeleteCompiler { } break; } - final QueryPlan dataPlan = dataPlanToBe; + boolean isBuildingImmutable = false; final boolean hasImmutableIndexes = !immutableIndex.isEmpty(); + if (hasImmutableIndexes) { + for (PTable index : immutableIndex.values()){ + if (index.getIndexState() == PIndexState.BUILDING) { + isBuildingImmutable = true; + break; + } + } + } + final QueryPlan dataPlan = dataPlanToBe; // tableRefs is parallel with queryPlans TableRef[] tableRefs = new TableRef[hasImmutableIndexes ? immutableIndex.size() : 1]; if (hasImmutableIndexes) { @@ -406,7 +430,7 @@ public class DeleteCompiler { if (table.getType() == PTableType.INDEX) { // index plans tableRefs[i++] = plan.getTableRef(); immutableIndex.remove(table.getKey()); - } else { // data plan + } else if (!isBuildingImmutable) { // data plan /* * If we have immutable indexes that we need to maintain, don't execute the data plan * as we can save a query by piggy-backing on any of the other index queries, since the @@ -421,10 +445,18 @@ public class DeleteCompiler { * immutable index. */ if (!immutableIndex.isEmpty()) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS).setSchemaName(tableRefToBe.getTable().getSchemaName().getString()) - .setTableName(tableRefToBe.getTable().getTableName().getString()).build().buildException(); + Collection<PTable> immutableIndexes = immutableIndex.values(); + if (!isBuildingImmutable || hasNonPKIndexedColumns(immutableIndexes)) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS).setSchemaName(tableRefToBe.getTable().getSchemaName().getString()) + .setTableName(tableRefToBe.getTable().getTableName().getString()).build().buildException(); + } + runOnServer = false; } } + List<TableRef> buildingImmutableIndexes = Lists.newArrayListWithExpectedSize(immutableIndex.values().size()); + for (PTable index : immutableIndex.values()) { + buildingImmutableIndexes.add(new TableRef(index, dataPlan.getTableRef().getTimeStamp(), dataPlan.getTableRef().getLowerBoundTimeStamp())); + } // Make sure the first plan is targeting deletion from the data table // In the case of an immutable index, we'll also delete from the index. @@ -586,7 +618,13 @@ public class DeleteCompiler { } }); } else { - final boolean deleteFromImmutableIndexToo = hasImmutableIndexes && !plan.getTableRef().equals(tableRef); + List<TableRef> immutableIndexRefsToBe = Lists.newArrayListWithExpectedSize(dataPlan.getTableRef().getTable().getIndexes().size()); + if (!buildingImmutableIndexes.isEmpty()) { + immutableIndexRefsToBe = buildingImmutableIndexes; + } else if (hasImmutableIndexes && !plan.getTableRef().equals(tableRef)) { + immutableIndexRefsToBe = Collections.singletonList(plan.getTableRef()); + } + final List<TableRef> immutableIndexRefs = immutableIndexRefsToBe; final DeletingParallelIteratorFactory parallelIteratorFactory2 = parallelIteratorFactory; mutationPlans.add( new MutationPlan() { @Override @@ -625,7 +663,7 @@ public class DeleteCompiler { parallelIteratorFactory2.setRowProjector(plan.getProjector()); parallelIteratorFactory2.setTargetTableRef(tableRef); parallelIteratorFactory2.setSourceTableRef(plan.getTableRef()); - parallelIteratorFactory2.setIndexTargetTableRef(deleteFromImmutableIndexToo ? plan.getTableRef() : null); + parallelIteratorFactory2.setIndexTargetTableRefs(immutableIndexRefs); } while ((tuple=iterator.next()) != null) {// Runs query Cell kv = tuple.getValue(0); @@ -640,7 +678,7 @@ public class DeleteCompiler { return state; } else { - return deleteRows(plan.getContext(), tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef()); + return deleteRows(plan.getContext(), tableRef, immutableIndexRefs, iterator, plan.getProjector(), plan.getTableRef()); } } finally { iterator.close();
