PHOENIX-619 Support DELETE over table with immutable index when possible Conflicts: phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/88121cb0 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/88121cb0 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/88121cb0 Branch: refs/heads/3.0 Commit: 88121cb05b5aa911e7cc27ff5306423299d5d1b6 Parents: 9777b6c Author: James Taylor <jtay...@salesforce.com> Authored: Mon Oct 13 20:23:43 2014 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Mon Oct 13 23:41:27 2014 -0700 ---------------------------------------------------------------------- .../end2end/BaseTenantSpecificTablesIT.java | 4 +- .../end2end/TenantSpecificTablesDMLIT.java | 43 ++ .../phoenix/end2end/index/ImmutableIndexIT.java | 2 +- .../phoenix/end2end/index/ViewIndexIT.java | 87 +++ .../apache/phoenix/compile/DeleteCompiler.java | 541 ++++++++++++------- .../MutatingParallelIteratorFactory.java | 5 +- .../phoenix/compile/PostIndexDDLCompiler.java | 37 +- .../apache/phoenix/compile/UpsertCompiler.java | 4 +- .../phoenix/exception/SQLExceptionCode.java | 2 +- .../apache/phoenix/execute/MutationState.java | 44 +- .../apache/phoenix/jdbc/PhoenixResultSet.java | 4 + .../apache/phoenix/optimize/QueryOptimizer.java | 55 +- .../query/ConnectionQueryServicesImpl.java | 1 + .../java/org/apache/phoenix/util/IndexUtil.java | 14 +- .../phoenix/compile/QueryCompilerTest.java | 14 +- .../TenantSpecificViewIndexCompileTest.java | 2 +- 16 files changed, 600 insertions(+), 259 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/88121cb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java index 362fa08..6d6bffc 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java @@ -44,7 +44,7 @@ public abstract class BaseTenantSpecificTablesIT extends BaseOwnClusterClientMan " tenant_id VARCHAR(5) NOT NULL,\n" + " tenant_type_id VARCHAR(3) NOT NULL, \n" + " id INTEGER NOT NULL\n" + - " CONSTRAINT pk PRIMARY KEY (tenant_id, tenant_type_id, id)) MULTI_TENANT=true"; + " CONSTRAINT pk PRIMARY KEY (tenant_id, tenant_type_id, id)) MULTI_TENANT=true, IMMUTABLE_ROWS=true"; protected static final String TENANT_TABLE_NAME = "TENANT_TABLE"; protected static final String TENANT_TABLE_DDL = "CREATE VIEW " + TENANT_TABLE_NAME + " ( \n" + @@ -56,7 +56,7 @@ public abstract class BaseTenantSpecificTablesIT extends BaseOwnClusterClientMan " user VARCHAR ,\n" + " tenant_id VARCHAR(5) NOT NULL,\n" + " id INTEGER NOT NULL,\n" + - " CONSTRAINT pk PRIMARY KEY (tenant_id, id)) MULTI_TENANT=true"; + " CONSTRAINT pk PRIMARY KEY (tenant_id, id)) MULTI_TENANT=true, IMMUTABLE_ROWS=true"; protected static final String TENANT_TABLE_NAME_NO_TENANT_TYPE_ID = "TENANT_TABLE_NO_TENANT_TYPE_ID"; protected static final String TENANT_TABLE_DDL_NO_TENANT_TYPE_ID = "CREATE VIEW " + TENANT_TABLE_NAME_NO_TENANT_TYPE_ID + " ( \n" + http://git-wip-us.apache.org/repos/asf/phoenix/blob/88121cb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java index 76a8c7c..7fd3a82 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java @@ -265,6 +265,49 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT { } @Test + public void testDeleteWhenImmutableIndex() throws Exception { + Connection conn = nextConnection(getUrl()); + try { + conn.setAutoCommit(true); + conn.createStatement().executeUpdate("delete from " + PARENT_TABLE_NAME); + conn.close(); + + conn = nextConnection(getUrl()); + conn.setAutoCommit(true); + conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('AC/DC', 'abc', 1, 'Bon Scott')"); + conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', '" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')"); + conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', 'def', 1, 'Billy Gibbons')"); + conn.close(); + + conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); + conn.createStatement().executeUpdate("create index idx1 on " + TENANT_TABLE_NAME + "(user)"); + conn.close(); + + conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); + conn.setAutoCommit(true); + int count = conn.createStatement().executeUpdate("delete from " + TENANT_TABLE_NAME + " where user='Billy Gibbons'"); + assertEquals("Expected 1 row have been deleted", 1, count); + conn.close(); + + conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL); + conn.setAutoCommit(true); + ResultSet rs = conn.createStatement().executeQuery("select * from " + TENANT_TABLE_NAME); + assertFalse("Expected no rows in result set", rs.next()); + conn.close(); + + conn = nextConnection(getUrl()); + analyzeTable(conn, PARENT_TABLE_NAME); + conn = nextConnection(getUrl()); + rs = conn.createStatement().executeQuery("select count(*) from " + PARENT_TABLE_NAME); + rs.next(); + assertEquals(2, rs.getInt(1)); + } + finally { + conn.close(); + } + } + + @Test public void testDeleteOnlyDeletesTenantDataWithNoTenantTypeId() throws Exception { Connection conn = nextConnection(getUrl()); try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/88121cb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java index 1d7ac92..eb3a668 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java @@ -272,7 +272,7 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT { conn.createStatement().execute(dml); fail(); } catch (SQLException e) { - assertEquals(SQLExceptionCode.NO_DELETE_IF_IMMUTABLE_INDEX.getErrorCode(), e.getErrorCode()); + assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(), e.getErrorCode()); } conn.createStatement().execute("DROP TABLE " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE); http://git-wip-us.apache.org/repos/asf/phoenix/blob/88121cb0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java new file mode 100644 index 0000000..19053d9 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import static org.junit.Assert.assertFalse; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; + +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.phoenix.end2end.HBaseManagedTimeTest; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Maps; + +@Category(HBaseManagedTimeTest.class) +public class ViewIndexIT extends BaseIndexIT { + + private String VIEW_NAME = "MY_VIEW"; + + @BeforeClass + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + // Drop the HBase table metadata for this test to confirm that view index table dropped + props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); + // Must update config before starting server + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + private void createBaseTable(String tableName, Integer saltBuckets, String splits) throws SQLException { + Connection conn = DriverManager.getConnection(getUrl()); + String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" + + "k1 INTEGER NOT NULL,\n" + + "k2 INTEGER NOT NULL,\n" + + "v1 VARCHAR,\n" + + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n" + + (saltBuckets == null || splits != null ? "" : (",salt_buckets=" + saltBuckets) + + (saltBuckets != null || splits == null ? "" : ",splits=" + splits)); + conn.createStatement().execute(ddl); + conn.close(); + } + + @Test + public void testDeleteViewIndexSequences() throws Exception { + createBaseTable(DATA_TABLE_NAME, null, null); + Connection conn1 = DriverManager.getConnection(getUrl()); + Connection conn2 = DriverManager.getConnection(getUrl()); + conn1.createStatement().execute("CREATE VIEW " + VIEW_NAME + " AS SELECT * FROM " + DATA_TABLE_NAME); + conn1.createStatement().execute("CREATE INDEX " + INDEX_TABLE_NAME + " ON " + VIEW_NAME + " (v1)"); + conn2.createStatement().executeQuery("SELECT * FROM " + DATA_TABLE_FULL_NAME).next(); + HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin(); + conn1.createStatement().execute("DROP VIEW " + VIEW_NAME); + conn1.createStatement().execute("DROP TABLE "+ DATA_TABLE_NAME); + admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin(); + assertFalse("View index table should be deleted.", admin.tableExists(MetaDataUtil.getViewIndexTableName(DATA_TABLE_NAME))); + ResultSet rs = conn2.createStatement().executeQuery("SELECT " + + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + "," + + PhoenixDatabaseMetaData.SEQUENCE_NAME + + " FROM " + PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME); + assertFalse("View index sequences should be deleted.", rs.next()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/88121cb0/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 2fd5535..8af0e15 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -18,13 +18,13 @@ package org.apache.phoenix.compile; import java.sql.ParameterMetaData; -import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; @@ -64,20 +64,24 @@ import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.MetaDataEntityNotFoundException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PDataType; +import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.ReadOnlyTableException; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; -import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.ScanUtil; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.sun.istack.NotNull; public class DeleteCompiler { private static ParseNodeFactory FACTORY = new ParseNodeFactory(); @@ -88,8 +92,8 @@ public class DeleteCompiler { this.statement = statement; } - private static MutationState deleteRows(PhoenixStatement statement, TableRef tableRef, ResultIterator iterator, RowProjector projector) throws SQLException { - PTable table = tableRef.getTable(); + private static MutationState deleteRows(PhoenixStatement statement, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException { + PTable table = targetTableRef.getTable(); PhoenixConnection connection = statement.getConnection(); PName tenantId = connection.getTenantId(); byte[] tenantIdBytes = null; @@ -102,6 +106,12 @@ public class DeleteCompiler { final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations = Maps.newHashMapWithExpectedSize(batchSize); + Map<ImmutableBytesPtr,Map<PColumn,byte[]>> indexMutations = null; + // If indexTableRef is set, we're deleting the rows from both the index table and + // the data table through a single query to save executing an additional one. + if (indexTableRef != null) { + indexMutations = Maps.newHashMapWithExpectedSize(batchSize); + } try { List<PColumn> pkColumns = table.getPKColumns(); boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null; @@ -114,37 +124,61 @@ public class DeleteCompiler { if (isSharedViewIndex) { values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId()); } - ResultSet rs = new PhoenixResultSet(iterator, projector, statement); + PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, statement); int rowCount = 0; while (rs.next()) { - for (int i = offset; i < values.length; i++) { - byte[] byteValue = rs.getBytes(i+1-offset); - // The ResultSet.getBytes() call will have inverted it - we need to invert it back. - // TODO: consider going under the hood and just getting the bytes - if (pkColumns.get(i).getSortOrder() == SortOrder.DESC) { - byte[] tempByteValue = Arrays.copyOf(byteValue, byteValue.length); - byteValue = SortOrder.invert(byteValue, 0, tempByteValue, 0, byteValue.length); + ImmutableBytesPtr ptr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map + // Use tuple directly, as projector would not have all the PK columns from + // our index table inside of our projection. Since the tables are equal, + // there's no transation required. + if (sourceTableRef.equals(targetTableRef)) { + rs.getCurrentRow().getKey(ptr); + } else { + for (int i = offset; i < values.length; i++) { + byte[] byteValue = rs.getBytes(i+1-offset); + // The ResultSet.getBytes() call will have inverted it - we need to invert it back. + // TODO: consider going under the hood and just getting the bytes + if (pkColumns.get(i).getSortOrder() == SortOrder.DESC) { + byte[] tempByteValue = Arrays.copyOf(byteValue, byteValue.length); + byteValue = SortOrder.invert(byteValue, 0, tempByteValue, 0, byteValue.length); + } + values[i] = byteValue; } - values[i] = byteValue; + table.newKey(ptr, values); } - ImmutableBytesPtr ptr = new ImmutableBytesPtr(); - table.newKey(ptr, values); mutations.put(ptr, PRow.DELETE_MARKER); + if (indexTableRef != null) { + ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map + rs.getCurrentRow().getKey(indexPtr); + indexMutations.put(indexPtr, PRow.DELETE_MARKER); + } if (mutations.size() > maxSize) { throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize); } rowCount++; // Commit a batch if auto commit is true and we're at our batch size if (isAutoCommit && rowCount % batchSize == 0) { - MutationState state = new MutationState(tableRef, mutations, 0, maxSize, connection); + MutationState state = new MutationState(targetTableRef, mutations, 0, maxSize, connection); connection.getMutationState().join(state); + if (indexTableRef != null) { + MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection); + connection.getMutationState().join(indexState); + } connection.commit(); mutations.clear(); + indexMutations.clear(); } } // If auto commit is true, this last batch will be committed upon return - return new MutationState(tableRef, mutations, rowCount / batchSize * batchSize, maxSize, connection); + int nCommittedRows = rowCount / batchSize * batchSize; + MutationState state = new MutationState(targetTableRef, mutations, nCommittedRows, maxSize, connection); + if (indexTableRef != null) { + // To prevent the counting of these index rows, we have a negative for remainingRows. + MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection); + state.join(indexState); + } + return state; } finally { iterator.close(); } @@ -152,39 +186,90 @@ public class DeleteCompiler { private static class DeletingParallelIteratorFactory extends MutatingParallelIteratorFactory { private RowProjector projector; + private TableRef targetTableRef; + private TableRef indexTableRef; + private TableRef sourceTableRef; - private DeletingParallelIteratorFactory(PhoenixConnection connection, TableRef tableRef) { - super(connection, tableRef); + private DeletingParallelIteratorFactory(PhoenixConnection connection) { + super(connection); } @Override protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException { PhoenixStatement statement = new PhoenixStatement(connection); - return deleteRows(statement, tableRef, iterator, projector); + return deleteRows(statement, targetTableRef, indexTableRef, iterator, projector, sourceTableRef); + } + + public void setTargetTableRef(TableRef tableRef) { + this.targetTableRef = tableRef; + } + + public void setSourceTableRef(TableRef tableRef) { + this.sourceTableRef = tableRef; } public void setRowProjector(RowProjector projector) { this.projector = projector; } + + public void setIndexTargetTableRef(TableRef indexTableRef) { + this.indexTableRef = indexTableRef; + } } - private boolean hasImmutableIndex(TableRef tableRef) { - return tableRef.getTable().isImmutableRows() && !tableRef.getTable().getIndexes().isEmpty(); + private Set<PTable> getNonDisabledImmutableIndexes(TableRef tableRef) { + PTable table = tableRef.getTable(); + if (table.isImmutableRows() && !table.getIndexes().isEmpty()) { + Set<PTable> nonDisabledIndexes = Sets.newHashSetWithExpectedSize(table.getIndexes().size()); + for (PTable index : table.getIndexes()) { + if (index.getIndexState() != PIndexState.DISABLE) { + nonDisabledIndexes.add(index); + } + } + return nonDisabledIndexes; + } + return Collections.emptySet(); } - private boolean hasImmutableIndexWithKeyValueColumns(TableRef tableRef) { - if (!hasImmutableIndex(tableRef)) { - return false; + private class MultiDeleteMutationPlan implements MutationPlan { + private final List<MutationPlan> plans; + private final MutationPlan firstPlan; + + public MultiDeleteMutationPlan(@NotNull List<MutationPlan> plans) { + Preconditions.checkArgument(!plans.isEmpty()); + this.plans = plans; + this.firstPlan = plans.get(0); } - for (PTable index : tableRef.getTable().getIndexes()) { - for (PColumn column : index.getPKColumns()) { - if (!IndexUtil.isDataPKColumn(column)) { - return true; - } + + @Override + public StatementContext getContext() { + return firstPlan.getContext(); + } + + @Override + public ParameterMetaData getParameterMetaData() { + return firstPlan.getParameterMetaData(); + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return firstPlan.getExplainPlan(); + } + + @Override + public PhoenixConnection getConnection() { + return firstPlan.getConnection(); + } + + @Override + public MutationState execute() throws SQLException { + MutationState state = firstPlan.execute(); + for (MutationPlan plan : plans.subList(1, plans.size())) { + plan.execute(); } + return state; } - return false; } public MutationPlan compile(DeleteStatement delete) throws SQLException { @@ -192,7 +277,7 @@ public class DeleteCompiler { final boolean isAutoCommit = connection.getAutoCommit(); final boolean hasLimit = delete.getLimit() != null; final ConnectionQueryServices services = connection.getQueryServices(); - QueryPlan planToBe = null; + List<QueryPlan> queryPlans; NamedTableNode tableNode = delete.getTable(); String tableName = tableNode.getName().getTableName(); String schemaName = tableNode.getName().getSchemaName(); @@ -201,6 +286,7 @@ public class DeleteCompiler { boolean noQueryReqd = false; boolean runOnServer = false; SelectStatement select = null; + Set<PTable> immutableIndex = Collections.emptySet(); DeletingParallelIteratorFactory parallelIteratorFactory = null; while (true) { try { @@ -211,7 +297,9 @@ public class DeleteCompiler { throw new ReadOnlyTableException(table.getSchemaName().getString(),table.getTableName().getString()); } - noQueryReqd = !hasLimit && !hasImmutableIndex(tableRefToBe); + immutableIndex = getNonDisabledImmutableIndexes(tableRefToBe); + boolean mayHaveImmutableIndexes = !immutableIndex.isEmpty(); + noQueryReqd = !hasLimit; runOnServer = isAutoCommit && noQueryReqd; HintNode hint = delete.getHint(); if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) { @@ -233,8 +321,20 @@ public class DeleteCompiler { delete.getOrderBy(), delete.getLimit(), delete.getBindCount(), false, false); select = StatementNormalizer.normalize(select, resolver); - parallelIteratorFactory = hasLimit ? null : new DeletingParallelIteratorFactory(connection, tableRefToBe); - planToBe = new QueryOptimizer(services).optimize(statement, select, resolver, Collections.<PColumn>emptyList(), parallelIteratorFactory); + parallelIteratorFactory = hasLimit ? null : new DeletingParallelIteratorFactory(connection); + QueryOptimizer optimizer = new QueryOptimizer(services); + queryPlans = Lists.newArrayList(mayHaveImmutableIndexes + ? optimizer.getApplicablePlans(statement, select, resolver, Collections.<PColumn>emptyList(), parallelIteratorFactory) + : optimizer.getBestPlan(statement, select, resolver, Collections.<PColumn>emptyList(), parallelIteratorFactory)); + if (mayHaveImmutableIndexes) { // FIXME: this is ugly + // Lookup the table being deleted from in the cache, as it's possible that the + // optimizer updated the cache if it found indexes that were out of date. + // If the index was marked as disabled, it should not be in the list + // of immutable indexes. + table = connection.getMetaDataCache().getTable(new PTableKey(table.getTenantId(), table.getName().getString())); + tableRefToBe.setTable(table); + immutableIndex = getNonDisabledImmutableIndexes(tableRefToBe); + } } catch (MetaDataEntityNotFoundException e) { // Catch column/column family not found exception, as our meta data may // be out of sync. Update the cache once and retry if we were out of sync. @@ -250,182 +350,223 @@ public class DeleteCompiler { } break; } - final TableRef tableRef = tableRefToBe; - final QueryPlan plan = planToBe; - if (!plan.getTableRef().equals(tableRef)) { - runOnServer = false; - noQueryReqd = false; - } - - final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); - - if (hasImmutableIndexWithKeyValueColumns(tableRef)) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_DELETE_IF_IMMUTABLE_INDEX).setSchemaName(tableRef.getTable().getSchemaName().getString()) - .setTableName(tableRef.getTable().getTableName().getString()).build().buildException(); + final boolean hasImmutableIndexes = !immutableIndex.isEmpty(); + // tableRefs is parallel with queryPlans + TableRef[] tableRefs = new TableRef[hasImmutableIndexes ? immutableIndex.size() : 1]; + if (hasImmutableIndexes) { + int i = 0; + Iterator<QueryPlan> plans = queryPlans.iterator(); + while (plans.hasNext()) { + QueryPlan plan = plans.next(); + PTable table = plan.getTableRef().getTable(); + if (table.getType() == PTableType.INDEX) { // index plans + tableRefs[i++] = plan.getTableRef(); + immutableIndex.remove(table); + } else { // data plan + /* + * If we have immutable indexes that we need to maintain, don't execute the data plan + * as we can save a query by piggy-backing on any of the other index queries, since the + * PK columns that we need are always in each index row. + */ + plans.remove(); + } + } + /* + * If we have any immutable indexes remaining, then that means that the plan for that index got filtered out + * because it could not be executed. This would occur if a column in the where clause is not found in the + * immutable index. + */ + if (!immutableIndex.isEmpty()) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS).setSchemaName(tableRefToBe.getTable().getSchemaName().getString()) + .setTableName(tableRefToBe.getTable().getTableName().getString()).build().buildException(); + } } - final StatementContext context = plan.getContext(); - // If we're doing a query for a set of rows with no where clause, then we don't need to contact the server at all. - // A simple check of the none existence of a where clause in the parse node is not sufficient, as the where clause - // may have been optimized out. Instead, we check that there's a single SkipScanFilter - if (noQueryReqd - && (!context.getScan().hasFilter() - || context.getScan().getFilter() instanceof SkipScanFilter) - && context.getScanRanges().isPointLookup()) { - return new MutationPlan() { - - @Override - public ParameterMetaData getParameterMetaData() { - return context.getBindManager().getParameterMetaData(); - } - - @Override - public MutationState execute() { - // We have a point lookup, so we know we have a simple set of fully qualified - // keys for our ranges - ScanRanges ranges = context.getScanRanges(); - Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator(); - Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount()); - while (iterator.hasNext()) { - mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), PRow.DELETE_MARKER); + // Make sure the first plan is targeting deletion from the data table + // In the case of an immutable index, we'll also delete from the index. + tableRefs[0] = tableRefToBe; + /* + * Create a mutationPlan for each queryPlan. One plan will be for the deletion of the rows + * from the data table, while the others will be for deleting rows from immutable indexes. + */ + List<MutationPlan> mutationPlans = Lists.newArrayListWithExpectedSize(tableRefs.length); + for (int i = 0; i < tableRefs.length; i++) { + final TableRef tableRef = tableRefs[i]; + final QueryPlan plan = queryPlans.get(i); + if (!plan.getTableRef().equals(tableRef)) { + runOnServer = false; + noQueryReqd = false; // FIXME: why set this to false in this case? + } + + final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + + final StatementContext context = plan.getContext(); + // If we're doing a query for a set of rows with no where clause, then we don't need to contact the server at all. + // A simple check of the none existence of a where clause in the parse node is not sufficient, as the where clause + // may have been optimized out. Instead, we check that there's a single SkipScanFilter + if (noQueryReqd + && (!context.getScan().hasFilter() + || context.getScan().getFilter() instanceof SkipScanFilter) + && context.getScanRanges().isPointLookup()) { + mutationPlans.add(new MutationPlan() { + + @Override + public ParameterMetaData getParameterMetaData() { + return context.getBindManager().getParameterMetaData(); } - return new MutationState(tableRef, mutation, 0, maxSize, connection); - } - - @Override - public ExplainPlan getExplainPlan() throws SQLException { - return new ExplainPlan(Collections.singletonList("DELETE SINGLE ROW")); - } - - @Override - public PhoenixConnection getConnection() { - return connection; - } - - @Override - public StatementContext getContext() { - return context; - } - }; - } else if (runOnServer) { - // TODO: better abstraction - Scan scan = context.getScan(); - scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE); - - // Build an ungrouped aggregate query: select COUNT(*) from <table> where <where> - // The coprocessor will delete each row returned from the scan - // Ignoring ORDER BY, since with auto commit on and no limit makes no difference - SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint()); - final RowProjector projector = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY); - final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, projector, null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); - return new MutationPlan() { - - @Override - public PhoenixConnection getConnection() { - return connection; - } - - @Override - public ParameterMetaData getParameterMetaData() { - return context.getBindManager().getParameterMetaData(); - } - - @Override - public StatementContext getContext() { - return context; - } - - @Override - public MutationState execute() throws SQLException { - // TODO: share this block of code with UPSERT SELECT - ImmutableBytesWritable ptr = context.getTempPtr(); - tableRef.getTable().getIndexMaintainers(ptr); - ServerCache cache = null; - try { - if (ptr.getLength() > 0) { - IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); - cache = client.addIndexMetadataCache(context.getScanRanges(), ptr); - byte[] uuidValue = cache.getId(); - context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); + + @Override + public MutationState execute() { + // We have a point lookup, so we know we have a simple set of fully qualified + // keys for our ranges + ScanRanges ranges = context.getScanRanges(); + Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator(); + Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount()); + while (iterator.hasNext()) { + mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), PRow.DELETE_MARKER); } - ResultIterator iterator = aggPlan.iterator(); + return new MutationState(tableRef, mutation, 0, maxSize, connection); + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return new ExplainPlan(Collections.singletonList("DELETE SINGLE ROW")); + } + + @Override + public PhoenixConnection getConnection() { + return connection; + } + + @Override + public StatementContext getContext() { + return context; + } + }); + } else if (runOnServer) { + // TODO: better abstraction + Scan scan = context.getScan(); + scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE); + + // Build an ungrouped aggregate query: select COUNT(*) from <table> where <where> + // The coprocessor will delete each row returned from the scan + // Ignoring ORDER BY, since with auto commit on and no limit makes no difference + SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint()); + final RowProjector projector = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY); + final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, projector, null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); + mutationPlans.add(new MutationPlan() { + + @Override + public PhoenixConnection getConnection() { + return connection; + } + + @Override + public ParameterMetaData getParameterMetaData() { + return context.getBindManager().getParameterMetaData(); + } + + @Override + public StatementContext getContext() { + return context; + } + + @Override + public MutationState execute() throws SQLException { + // TODO: share this block of code with UPSERT SELECT + ImmutableBytesWritable ptr = context.getTempPtr(); + tableRef.getTable().getIndexMaintainers(ptr); + ServerCache cache = null; try { - Tuple row = iterator.next(); - final long mutationCount = (Long)projector.getColumnProjector(0).getValue(row, PDataType.LONG, ptr); - return new MutationState(maxSize, connection) { - @Override - public long getUpdateCount() { - return mutationCount; - } - }; + if (ptr.getLength() > 0) { + IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); + cache = client.addIndexMetadataCache(context.getScanRanges(), ptr); + byte[] uuidValue = cache.getId(); + context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); + } + ResultIterator iterator = aggPlan.iterator(); + try { + Tuple row = iterator.next(); + final long mutationCount = (Long)projector.getColumnProjector(0).getValue(row, PDataType.LONG, ptr); + return new MutationState(maxSize, connection) { + @Override + public long getUpdateCount() { + return mutationCount; + } + }; + } finally { + iterator.close(); + } } finally { - iterator.close(); - } - } finally { - if (cache != null) { - cache.close(); + if (cache != null) { + cache.close(); + } } } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + List<String> queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps(); + List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); + planSteps.add("DELETE ROWS"); + planSteps.addAll(queryPlanSteps); + return new ExplainPlan(planSteps); + } + }); + } else { + final boolean deleteFromImmutableIndexToo = hasImmutableIndexes && !plan.getTableRef().equals(tableRef); + if (parallelIteratorFactory != null) { + parallelIteratorFactory.setRowProjector(plan.getProjector()); + parallelIteratorFactory.setTargetTableRef(tableRef); + parallelIteratorFactory.setSourceTableRef(plan.getTableRef()); + parallelIteratorFactory.setIndexTargetTableRef(deleteFromImmutableIndexToo ? plan.getTableRef() : null); } - - @Override - public ExplainPlan getExplainPlan() throws SQLException { - List<String> queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps(); - List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); - planSteps.add("DELETE ROWS"); - planSteps.addAll(queryPlanSteps); - return new ExplainPlan(planSteps); - } - }; - } else { - if (parallelIteratorFactory != null) { - parallelIteratorFactory.setRowProjector(plan.getProjector()); - } - return new MutationPlan() { - - @Override - public PhoenixConnection getConnection() { - return connection; - } - - @Override - public ParameterMetaData getParameterMetaData() { - return context.getBindManager().getParameterMetaData(); - } - - @Override - public StatementContext getContext() { - return context; - } - - @Override - public MutationState execute() throws SQLException { - ResultIterator iterator = plan.iterator(); - if (!hasLimit) { - Tuple tuple; - long totalRowCount = 0; - while ((tuple=iterator.next()) != null) {// Runs query - KeyValue kv = tuple.getValue(0); - totalRowCount += PDataType.LONG.getCodec().decodeLong(kv.getBuffer(), kv.getValueOffset(), SortOrder.getDefault()); + mutationPlans.add( new MutationPlan() { + + @Override + public PhoenixConnection getConnection() { + return connection; + } + + @Override + public ParameterMetaData getParameterMetaData() { + return context.getBindManager().getParameterMetaData(); + } + + @Override + public StatementContext getContext() { + return context; + } + + @Override + public MutationState execute() throws SQLException { + ResultIterator iterator = plan.iterator(); + if (!hasLimit) { + Tuple tuple; + long totalRowCount = 0; + while ((tuple=iterator.next()) != null) {// Runs query + KeyValue kv = tuple.getValue(0); + totalRowCount += PDataType.LONG.getCodec().decodeLong(kv.getBuffer(), kv.getValueOffset(), SortOrder.getDefault()); + } + // Return total number of rows that have been delete. In the case of auto commit being off + // the mutations will all be in the mutation state of the current connection. + return new MutationState(maxSize, connection, totalRowCount); + } else { + return deleteRows(statement, tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef()); } - // Return total number of rows that have been delete. In the case of auto commit being off - // the mutations will all be in the mutation state of the current connection. - return new MutationState(maxSize, connection, totalRowCount); - } else { - return deleteRows(statement, tableRef, iterator, plan.getProjector()); } - } - - @Override - public ExplainPlan getExplainPlan() throws SQLException { - List<String> queryPlanSteps = plan.getExplainPlan().getPlanSteps(); - List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); - planSteps.add("DELETE ROWS"); - planSteps.addAll(queryPlanSteps); - return new ExplainPlan(planSteps); - } - }; + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + List<String> queryPlanSteps = plan.getExplainPlan().getPlanSteps(); + List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); + planSteps.add("DELETE ROWS"); + planSteps.addAll(queryPlanSteps); + return new ExplainPlan(planSteps); + } + }); + } } - + return mutationPlans.size() == 1 ? mutationPlans.get(0) : new MultiDeleteMutationPlan(mutationPlans); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/88121cb0/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java index df91b1d..6388b1a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java @@ -36,7 +36,6 @@ import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PDataType; -import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.KeyValueUtil; @@ -46,11 +45,9 @@ import org.apache.phoenix.util.KeyValueUtil; */ public abstract class MutatingParallelIteratorFactory implements ParallelIteratorFactory { protected final PhoenixConnection connection; - protected final TableRef tableRef; - protected MutatingParallelIteratorFactory(PhoenixConnection connection, TableRef tableRef) { + protected MutatingParallelIteratorFactory(PhoenixConnection connection) { this.connection = connection; - this.tableRef = tableRef; } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/88121cb0/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java index 5998e16..2ea42ce 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java @@ -24,6 +24,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.IndexUtil; @@ -55,25 +56,33 @@ public class PostIndexDDLCompiler { // that would allow the user to easily monitor the process of index creation. StringBuilder indexColumns = new StringBuilder(); StringBuilder dataColumns = new StringBuilder(); - List<PColumn> dataTableColumns = dataTableRef.getTable().getColumns(); + List<PColumn> dataPKColumns = dataTableRef.getTable().getPKColumns(); PTable dataTable = dataTableRef.getTable(); - int nColumns = dataTable.getColumns().size(); + int nPKColumns = dataPKColumns.size(); boolean isSalted = dataTable.getBucketNum() != null; boolean isMultiTenant = connection.getTenantId() != null && dataTable.isMultiTenant(); - boolean isSharedViewIndex = dataTable.getViewIndexId() != null; - int posOffset = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0); - for (int i = posOffset; i < nColumns; i++) { - PColumn col = dataTableColumns.get(i); - String indexColName = IndexUtil.getIndexColumnName(col); - try { - indexTable.getColumn(indexColName); - if (col.getFamilyName() != null) { - dataColumns.append('"').append(col.getFamilyName()).append("\"."); - } + int posOffset = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0); + for (int i = posOffset; i < nPKColumns; i++) { + PColumn col = dataPKColumns.get(i); + if (col.getViewConstant() == null) { + String indexColName = IndexUtil.getIndexColumnName(col); dataColumns.append('"').append(col.getName()).append("\","); indexColumns.append('"').append(indexColName).append("\","); - } catch (ColumnNotFoundException e) { - // Catch and ignore - means that this data column is not in the index + } + } + for (PColumnFamily family : dataTableRef.getTable().getColumnFamilies()) { + for (PColumn col : family.getColumns()) { + if (col.getViewConstant() == null) { + String indexColName = IndexUtil.getIndexColumnName(col); + try { + indexTable.getColumn(indexColName); + dataColumns.append('"').append(col.getFamilyName()).append("\"."); + dataColumns.append('"').append(col.getName()).append("\","); + indexColumns.append('"').append(indexColName).append("\","); + } catch (ColumnNotFoundException e) { + // Catch and ignore - means that this data column is not in the index + } + } } } dataColumns.setLength(dataColumns.length()-1); http://git-wip-us.apache.org/repos/asf/phoenix/blob/88121cb0/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 046e375..e789567 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -177,9 +177,11 @@ public class UpsertCompiler { private RowProjector projector; private int[] columnIndexes; private int[] pkSlotIndexes; + private final TableRef tableRef; private UpsertingParallelIteratorFactory (PhoenixConnection connection, TableRef tableRef) { - super(connection, tableRef); + super(connection); + this.tableRef = tableRef; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/88121cb0/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index a300611..a8775f9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -206,7 +206,7 @@ public enum SQLExceptionCode { SET_UNSUPPORTED_PROP_ON_ALTER_TABLE(1025, "42Y84", "Unsupported property set in ALTER TABLE command."), CANNOT_ADD_NOT_NULLABLE_COLUMN(1038, "42Y84", "Only nullable columns may be added for a pre-existing table."), NO_MUTABLE_INDEXES(1026, "42Y85", "Mutable secondary indexes are only supported for HBase version " + MetaDataUtil.decodeHBaseVersionAsString(PhoenixDatabaseMetaData.MUTABLE_SI_VERSION_THRESHOLD) + " and above."), - NO_DELETE_IF_IMMUTABLE_INDEX(1027, "42Y86", "Delete not allowed on a table with IMMUTABLE_ROW with non PK column in index."), + INVALID_FILTER_ON_IMMUTABLE_ROWS(1027, "42Y86", "All columns referenced in a WHERE clause must be available in every index for a table with immutable rows."), INVALID_INDEX_STATE_TRANSITION(1028, "42Y87", "Invalid index state transition."), INVALID_MUTABLE_INDEX_CONFIG(1029, "42Y88", "Mutable secondary indexes must have the " + IndexManagementUtil.WAL_EDIT_CODEC_CLASS_KEY + " property set to " http://git-wip-us.apache.org/repos/asf/phoenix/blob/88121cb0/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 9864218..42ba931 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -19,6 +19,7 @@ package org.apache.phoenix.execute; import java.io.IOException; import java.sql.SQLException; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -45,6 +46,7 @@ import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; @@ -72,7 +74,7 @@ public class MutationState implements SQLCloseable { private final long maxSize; private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr(); private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing? - private final long sizeOffset; + private long sizeOffset; private int numRows = 0; public MutationState(int maxSize, PhoenixConnection connection) { @@ -125,10 +127,14 @@ public class MutationState implements SQLCloseable { if (this == newMutation) { // Doesn't make sense return; } + this.sizeOffset += newMutation.sizeOffset; // Merge newMutation with this one, keeping state from newMutation for any overlaps for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : newMutation.mutations.entrySet()) { // Replace existing entries for the table with new entries - Map<ImmutableBytesPtr,Map<PColumn,byte[]>> existingRows = this.mutations.put(entry.getKey(), entry.getValue()); + TableRef tableRef = entry.getKey(); + PTable table = tableRef.getTable(); + boolean isIndex = table.getType() == PTableType.INDEX; + Map<ImmutableBytesPtr,Map<PColumn,byte[]>> existingRows = this.mutations.put(tableRef, entry.getValue()); if (existingRows != null) { // Rows for that table already exist // Loop through new rows and replace existing with new for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) { @@ -149,38 +155,52 @@ public class MutationState implements SQLCloseable { } } } else { - numRows++; + if (!isIndex) { // Don't count index rows in row count + numRows++; + } } } // Put the existing one back now that it's merged this.mutations.put(entry.getKey(), existingRows); } else { - numRows += entry.getValue().size(); + if (!isIndex) { + numRows += entry.getValue().size(); + } } } throwIfTooBig(); } private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, long timestamp, boolean includeMutableIndexes) { + final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism + (tableRef.getTable().isImmutableRows() || includeMutableIndexes) ? + IndexMaintainer.nonDisabledIndexIterator(tableRef.getTable().getIndexes().iterator()) : + Iterators.<PTable>emptyIterator(); final List<Mutation> mutations = Lists.newArrayListWithExpectedSize(values.size()); + final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null; Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> iterator = values.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = iterator.next(); ImmutableBytesPtr key = rowEntry.getKey(); PRow row = tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestamp, key); + List<Mutation> rowMutations, rowMutationsPertainingToIndex; if (rowEntry.getValue() == PRow.DELETE_MARKER) { // means delete row.delete(); + rowMutations = row.toRowMutations(); + // Row deletes for index tables are processed by running a re-written query + // against the index table (as this allows for flexibility in being able to + // delete rows). + rowMutationsPertainingToIndex = Collections.emptyList(); } else { for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().entrySet()) { row.setValue(valueEntry.getKey(), valueEntry.getValue()); } + rowMutations = row.toRowMutations(); + rowMutationsPertainingToIndex = rowMutations; } - mutations.addAll(row.toRowMutations()); + mutations.addAll(rowMutations); + if (mutationsPertainingToIndex != null) mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex); } - final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism - (tableRef.getTable().isImmutableRows() || includeMutableIndexes) ? - IndexMaintainer.nonDisabledIndexIterator(tableRef.getTable().getIndexes().iterator()) : - Iterators.<PTable>emptyIterator(); return new Iterator<Pair<byte[],List<Mutation>>>() { boolean isFirst = true; @@ -199,7 +219,7 @@ public class MutationState implements SQLCloseable { List<Mutation> indexMutations; try { indexMutations = - IndexUtil.generateIndexData(tableRef.getTable(), index, mutations, + IndexUtil.generateIndexData(tableRef.getTable(), index, mutationsPertainingToIndex, tempPtr, connection.getKeyValueBuilder()); } catch (SQLException e) { throw new IllegalDataException(e); @@ -436,7 +456,9 @@ public class MutationState implements SQLCloseable { } while (shouldRetry && retryCount++ < 1); isDataTable = false; } - numRows -= entry.getValue().size(); + if (tableRef.getTable().getType() != PTableType.INDEX) { + numRows -= entry.getValue().size(); + } iterator.remove(); // Remove batches as we process them } assert(numRows==0); http://git-wip-us.apache.org/repos/asf/phoenix/blob/88121cb0/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java index 3f280ca..e662a3f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java @@ -723,6 +723,10 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho throw new SQLFeatureNotSupportedException(); } + public Tuple getCurrentRow() { + return currentRow; + } + @Override public boolean next() throws SQLException { checkOpen(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/88121cb0/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java index f0c43b7..57fdde6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java @@ -77,18 +77,37 @@ public class QueryOptimizer { } public QueryPlan optimize(QueryPlan dataPlan, PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory) throws SQLException { + List<QueryPlan>plans = getApplicablePlans(dataPlan, statement, targetColumns, parallelIteratorFactory, true); + return plans.get(0); + } + + public List<QueryPlan> getBestPlan(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory) throws SQLException { + return getApplicablePlans(statement, select, resolver, targetColumns, parallelIteratorFactory, true); + } + + public List<QueryPlan> getApplicablePlans(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory) throws SQLException { + return getApplicablePlans(statement, select, resolver, targetColumns, parallelIteratorFactory, false); + } + + private List<QueryPlan> getApplicablePlans(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, boolean stopAtBestPlan) throws SQLException { + QueryCompiler compiler = new QueryCompiler(statement, select, resolver, targetColumns, parallelIteratorFactory, new SequenceManager(statement)); + QueryPlan dataPlan = compiler.compile(); + return getApplicablePlans(dataPlan, statement, targetColumns, parallelIteratorFactory, false); + } + + private List<QueryPlan> getApplicablePlans(QueryPlan dataPlan, PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, boolean stopAtBestPlan) throws SQLException { SelectStatement select = (SelectStatement)dataPlan.getStatement(); // Exit early if we have a point lookup as we can't get better than that if (!useIndexes || select.isJoin() || dataPlan.getContext().getResolver().getTables().size() > 1 - || dataPlan.getContext().getScanRanges().isPointLookup()) { - return dataPlan; + || (dataPlan.getContext().getScanRanges().isPointLookup() && stopAtBestPlan)) { + return Collections.singletonList(dataPlan); } PTable dataTable = dataPlan.getTableRef().getTable(); List<PTable>indexes = Lists.newArrayList(dataTable.getIndexes()); if (indexes.isEmpty() || dataPlan.isDegenerate() || dataPlan.getTableRef().hasDynamicCols() || select.getHint().hasHint(Hint.NO_INDEX)) { - return dataPlan; + return Collections.singletonList(dataPlan); } // The targetColumns is set for UPSERT SELECT to ensure that the proper type conversion takes place. @@ -109,20 +128,24 @@ public class QueryOptimizer { plans.add(dataPlan); QueryPlan hintedPlan = getHintedQueryPlan(statement, translatedIndexSelect, indexes, targetColumns, parallelIteratorFactory, plans); if (hintedPlan != null) { - return hintedPlan; + if (stopAtBestPlan) { + return Collections.singletonList(hintedPlan); + } + plans.add(0, hintedPlan); } + for (PTable index : indexes) { QueryPlan plan = addPlan(statement, translatedIndexSelect, index, targetColumns, parallelIteratorFactory, dataPlan); if (plan != null) { // Query can't possibly return anything so just return this plan. if (plan.isDegenerate()) { - return plan; + return Collections.singletonList(plan); } plans.add(plan); } } - return chooseBestPlan(select, plans); + return hintedPlan == null ? orderPlansBestToWorst(select, plans) : plans; } private static QueryPlan getHintedQueryPlan(PhoenixStatement statement, SelectStatement select, List<PTable> indexes, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, List<QueryPlan> plans) throws SQLException { @@ -159,12 +182,13 @@ public class QueryOptimizer { String indexName = indexHint.substring(startIndex, endIndex); int indexPos = getIndexPosition(indexes, indexName); if (indexPos >= 0) { - // Hinted index is applicable, so return it. It'll be the plan at position 1, after the data plan - QueryPlan plan = addPlan(statement, select, indexes.get(indexPos), targetColumns, parallelIteratorFactory, dataPlan); + // Hinted index is applicable, so return it's index + PTable index = indexes.get(indexPos); + indexes.remove(indexPos); + QueryPlan plan = addPlan(statement, select, index, targetColumns, parallelIteratorFactory, dataPlan); if (plan != null) { return plan; } - indexes.remove(indexPos); } startIndex = endIndex + 1; } @@ -213,7 +237,7 @@ public class QueryOptimizer { } /** - * Choose the best plan among all the possible ones. + * Order the plans among all the possible ones from best to worst. * Since we don't keep stats yet, we use the following simple algorithm: * 1) If the query is a point lookup (i.e. we have a set of exact row keys), choose among those. * 2) If the query has an ORDER BY and a LIMIT, choose the plan that has all the ORDER BY expression @@ -223,12 +247,16 @@ public class QueryOptimizer { * b) the plan that preserves ordering for a group by. * c) the data table plan * @param plans the list of candidate plans +<<<<<<< HEAD * @return +======= + * @return list of plans ordered from best to worst. +>>>>>>> 6c47f8a... PHOENIX-619 Support DELETE over table with immutable index when possible */ - private QueryPlan chooseBestPlan(SelectStatement select, List<QueryPlan> plans) { + private List<QueryPlan> orderPlansBestToWorst(SelectStatement select, List<QueryPlan> plans) { final QueryPlan dataPlan = plans.get(0); if (plans.size() == 1) { - return dataPlan; + return plans; } /** @@ -312,8 +340,7 @@ public class QueryOptimizer { }); - return candidates.get(0); - + return bestCandidates; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/88121cb0/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 478ce69..03ecbc9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -932,6 +932,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement try { desc = admin.getTableDescriptor(physicalIndexName); if (Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(desc.getValue(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_BYTES)))) { + this.tableStatsCache.invalidate(Bytes.toString(physicalIndexName)); final ReadOnlyProps props = this.getProps(); final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA); if (dropMetadata) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/88121cb0/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 6669b5b..58709d8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -151,6 +151,12 @@ public class IndexUtil { for (final Mutation dataMutation : dataMutations) { long ts = MetaDataUtil.getClientTimeStamp(dataMutation); ptr.set(dataMutation.getRow()); + /* + * We only need to generate the additional mutations for a Put for immutable indexes. + * Deletes of rows are handled by running a re-written query against the index table, + * and Deletes of column values should never be necessary, as you should never be + * updating an existing row. + */ if (dataMutation instanceof Put) { // TODO: is this more efficient than looking in our mutation map // using the key plus finding the PColumn? @@ -183,14 +189,6 @@ public class IndexUtil { }; indexMutations.add(maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, ts)); - } else { - // We can only generate the correct Delete if we have no KV columns in our index. - // Perhaps it'd be best to ignore Delete mutations all together here, as this - // gets triggered typically for an initial population where Delete markers make - // little sense. - if (maintainer.getIndexedColumns().isEmpty()) { - indexMutations.add(maintainer.buildDeleteMutation(kvBuilder, ptr, ts)); - } } } return indexMutations; http://git-wip-us.apache.org/repos/asf/phoenix/blob/88121cb0/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java index cdc1805..9a84bac 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java @@ -1166,10 +1166,20 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { assertImmutableRows(conn, "T", true); conn.createStatement().execute(indexDDL); assertImmutableRows(conn, "T", true); - conn.createStatement().execute("DELETE FROM t"); + conn.createStatement().execute("DELETE FROM t WHERE v2 = 'foo'"); fail(); } catch (SQLException e) { - assertEquals(SQLExceptionCode.NO_DELETE_IF_IMMUTABLE_INDEX.getErrorCode(), e.getErrorCode()); + assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(), e.getErrorCode()); + } + // Test with one index having the referenced key value column, but one not having it. + // Still should fail + try { + indexDDL = "CREATE INDEX i2 ON t (v2)"; + conn.createStatement().execute(indexDDL); + conn.createStatement().execute("DELETE FROM t WHERE v2 = 'foo'"); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(), e.getErrorCode()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/88121cb0/phoenix-core/src/test/java/org/apache/phoenix/compile/TenantSpecificViewIndexCompileTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/TenantSpecificViewIndexCompileTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/TenantSpecificViewIndexCompileTest.java index 94483b5..475ab65 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/TenantSpecificViewIndexCompileTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/TenantSpecificViewIndexCompileTest.java @@ -97,7 +97,7 @@ public class TenantSpecificViewIndexCompileTest extends BaseConnectionlessQueryT conn.createStatement().execute("CREATE VIEW v2(v3 VARCHAR) AS SELECT * FROM v WHERE k1 > 'a'"); conn.createStatement().execute("CREATE INDEX i2 ON v2(v3) include(v2)"); - // Confirm that a read-only view on an updatable view still optimizes out the read-olnly parts of the updatable view + // Confirm that a read-only view on an updatable view still optimizes out the read-only parts of the updatable view ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT v2 FROM v2 WHERE v3 > 'a' and k2 = 'a' ORDER BY v3,k2"); assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _IDX_T ['me',-32767,'a'] - ['me',-32767,*]", QueryUtil.getExplainPlan(rs));