PHOENIX-4290 Full table scan performed for DELETE with table having immutable indexes
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7a5b5da5 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7a5b5da5 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7a5b5da5 Branch: refs/heads/5.x-HBase-2.0 Commit: 7a5b5da589372e0d25eec7aa66c5c513e73eab04 Parents: f7b16a9 Author: James Taylor <[email protected]> Authored: Mon Oct 30 19:25:53 2017 -0700 Committer: James Taylor <[email protected]> Committed: Thu Nov 9 12:42:32 2017 -0800 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/DeleteIT.java | 134 ++- .../phoenix/end2end/index/ImmutableIndexIT.java | 22 +- .../end2end/index/IndexMaintenanceIT.java | 18 +- .../org/apache/phoenix/tx/TxCheckpointIT.java | 18 +- .../apache/phoenix/compile/DeleteCompiler.java | 849 ++++++++++--------- .../apache/phoenix/compile/FromCompiler.java | 49 +- .../compile/TupleProjectionCompiler.java | 2 +- .../phoenix/exception/SQLExceptionCode.java | 1 - .../apache/phoenix/execute/MutationState.java | 4 +- .../apache/phoenix/index/IndexMaintainer.java | 35 +- .../apache/phoenix/optimize/QueryOptimizer.java | 2 +- .../org/apache/phoenix/schema/PTableImpl.java | 10 + .../java/org/apache/phoenix/util/IndexUtil.java | 18 +- .../phoenix/compile/QueryCompilerTest.java | 27 - 14 files changed, 643 insertions(+), 546 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a5b5da5/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java index 09e1021..aa4d36e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java @@ -19,7 +19,6 @@ package org.apache.phoenix.end2end; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.sql.Connection; import java.sql.Date; @@ -33,7 +32,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.TestUtil; import org.junit.Test; @@ -136,18 +138,25 @@ public class DeleteIT extends ParallelStatsDisabledIT { rs.close(); } - private static void assertIndexUsed (Connection conn, String query, String indexName, boolean expectedToBeUsed) throws SQLException { - assertIndexUsed(conn, query, Collections.emptyList(), indexName, expectedToBeUsed); + private static void assertIndexUsed (Connection conn, String query, String indexName, boolean expectedToBeUsed, boolean local) throws SQLException { + assertIndexUsed(conn, query, Collections.emptyList(), indexName, expectedToBeUsed, local); } - private static void assertIndexUsed (Connection conn, String query, List<Object> binds, String indexName, boolean expectedToBeUsed) throws SQLException { + private static void assertIndexUsed (Connection conn, String query, List<Object> binds, String indexName, boolean expectedToBeUsed, boolean local) throws SQLException { PreparedStatement stmt = conn.prepareStatement("EXPLAIN " + query); for (int i = 0; i < binds.size(); i++) { stmt.setObject(i+1, binds.get(i)); } ResultSet rs = stmt.executeQuery(); String explainPlan = QueryUtil.getExplainPlan(rs); - assertEquals(expectedToBeUsed, explainPlan.contains(" SCAN OVER " + indexName)); + // It's very difficult currently to check if a local index is being used + // This check is brittle as it checks that the index ID appears in the range scan + // TODO: surface QueryPlan from MutationPlan + if (local) { + assertEquals(expectedToBeUsed, explainPlan.contains(indexName + " [1]") || explainPlan.contains(indexName + " [1,")); + } else { + assertEquals(expectedToBeUsed, explainPlan.contains(" SCAN OVER " + indexName)); + } } private void testDeleteRange(boolean autoCommit, boolean createIndex) throws Exception { @@ -190,9 +199,7 @@ public class DeleteIT extends ParallelStatsDisabledIT { PreparedStatement stmt; conn.setAutoCommit(autoCommit); deleteStmt = "DELETE FROM " + tableName + " WHERE i >= ? and i < ?"; - if(!local) { - assertIndexUsed(conn, deleteStmt, Arrays.<Object>asList(5,10), indexInUse, false); - } + assertIndexUsed(conn, deleteStmt, Arrays.<Object>asList(5,10), indexInUse, false, local); stmt = conn.prepareStatement(deleteStmt); stmt.setInt(1, 5); stmt.setInt(2, 10); @@ -202,7 +209,7 @@ public class DeleteIT extends ParallelStatsDisabledIT { } String query = "SELECT count(*) FROM " + tableName; - assertIndexUsed(conn, query, indexInUse, createIndex); + assertIndexUsed(conn, query, indexInUse, createIndex, local); query = "SELECT count(*) FROM " + tableName; rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); @@ -210,9 +217,7 @@ public class DeleteIT extends ParallelStatsDisabledIT { deleteStmt = "DELETE FROM " + tableName + " WHERE j IS NULL"; stmt = conn.prepareStatement(deleteStmt); - if(!local) { - assertIndexUsed(conn, deleteStmt, indexInUse, createIndex); - } + assertIndexUsed(conn, deleteStmt, indexInUse, createIndex, local); int deleteCount = stmt.executeUpdate(); assertEquals(3, deleteCount); if (!autoCommit) { @@ -254,40 +259,40 @@ public class DeleteIT extends ParallelStatsDisabledIT { } @Test - public void testDeleteAllFromTableWithIndexAutoCommitSalting() throws SQLException { + public void testDeleteAllFromTableWithIndexAutoCommitSalting() throws Exception { testDeleteAllFromTableWithIndex(true, true, false); } @Test - public void testDeleteAllFromTableWithLocalIndexAutoCommitSalting() throws SQLException { + public void testDeleteAllFromTableWithLocalIndexAutoCommitSalting() throws Exception { testDeleteAllFromTableWithIndex(true, true, true); } @Test - public void testDeleteAllFromTableWithIndexAutoCommitNoSalting() throws SQLException { + public void testDeleteAllFromTableWithIndexAutoCommitNoSalting() throws Exception { testDeleteAllFromTableWithIndex(true, false); } @Test - public void testDeleteAllFromTableWithIndexNoAutoCommitNoSalting() throws SQLException { + public void testDeleteAllFromTableWithIndexNoAutoCommitNoSalting() throws Exception { testDeleteAllFromTableWithIndex(false,false); } @Test - public void testDeleteAllFromTableWithIndexNoAutoCommitSalted() throws SQLException { + public void testDeleteAllFromTableWithIndexNoAutoCommitSalted() throws Exception { testDeleteAllFromTableWithIndex(false, true, false); } @Test - public void testDeleteAllFromTableWithLocalIndexNoAutoCommitSalted() throws SQLException { + public void testDeleteAllFromTableWithLocalIndexNoAutoCommitSalted() throws Exception { testDeleteAllFromTableWithIndex(false, true, true); } - private void testDeleteAllFromTableWithIndex(boolean autoCommit, boolean isSalted) throws SQLException { + private void testDeleteAllFromTableWithIndex(boolean autoCommit, boolean isSalted) throws Exception { testDeleteAllFromTableWithIndex(autoCommit, isSalted, false); } - private void testDeleteAllFromTableWithIndex(boolean autoCommit, boolean isSalted, boolean localIndex) throws SQLException { + private void testDeleteAllFromTableWithIndex(boolean autoCommit, boolean isSalted, boolean localIndex) throws Exception { Connection con = null; try { con = DriverManager.getConnection(getUrl()); @@ -334,6 +339,8 @@ public class DeleteIT extends ParallelStatsDisabledIT { con.commit(); } + TestUtil.dumpTable(con.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName))); + ResultSet rs = con.createStatement().executeQuery("SELECT /*+ NO_INDEX */ count(*) FROM " + tableName); assertTrue(rs.next()); assertEquals(0, rs.getLong(1)); @@ -354,16 +361,16 @@ public class DeleteIT extends ParallelStatsDisabledIT { } @Test - public void testDeleteRowFromTableWithImmutableIndex() throws SQLException { - testDeleteRowFromTableWithImmutableIndex(false); + public void testDeleteRowFromTableWithImmutableIndex() throws Exception { + testDeleteRowFromTableWithImmutableIndex(false, true); } @Test - public void testDeleteRowFromTableWithImmutableLocalIndex() throws SQLException { - testDeleteRowFromTableWithImmutableIndex(true); + public void testDeleteRowFromTableWithImmutableLocalIndex() throws Exception { + testDeleteRowFromTableWithImmutableIndex(true, false); } - public void testDeleteRowFromTableWithImmutableIndex(boolean localIndex) throws SQLException { + public void testDeleteRowFromTableWithImmutableIndex(boolean localIndex, boolean useCoveredIndex) throws Exception { Connection con = null; try { boolean autoCommit = false; @@ -375,6 +382,7 @@ public class DeleteIT extends ParallelStatsDisabledIT { String tableName = generateUniqueName(); String indexName1 = generateUniqueName(); String indexName2 = generateUniqueName(); + String indexName3 = useCoveredIndex? generateUniqueName() : null; stm.execute("CREATE TABLE IF NOT EXISTS " + tableName + " (" + "HOST CHAR(2) NOT NULL," + @@ -387,6 +395,9 @@ public class DeleteIT extends ParallelStatsDisabledIT { "CONSTRAINT PK PRIMARY KEY (HOST, DOMAIN, FEATURE, \"DATE\")) IMMUTABLE_ROWS=true"); stm.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName1 + " ON " + tableName + " (\"DATE\", FEATURE)"); stm.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName2 + " ON " + tableName + " (\"DATE\", FEATURE, USAGE.DB)"); + if (useCoveredIndex) { + stm.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName3 + " ON " + tableName + " (STATS.ACTIVE_VISITOR) INCLUDE (USAGE.CORE, USAGE.DB)"); + } stm.close(); Date date = new Date(0); @@ -400,39 +411,48 @@ public class DeleteIT extends ParallelStatsDisabledIT { psInsert.setLong(6, 2L); psInsert.setLong(7, 3); psInsert.execute(); - psInsert.close(); if (!autoCommit) { con.commit(); } - psInsert = con.prepareStatement("DELETE FROM " + tableName + " WHERE (HOST, DOMAIN, FEATURE, \"DATE\") = (?,?,?,?)"); - psInsert.setString(1, "AA"); - psInsert.setString(2, "BB"); - psInsert.setString(3, "CC"); - psInsert.setDate(4, date); - psInsert.execute(); + PreparedStatement psDelete = con.prepareStatement("DELETE FROM " + tableName + " WHERE (HOST, DOMAIN, FEATURE, \"DATE\") = (?,?,?,?)"); + psDelete.setString(1, "AA"); + psDelete.setString(2, "BB"); + psDelete.setString(3, "CC"); + psDelete.setDate(4, date); + psDelete.execute(); if (!autoCommit) { con.commit(); } - ResultSet rs = con.createStatement().executeQuery("SELECT /*+ NO_INDEX */ count(*) FROM " + tableName); - assertTrue(rs.next()); - assertEquals(0, rs.getLong(1)); + assertDeleted(con, tableName, indexName1, indexName2, indexName3); - rs = con.createStatement().executeQuery("SELECT count(*) FROM " + indexName1); - assertTrue(rs.next()); - assertEquals(0, rs.getLong(1)); + psInsert.execute(); + if (!autoCommit) { + con.commit(); + } - stm.execute("DROP INDEX " + indexName1 + " ON " + tableName); - stm.execute("DROP INDEX " + indexName2 + " ON " + tableName); + psDelete = con.prepareStatement("DELETE FROM " + tableName + " WHERE USAGE.DB=2"); + psDelete.execute(); + if (!autoCommit) { + con.commit(); + } + + assertDeleted(con, tableName, indexName1, indexName2, indexName3); + + psInsert.execute(); + if (!autoCommit) { + con.commit(); + } - stm.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName1 + " ON " + tableName + " (USAGE.DB)"); - stm.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName2 + " ON " + tableName + " (USAGE.DB, \"DATE\")"); - try{ - psInsert = con.prepareStatement("DELETE FROM " + tableName + " WHERE USAGE.DB=2"); - } catch(Exception e) { - fail("There should not be any exception while deleting row"); + psDelete = con.prepareStatement("DELETE FROM " + tableName + " WHERE ACTIVE_VISITOR=3"); + psDelete.execute(); + if (!autoCommit) { + con.commit(); } + + assertDeleted(con, tableName, indexName1, indexName2, indexName3); + } finally { try { con.close(); @@ -440,6 +460,28 @@ public class DeleteIT extends ParallelStatsDisabledIT { } } } + + private static void assertDeleted(Connection con, String tableName, String indexName1, String indexName2, String indexName3) + throws SQLException { + ResultSet rs; + rs = con.createStatement().executeQuery("SELECT /*+ NO_INDEX */ count(*) FROM " + tableName); + assertTrue(rs.next()); + assertEquals(0, rs.getLong(1)); + + rs = con.createStatement().executeQuery("SELECT count(*) FROM " + indexName1); + assertTrue(rs.next()); + assertEquals(0, rs.getLong(1)); + + rs = con.createStatement().executeQuery("SELECT count(*) FROM " + indexName2); + assertTrue(rs.next()); + assertEquals(0, rs.getLong(1)); + + if (indexName3 != null) { + rs = con.createStatement().executeQuery("SELECT count(*) FROM " + indexName3); + assertTrue(rs.next()); + assertEquals(0, rs.getLong(1)); + } + } @Test http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a5b5da5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java index 9eb5440..e0398c7 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java @@ -20,7 +20,6 @@ package org.apache.phoenix.end2end.index; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.sql.Connection; import java.sql.DriverManager; @@ -51,7 +50,6 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; -import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; @@ -149,18 +147,14 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT { conn.setAutoCommit(true); String dml = "DELETE from " + fullTableName + " WHERE long_col2 = 4"; - try { - conn.createStatement().execute(dml); - if (!localIndex) { - fail(); - } - } catch (SQLException e) { - if (localIndex) { - throw e; - } - assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(), - e.getErrorCode()); - } + conn.createStatement().execute(dml); + + rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullTableName); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); conn.createStatement().execute("DROP TABLE " + fullTableName); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a5b5da5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java index d5895ae..9ff5a35 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java @@ -23,7 +23,6 @@ import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.math.BigDecimal; import java.sql.Connection; @@ -36,7 +35,6 @@ import java.util.Properties; import org.apache.commons.lang.StringUtils; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; -import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.PropertiesUtil; @@ -341,22 +339,10 @@ public class IndexMaintenanceIT extends ParallelStatsDisabledIT { assertEquals(2, rs.getInt(1)); conn.setAutoCommit(true); - String dml = "DELETE from " + fullDataTableName + " WHERE long_col2 = 2"; - try { - conn.createStatement().execute(dml); - if (!mutable && !localIndex) { - fail(); - } - } catch (SQLException e) { - if (mutable || localIndex) { - throw e; - } - assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(), e.getErrorCode()); - } + conn.createStatement().execute("DELETE from " + fullDataTableName + " WHERE long_col2 = 2"); if (!mutable) { - dml = "DELETE from " + fullDataTableName + " WHERE 2*long_col2 = 4"; - conn.createStatement().execute(dml); + conn.createStatement().execute("DELETE from " + fullDataTableName + " WHERE 2*long_col2 = 4"); } rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullDataTableName); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a5b5da5/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java index 989a97e..bf39dfe 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java @@ -311,7 +311,6 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { String tableName = "TBL_" + generateUniqueName(); String indexName = "IDX_" + generateUniqueName(); String fullTableName = SchemaUtil.getTableName(tableName, tableName); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); ResultSet rs; try (Connection conn = getConnection()) { conn.setAutoCommit(false); @@ -400,6 +399,23 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { assertTrue(rs.next()); assertEquals(1,rs.getLong(1)); assertFalse(rs.next()); + + conn.createStatement().execute("drop index " + indexName + " on " + fullTableName + "1"); + conn.createStatement().execute("delete from " + fullTableName + "1 where id1=fk1b AND fk1b=id1"); + conn.createStatement().execute("delete from " + fullTableName + "1 where id1 in (select fk1a from " + fullTableName + "1 join " + fullTableName + "2 on (fk2=id1))"); + assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); + assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved + + rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1"); + assertTrue(rs.next()); + assertEquals(1,rs.getLong(1)); + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("select /*+ INDEX(DEMO IDX) */ id1 from " + fullTableName + "1"); + assertTrue(rs.next()); + assertEquals(1,rs.getLong(1)); + assertFalse(rs.next()); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a5b5da5/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index eb252d3..73689d5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -19,12 +19,11 @@ package org.apache.phoenix.compile; import static org.apache.phoenix.execute.MutationState.RowTimestampColInfo.NULL_ROWTIMESTAMP_INFO; import static org.apache.phoenix.util.NumberUtil.add; +import java.io.IOException; import java.sql.ParameterMetaData; import java.sql.SQLException; -import java.util.Arrays; -import java.util.Collection; +import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -34,19 +33,20 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; -import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.AggregatePlan; -import org.apache.phoenix.execute.BaseQueryPlan; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.filter.SkipScanFilter; +import org.apache.phoenix.hbase.index.ValueGetter; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; @@ -64,20 +64,20 @@ import org.apache.phoenix.parse.NamedTableNode; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.parse.TableName; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.MetaDataClient; -import org.apache.phoenix.schema.MetaDataEntityNotFoundException; +import org.apache.phoenix.schema.DelegateColumn; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; -import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.ReadOnlyTableException; import org.apache.phoenix.schema.SortOrder; @@ -105,9 +105,11 @@ public class DeleteCompiler { this.operation = operation; } - private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, List<TableRef> indexTableRefs, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException { - PTable table = targetTableRef.getTable(); - PhoenixStatement statement = childContext.getStatement(); + private static MutationState deleteRows(StatementContext context, ResultIterator iterator, QueryPlan bestPlan, TableRef projectedTableRef, List<TableRef> otherTableRefs) throws SQLException { + RowProjector projector = bestPlan.getProjector(); + TableRef tableRef = bestPlan.getTableRef(); + PTable table = tableRef.getTable(); + PhoenixStatement statement = context.getStatement(); PhoenixConnection connection = statement.getConnection(); PName tenantId = connection.getTenantId(); byte[] tenantIdBytes = null; @@ -123,9 +125,9 @@ public class DeleteCompiler { List<Map<ImmutableBytesPtr,RowMutationState>> indexMutations = null; // If indexTableRef is set, we're deleting the rows from both the index table and // the data table through a single query to save executing an additional one. - if (!indexTableRefs.isEmpty()) { - indexMutations = Lists.newArrayListWithExpectedSize(indexTableRefs.size()); - for (int i = 0; i < indexTableRefs.size(); i++) { + if (!otherTableRefs.isEmpty()) { + indexMutations = Lists.newArrayListWithExpectedSize(otherTableRefs.size()); + for (int i = 0; i < otherTableRefs.size(); i++) { indexMutations.add(Maps.<ImmutableBytesPtr,RowMutationState>newHashMapWithExpectedSize(batchSize)); } } @@ -140,38 +142,84 @@ public class DeleteCompiler { if (isMultiTenant) { values[offset++] = tenantIdBytes; } - try (PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) { - int rowCount = 0; - while (rs.next()) { - ImmutableBytesPtr ptr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map - // Use tuple directly, as projector would not have all the PK columns from - // our index table inside of our projection. Since the tables are equal, - // there's no transation required. - if (sourceTableRef.equals(targetTableRef)) { - rs.getCurrentRow().getKey(ptr); - } else { - for (int i = offset; i < values.length; i++) { - byte[] byteValue = rs.getBytes(i+1-offset); - // The ResultSet.getBytes() call will have inverted it - we need to invert it back. - // TODO: consider going under the hood and just getting the bytes - if (pkColumns.get(i).getSortOrder() == SortOrder.DESC) { - byte[] tempByteValue = Arrays.copyOf(byteValue, byteValue.length); - byteValue = SortOrder.invert(byteValue, 0, tempByteValue, 0, byteValue.length); + try (final PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, context)) { + ValueGetter getter = null; + if (!otherTableRefs.isEmpty()) { + getter = new ValueGetter() { + final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable(); + final ImmutableBytesWritable rowKeyPtr = new ImmutableBytesWritable(); + + @Override + public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException { + Cell cell = rs.getCurrentRow().getValue(ref.getFamily(), ref.getQualifier()); + if (cell == null) { + return null; } - values[i] = byteValue; + valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + return valuePtr; + } + + @Override + public byte[] getRowKey() { + rs.getCurrentRow().getKey(rowKeyPtr); + return ByteUtil.copyKeyBytesIfNecessary(rowKeyPtr); + } + }; + } + IndexMaintainer scannedIndexMaintainer = null; + IndexMaintainer[] maintainers = null; + PTable dataTable = table; + if (table.getType() == PTableType.INDEX) { + if (!otherTableRefs.isEmpty()) { + // The data table is always the last one in the list if it's + // not chosen as the best of the possible plans. + dataTable = otherTableRefs.get(otherTableRefs.size()-1).getTable(); + scannedIndexMaintainer = IndexMaintainer.create(dataTable, table, connection); + } + maintainers = new IndexMaintainer[otherTableRefs.size()]; + for (int i = 0; i < otherTableRefs.size(); i++) { + // Create IndexMaintainer based on projected table (i.e. SELECT expressions) so that client-side + // expressions are used instead of server-side ones. + PTable otherTable = otherTableRefs.get(i).getTable(); + if (otherTable.getType() == PTableType.INDEX) { + // In this case, we'll convert from index row -> data row -> other index row + maintainers[i] = IndexMaintainer.create(dataTable, otherTable, connection); + } else { + maintainers[i] = scannedIndexMaintainer; } - table.newKey(ptr, values); } + } else if (!otherTableRefs.isEmpty()) { + dataTable = table; + maintainers = new IndexMaintainer[otherTableRefs.size()]; + for (int i = 0; i < otherTableRefs.size(); i++) { + // Create IndexMaintainer based on projected table (i.e. SELECT expressions) so that client-side + // expressions are used instead of server-side ones. + maintainers[i] = IndexMaintainer.create(projectedTableRef.getTable(), otherTableRefs.get(i).getTable(), connection); + } + + } + byte[][] viewConstants = IndexUtil.getViewConstants(dataTable); + int rowCount = 0; + while (rs.next()) { + ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map + rs.getCurrentRow().getKey(rowKeyPtr); // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the - // row key will already have its value. - mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); - for (int i = 0; i < indexTableRefs.size(); i++) { + // row key will already have its value. + // Check for otherTableRefs being empty required when deleting directly from the index + if (otherTableRefs.isEmpty() || table.getIndexType() != IndexType.LOCAL) { + mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); + } + for (int i = 0; i < otherTableRefs.size(); i++) { + PTable otherTable = otherTableRefs.get(i).getTable(); ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map - rs.getCurrentRow().getKey(indexPtr); // Translate the data table row to the index table row - if (sourceTableRef.getTable().getType() != PTableType.INDEX) { - IndexMaintainer maintainer = indexTableRefs.get(i).getTable().getIndexMaintainer(table, connection); - indexPtr.set(maintainer.buildRowKey(null, indexPtr, null, null, HConstants.LATEST_TIMESTAMP)); + if (table.getType() == PTableType.INDEX) { + indexPtr.set(scannedIndexMaintainer.buildDataRowKey(rowKeyPtr, viewConstants)); + if (otherTable.getType() == PTableType.INDEX) { + indexPtr.set(maintainers[i].buildRowKey(getter, indexPtr, null, null, HConstants.LATEST_TIMESTAMP)); + } + } else { + indexPtr.set(maintainers[i].buildRowKey(getter, rowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP)); } indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); } @@ -181,10 +229,10 @@ public class DeleteCompiler { rowCount++; // Commit a batch if auto commit is true and we're at our batch size if (isAutoCommit && rowCount % batchSize == 0) { - MutationState state = new MutationState(targetTableRef, mutations, 0, maxSize, maxSizeBytes, connection); + MutationState state = new MutationState(tableRef, mutations, 0, maxSize, maxSizeBytes, connection); connection.getMutationState().join(state); - for (int i = 0; i < indexTableRefs.size(); i++) { - MutationState indexState = new MutationState(indexTableRefs.get(i), indexMutations.get(i), 0, maxSize, maxSizeBytes, connection); + for (int i = 0; i < otherTableRefs.size(); i++) { + MutationState indexState = new MutationState(otherTableRefs.get(i), indexMutations.get(i), 0, maxSize, maxSizeBytes, connection); connection.getMutationState().join(indexState); } connection.getMutationState().send(); @@ -197,10 +245,9 @@ public class DeleteCompiler { // If auto commit is true, this last batch will be committed upon return int nCommittedRows = isAutoCommit ? (rowCount / batchSize * batchSize) : 0; - MutationState state = new MutationState(targetTableRef, mutations, nCommittedRows, maxSize, maxSizeBytes, connection); - for (int i = 0; i < indexTableRefs.size(); i++) { - // To prevent the counting of these index rows, we have a negative for remainingRows. - MutationState indexState = new MutationState(indexTableRefs.get(i), indexMutations.get(i), 0, maxSize, maxSizeBytes, connection); + MutationState state = new MutationState(tableRef, mutations, nCommittedRows, maxSize, maxSizeBytes, connection); + for (int i = 0; i < otherTableRefs.size(); i++) { + MutationState indexState = new MutationState(otherTableRefs.get(i), indexMutations.get(i), 0, maxSize, maxSizeBytes, connection); state.join(indexState); } return state; @@ -208,10 +255,9 @@ public class DeleteCompiler { } private static class DeletingParallelIteratorFactory extends MutatingParallelIteratorFactory { - private RowProjector projector; - private TableRef targetTableRef; - private List<TableRef> indexTableRefs; - private TableRef sourceTableRef; + private QueryPlan queryPlan; + private List<TableRef> otherTableRefs; + private TableRef projectedTableRef; private DeletingParallelIteratorFactory(PhoenixConnection connection) { super(connection); @@ -225,41 +271,36 @@ public class DeleteCompiler { * need to be captured are already getting collected in the parent statement context enclosed in the result * iterator being used for reading rows out. */ - StatementContext ctx = new StatementContext(statement, false); - MutationState state = deleteRows(ctx, targetTableRef, indexTableRefs, iterator, projector, sourceTableRef); + StatementContext context = new StatementContext(statement, false); + MutationState state = deleteRows(context, iterator, queryPlan, projectedTableRef, otherTableRefs); return state; } - public void setTargetTableRef(TableRef tableRef) { - this.targetTableRef = tableRef; + public void setQueryPlan(QueryPlan queryPlan) { + this.queryPlan = queryPlan; } - public void setSourceTableRef(TableRef tableRef) { - this.sourceTableRef = tableRef; + public void setOtherTableRefs(List<TableRef> otherTableRefs) { + this.otherTableRefs = otherTableRefs; } - public void setRowProjector(RowProjector projector) { - this.projector = projector; - } - - public void setIndexTargetTableRefs(List<TableRef> indexTableRefs) { - this.indexTableRefs = indexTableRefs; + public void setProjectedTableRef(TableRef projectedTableRef) { + this.projectedTableRef = projectedTableRef; } - } - private Map<PTableKey, PTable> getNonDisabledGlobalImmutableIndexes(TableRef tableRef) { + private List<PTable> getNonDisabledGlobalImmutableIndexes(TableRef tableRef) { PTable table = tableRef.getTable(); if (table.isImmutableRows() && !table.getIndexes().isEmpty()) { - Map<PTableKey, PTable> nonDisabledIndexes = new HashMap<PTableKey, PTable>(table.getIndexes().size()); + List<PTable> nonDisabledIndexes = Lists.newArrayListWithExpectedSize(table.getIndexes().size()); for (PTable index : table.getIndexes()) { if (index.getIndexState() != PIndexState.DISABLE && index.getIndexType() == IndexType.GLOBAL) { - nonDisabledIndexes.put(index.getKey(), index); + nonDisabledIndexes.add(index); } } return nonDisabledIndexes; } - return Collections.emptyMap(); + return Collections.emptyList(); } private class MultiDeleteMutationPlan implements MutationPlan { @@ -361,189 +402,151 @@ public class DeleteCompiler { } } - private static boolean hasNonPKIndexedColumns(Collection<PTable> immutableIndexes) { - for (PTable index : immutableIndexes) { - for (PColumn column : index.getPKColumns()) { - if (!IndexUtil.isDataPKColumn(column)) { - return true; - } - } - } - return false; - } - public MutationPlan compile(DeleteStatement delete) throws SQLException { final PhoenixConnection connection = statement.getConnection(); final boolean isAutoCommit = connection.getAutoCommit(); - final boolean hasLimit = delete.getLimit() != null; + final boolean hasPostProcessing = delete.getLimit() != null; final ConnectionQueryServices services = connection.getQueryServices(); List<QueryPlan> queryPlans; NamedTableNode tableNode = delete.getTable(); String tableName = tableNode.getName().getTableName(); String schemaName = tableNode.getName().getSchemaName(); - boolean retryOnce = !isAutoCommit; - TableRef tableRefToBe; - boolean noQueryReqd = false; - boolean runOnServer = false; SelectStatement select = null; ColumnResolver resolverToBe = null; - Map<PTableKey, PTable> immutableIndex = Collections.emptyMap(); - DeletingParallelIteratorFactory parallelIteratorFactory; - QueryPlan dataPlanToBe = null; - while (true) { - try { - resolverToBe = FromCompiler.getResolverForMutation(delete, connection); - tableRefToBe = resolverToBe.getTables().get(0); - PTable table = tableRefToBe.getTable(); - // Cannot update: - // - read-only VIEW - // - transactional table with a connection having an SCN - // TODO: SchemaUtil.isReadOnly(PTable, connection)? - if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) { - throw new ReadOnlyTableException(schemaName,tableName); - } - else if (table.isTransactional() && connection.getSCN() != null) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPECIFY_SCN_FOR_TXN_TABLE).setSchemaName(schemaName) - .setTableName(tableName).build().buildException(); - } - - immutableIndex = getNonDisabledGlobalImmutableIndexes(tableRefToBe); - boolean mayHaveImmutableIndexes = !immutableIndex.isEmpty(); - noQueryReqd = !hasLimit; - // Can't run on same server for transactional data, as we need the row keys for the data - // that is being upserted for conflict detection purposes. - runOnServer = isAutoCommit && noQueryReqd && !table.isTransactional(); - HintNode hint = delete.getHint(); - if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) { - hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE); - } + DeletingParallelIteratorFactory parallelIteratorFactoryToBe; + resolverToBe = FromCompiler.getResolverForMutation(delete, connection); + final TableRef targetTableRef = resolverToBe.getTables().get(0); + PTable table = targetTableRef.getTable(); + // Cannot update: + // - read-only VIEW + // - transactional table with a connection having an SCN + // TODO: SchemaUtil.isReadOnly(PTable, connection)? + if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) { + throw new ReadOnlyTableException(schemaName,tableName); + } + else if (table.isTransactional() && connection.getSCN() != null) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPECIFY_SCN_FOR_TXN_TABLE).setSchemaName(schemaName) + .setTableName(tableName).build().buildException(); + } - List<AliasedNode> aliasedNodes = Lists.newArrayListWithExpectedSize(table.getPKColumns().size()); - boolean isSalted = table.getBucketNum() != null; - boolean isMultiTenant = connection.getTenantId() != null && table.isMultiTenant(); - boolean isSharedViewIndex = table.getViewIndexId() != null; - for (int i = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0); i < table.getPKColumns().size(); i++) { - PColumn column = table.getPKColumns().get(i); - aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(null, '"' + column.getName().getString() + '"', null))); - } - select = FACTORY.select(delete.getTable(), hint, false, aliasedNodes, delete.getWhere(), - Collections.<ParseNode> emptyList(), null, delete.getOrderBy(), delete.getLimit(), null, - delete.getBindCount(), false, false, Collections.<SelectStatement> emptyList(), - delete.getUdfParseNodes()); - select = StatementNormalizer.normalize(select, resolverToBe); - SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolverToBe, connection); - if (transformedSelect != select) { - resolverToBe = FromCompiler.getResolverForQuery(transformedSelect, connection, false, delete.getTable().getName()); - select = StatementNormalizer.normalize(transformedSelect, resolverToBe); - } - parallelIteratorFactory = hasLimit ? null : new DeletingParallelIteratorFactory(connection); - QueryOptimizer optimizer = new QueryOptimizer(services); - QueryCompiler compiler = new QueryCompiler(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory, new SequenceManager(statement)); - dataPlanToBe = compiler.compile(); - queryPlans = Lists.newArrayList(mayHaveImmutableIndexes - ? optimizer.getApplicablePlans(dataPlanToBe, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory) - : optimizer.getBestPlan(dataPlanToBe, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory)); - if (mayHaveImmutableIndexes) { // FIXME: this is ugly - // Lookup the table being deleted from in the cache, as it's possible that the - // optimizer updated the cache if it found indexes that were out of date. - // If the index was marked as disabled, it should not be in the list - // of immutable indexes. - table = connection.getTable(new PTableKey(table.getTenantId(), table.getName().getString())); - tableRefToBe.setTable(table); - immutableIndex = getNonDisabledGlobalImmutableIndexes(tableRefToBe); - } - } catch (MetaDataEntityNotFoundException e) { - // Catch column/column family not found exception, as our meta data may - // be out of sync. Update the cache once and retry if we were out of sync. - // Otherwise throw, as we'll just get the same error next time. - if (retryOnce) { - retryOnce = false; - MetaDataMutationResult result = new MetaDataClient(connection).updateCache(schemaName, tableName); - if (result.wasUpdated()) { - continue; - } - } - throw e; - } - break; - } - boolean isBuildingImmutable = false; - final boolean hasImmutableIndexes = !immutableIndex.isEmpty(); - if (hasImmutableIndexes) { - for (PTable index : immutableIndex.values()){ - if (index.getIndexState() == PIndexState.BUILDING) { - isBuildingImmutable = true; - break; - } - } + List<PTable> immutableIndexes = getNonDisabledGlobalImmutableIndexes(targetTableRef); + final boolean hasImmutableIndexes = !immutableIndexes.isEmpty(); + + boolean isSalted = table.getBucketNum() != null; + boolean isMultiTenant = connection.getTenantId() != null && table.isMultiTenant(); + boolean isSharedViewIndex = table.getViewIndexId() != null; + int pkColumnOffset = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0); + final int pkColumnCount = table.getPKColumns().size() - pkColumnOffset; + int selectColumnCount = pkColumnCount; + for (PTable index : immutableIndexes) { + selectColumnCount += index.getPKColumns().size() - pkColumnCount; } - final QueryPlan dataPlan = dataPlanToBe; - // tableRefs is parallel with queryPlans - TableRef[] tableRefs = new TableRef[hasImmutableIndexes ? immutableIndex.size() : 1]; - if (hasImmutableIndexes) { - int i = 0; - Iterator<QueryPlan> plans = queryPlans.iterator(); - while (plans.hasNext()) { - QueryPlan plan = plans.next(); - PTable table = plan.getTableRef().getTable(); - if (table.getType() == PTableType.INDEX) { // index plans - tableRefs[i++] = plan.getTableRef(); - immutableIndex.remove(table.getKey()); - } else if (!isBuildingImmutable) { // data plan - /* - * If we have immutable indexes that we need to maintain, don't execute the data plan - * as we can save a query by piggy-backing on any of the other index queries, since the - * PK columns that we need are always in each index row. - */ - plans.remove(); + List<PColumn> projectedColumns = Lists.newArrayListWithExpectedSize(selectColumnCount + pkColumnOffset); + List<AliasedNode> aliasedNodes = Lists.newArrayListWithExpectedSize(selectColumnCount); + for (int i = isSalted ? 1 : 0; i < pkColumnOffset; i++) { + PColumn column = table.getPKColumns().get(i); + projectedColumns.add(column); + } + for (int i = pkColumnOffset; i < table.getPKColumns().size(); i++) { + PColumn column = table.getPKColumns().get(i); + projectedColumns.add(column); + aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(null, '"' + column.getName().getString() + '"', null))); + } + // Project all non PK indexed columns so that we can do the proper index maintenance + for (PTable index : table.getIndexes()) { + IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); + // Go through maintainer as it handles functional indexes correctly + for (Pair<String,String> columnInfo : maintainer.getIndexedColumnInfo()) { + String familyName = columnInfo.getFirst(); + if (familyName != null) { + String columnName = columnInfo.getSecond(); + boolean hasNoColumnFamilies = table.getColumnFamilies().isEmpty(); + PColumn column = hasNoColumnFamilies ? table.getColumnForColumnName(columnName) : table.getColumnFamily(familyName).getPColumnForColumnName(columnName); + projectedColumns.add(column); + aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(hasNoColumnFamilies ? null : TableName.create(null, familyName), '"' + columnName + '"', null))); } } - /* - * If we have any immutable indexes remaining, then that means that the plan for that index got filtered out - * because it could not be executed. This would occur if a column in the where clause is not found in the - * immutable index. - */ - if (!immutableIndex.isEmpty()) { - Collection<PTable> immutableIndexes = immutableIndex.values(); - if (!isBuildingImmutable || hasNonPKIndexedColumns(immutableIndexes)) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS).setSchemaName(tableRefToBe.getTable().getSchemaName().getString()) - .setTableName(tableRefToBe.getTable().getTableName().getString()).build().buildException(); + } + select = FACTORY.select(delete.getTable(), delete.getHint(), false, aliasedNodes, delete.getWhere(), + Collections.<ParseNode> emptyList(), null, delete.getOrderBy(), delete.getLimit(), null, + delete.getBindCount(), false, false, Collections.<SelectStatement> emptyList(), + delete.getUdfParseNodes()); + select = StatementNormalizer.normalize(select, resolverToBe); + + SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolverToBe, connection); + boolean hasPreProcessing = transformedSelect != select; + if (transformedSelect != select) { + resolverToBe = FromCompiler.getResolverForQuery(transformedSelect, connection, false, delete.getTable().getName()); + select = StatementNormalizer.normalize(transformedSelect, resolverToBe); + } + final boolean hasPreOrPostProcessing = hasPreProcessing || hasPostProcessing; + boolean noQueryReqd = !hasPreOrPostProcessing; + // No limit and no sub queries, joins, etc in where clause + // Can't run on same server for transactional data, as we need the row keys for the data + // that is being upserted for conflict detection purposes. + // If we have immutable indexes, we'd increase the number of bytes scanned by executing + // separate queries against each index, so better to drive from a single table in that case. + boolean runOnServer = isAutoCommit && !hasPreOrPostProcessing && !table.isTransactional() && !hasImmutableIndexes; + HintNode hint = delete.getHint(); + if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) { + select = SelectStatement.create(select, HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE)); + } + + parallelIteratorFactoryToBe = hasPreOrPostProcessing ? null : new DeletingParallelIteratorFactory(connection); + QueryOptimizer optimizer = new QueryOptimizer(services); + QueryCompiler compiler = new QueryCompiler(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe, new SequenceManager(statement)); + final QueryPlan dataPlan = compiler.compile(); + // TODO: the select clause should know that there's a sub query, but doesn't seem to currently + queryPlans = Lists.newArrayList(!immutableIndexes.isEmpty() + ? optimizer.getApplicablePlans(dataPlan, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe) + : optimizer.getBestPlan(dataPlan, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe)); + // Filter out any local indexes that don't contain all indexed columns. + // We have to do this manually because local indexes are still used + // when referenced columns aren't in the index, so they won't be + // filtered by the optimizer. + queryPlans = new ArrayList<>(queryPlans); + Iterator<QueryPlan> iterator = queryPlans.iterator(); + while (iterator.hasNext()) { + QueryPlan plan = iterator.next(); + if (plan.getTableRef().getTable().getIndexType() == IndexType.LOCAL) { + if (!plan.getContext().getDataColumns().isEmpty()) { + iterator.remove(); } - runOnServer = false; - } + } } - List<TableRef> buildingImmutableIndexes = Lists.newArrayListWithExpectedSize(immutableIndex.values().size()); - for (PTable index : immutableIndex.values()) { - buildingImmutableIndexes.add(new TableRef(index, dataPlan.getTableRef().getTimeStamp(), dataPlan.getTableRef().getLowerBoundTimeStamp())); + if (queryPlans.isEmpty()) { + queryPlans = Collections.singletonList(dataPlan); } - // Make sure the first plan is targeting deletion from the data table - // In the case of an immutable index, we'll also delete from the index. - final TableRef dataTableRef = tableRefs[0] = tableRefToBe; - /* - * Create a mutationPlan for each queryPlan. One plan will be for the deletion of the rows - * from the data table, while the others will be for deleting rows from immutable indexes. - */ - List<MutationPlan> mutationPlans = Lists.newArrayListWithExpectedSize(tableRefs.length); - for (int i = 0; i < tableRefs.length; i++) { - final TableRef tableRef = tableRefs[i]; - final QueryPlan plan = queryPlans.get(i); - if (!plan.getTableRef().equals(tableRef) || !(plan instanceof BaseQueryPlan)) { - runOnServer = false; - noQueryReqd = false; // FIXME: why set this to false in this case? - } - - final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); - final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES); - - final StatementContext context = plan.getContext(); - // If we're doing a query for a set of rows with no where clause, then we don't need to contact the server at all. - // A simple check of the none existence of a where clause in the parse node is not sufficient, as the where clause - // may have been optimized out. Instead, we check that there's a single SkipScanFilter - if (noQueryReqd - && (!context.getScan().hasFilter() - || context.getScan().getFilter() instanceof SkipScanFilter) - && context.getScanRanges().isPointLookup()) { + runOnServer &= queryPlans.get(0).getTableRef().getTable().getType() != PTableType.INDEX; + + // We need to have all indexed columns available in all immutable indexes in order + // to generate the delete markers from the query. We also cannot have any filters + // except for our SkipScanFilter for point lookups. + // A simple check of the non existence of a where clause in the parse node is not sufficient, as the where clause + // may have been optimized out. Instead, we check that there's a single SkipScanFilter + // If we can generate a plan for every index, that means all the required columns are available in every index, + // hence we can drive the delete from any of the plans. + noQueryReqd &= queryPlans.size() == 1 + immutableIndexes.size(); + int queryPlanIndex = 0; + while (noQueryReqd && queryPlanIndex < queryPlans.size()) { + QueryPlan plan = queryPlans.get(queryPlanIndex++); + StatementContext context = plan.getContext(); + noQueryReqd &= (!context.getScan().hasFilter() + || context.getScan().getFilter() instanceof SkipScanFilter) + && context.getScanRanges().isPointLookup(); + } + + final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES); + + // If we're doing a query for a set of rows with no where clause, then we don't need to contact the server at all. + if (noQueryReqd) { + // Create a mutationPlan for each queryPlan. One plan will be for the deletion of the rows + // from the data table, while the others will be for deleting rows from immutable indexes. + List<MutationPlan> mutationPlans = Lists.newArrayListWithExpectedSize(queryPlans.size()); + for (final QueryPlan plan : queryPlans) { + final StatementContext context = plan.getContext(); mutationPlans.add(new MutationPlan() { @Override @@ -561,7 +564,7 @@ public class DeleteCompiler { while (iterator.hasNext()) { mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); } - return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection); + return new MutationState(context.getCurrentTable(), mutation, 0, maxSize, maxSizeBytes, connection); } @Override @@ -576,7 +579,7 @@ public class DeleteCompiler { @Override public TableRef getTargetRef() { - return dataTableRef; + return dataPlan.getTableRef(); } @Override @@ -605,202 +608,230 @@ public class DeleteCompiler { return 0l; } }); - } else if (runOnServer) { - // TODO: better abstraction - Scan scan = context.getScan(); - scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE); + } + return new MultiDeleteMutationPlan(mutationPlans); + } else if (runOnServer) { + // TODO: better abstraction + final StatementContext context = dataPlan.getContext(); + Scan scan = context.getScan(); + scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE); + + // Build an ungrouped aggregate query: select COUNT(*) from <table> where <where> + // The coprocessor will delete each row returned from the scan + // Ignoring ORDER BY, since with auto commit on and no limit makes no difference + SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint()); + RowProjector projectorToBe = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY); + context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY); + if (dataPlan.getProjector().projectEveryRow()) { + projectorToBe = new RowProjector(projectorToBe,true); + } + final RowProjector projector = projectorToBe; + final QueryPlan aggPlan = new AggregatePlan(context, select, dataPlan.getTableRef(), projector, null, null, + OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); + return new MutationPlan() { + @Override + public ParameterMetaData getParameterMetaData() { + return context.getBindManager().getParameterMetaData(); + } + + @Override + public StatementContext getContext() { + return context; + } + + @Override + public TableRef getTargetRef() { + return dataPlan.getTableRef(); + } - // Build an ungrouped aggregate query: select COUNT(*) from <table> where <where> - // The coprocessor will delete each row returned from the scan - // Ignoring ORDER BY, since with auto commit on and no limit makes no difference - SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint()); - RowProjector projectorToBe = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY); - context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY); - if (plan.getProjector().projectEveryRow()) { - projectorToBe = new RowProjector(projectorToBe,true); - } - final RowProjector projector = projectorToBe; - final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, projector, null, null, - OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); - mutationPlans.add(new MutationPlan() { - @Override - public ParameterMetaData getParameterMetaData() { - return context.getBindManager().getParameterMetaData(); - } + @Override + public Set<TableRef> getSourceRefs() { + return dataPlan.getSourceRefs(); + } - @Override - public StatementContext getContext() { - return context; - } + @Override + public Operation getOperation() { + return operation; + } - @Override - public TableRef getTargetRef() { - return dataTableRef; - } - - @Override - public Set<TableRef> getSourceRefs() { - return dataPlan.getSourceRefs(); - } - - @Override - public Operation getOperation() { - return operation; - } - - @Override - public MutationState execute() throws SQLException { - // TODO: share this block of code with UPSERT SELECT - ImmutableBytesWritable ptr = context.getTempPtr(); - PTable table = tableRef.getTable(); - table.getIndexMaintainers(ptr, context.getConnection()); - byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY; - ServerCache cache = null; - try { - if (ptr.getLength() > 0) { - byte[] uuidValue = ServerCacheClient.generateId(); - context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get()); - context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState); - } - ResultIterator iterator = aggPlan.iterator(); + @Override + public MutationState execute() throws SQLException { + // TODO: share this block of code with UPSERT SELECT + ImmutableBytesWritable ptr = context.getTempPtr(); + PTable table = dataPlan.getTableRef().getTable(); + table.getIndexMaintainers(ptr, context.getConnection()); + byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY; + ServerCache cache = null; try { - Tuple row = iterator.next(); - final long mutationCount = (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr); - return new MutationState(maxSize, maxSizeBytes, connection) { - @Override - public long getUpdateCount() { - return mutationCount; - } - }; + if (ptr.getLength() > 0) { + byte[] uuidValue = ServerCacheClient.generateId(); + context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); + context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get()); + context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState); + } + ResultIterator iterator = aggPlan.iterator(); + try { + Tuple row = iterator.next(); + final long mutationCount = (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr); + return new MutationState(maxSize, maxSizeBytes, connection) { + @Override + public long getUpdateCount() { + return mutationCount; + } + }; + } finally { + iterator.close(); + } } finally { - iterator.close(); - } - } finally { - if (cache != null) { - cache.close(); + if (cache != null) { + cache.close(); + } } } - } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + List<String> queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps(); + List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); + planSteps.add("DELETE ROWS"); + planSteps.addAll(queryPlanSteps); + return new ExplainPlan(planSteps); + } - @Override - public ExplainPlan getExplainPlan() throws SQLException { - List<String> queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps(); - List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); - planSteps.add("DELETE ROWS"); - planSteps.addAll(queryPlanSteps); - return new ExplainPlan(planSteps); - } - - @Override - public Long getEstimatedRowsToScan() throws SQLException { - return aggPlan.getEstimatedRowsToScan(); - } - - @Override - public Long getEstimatedBytesToScan() throws SQLException { - return aggPlan.getEstimatedBytesToScan(); - } - - @Override - public Long getEstimateInfoTimestamp() throws SQLException { - return aggPlan.getEstimateInfoTimestamp(); - } - }); - } else { - List<TableRef> immutableIndexRefsToBe = Lists.newArrayListWithExpectedSize(dataPlan.getTableRef().getTable().getIndexes().size()); - if (!buildingImmutableIndexes.isEmpty()) { - immutableIndexRefsToBe = buildingImmutableIndexes; - } else if (hasImmutableIndexes && !plan.getTableRef().equals(tableRef)) { - immutableIndexRefsToBe = Collections.singletonList(plan.getTableRef()); - } - final List<TableRef> immutableIndexRefs = immutableIndexRefsToBe; - final DeletingParallelIteratorFactory parallelIteratorFactory2 = parallelIteratorFactory; - mutationPlans.add( new MutationPlan() { - @Override - public ParameterMetaData getParameterMetaData() { - return context.getBindManager().getParameterMetaData(); - } + @Override + public Long getEstimatedRowsToScan() throws SQLException { + return aggPlan.getEstimatedRowsToScan(); + } - @Override - public StatementContext getContext() { - return context; - } + @Override + public Long getEstimatedBytesToScan() throws SQLException { + return aggPlan.getEstimatedBytesToScan(); + } + @Override + public Long getEstimateInfoTimestamp() throws SQLException { + return aggPlan.getEstimateInfoTimestamp(); + } + }; + } else { + final DeletingParallelIteratorFactory parallelIteratorFactory = parallelIteratorFactoryToBe; + List<PColumn> adjustedProjectedColumns = Lists.newArrayListWithExpectedSize(projectedColumns.size()); + final int offset = table.getBucketNum() == null ? 0 : 1; + for (int i = 0; i < projectedColumns.size(); i++) { + final int position = i; + adjustedProjectedColumns.add(new DelegateColumn(projectedColumns.get(i)) { @Override - public TableRef getTargetRef() { - return dataTableRef; - } - - @Override - public Set<TableRef> getSourceRefs() { - return dataPlan.getSourceRefs(); + public int getPosition() { + return position + offset; } + }); + } + PTable projectedTable = PTableImpl.makePTable(table, PTableType.PROJECTED, adjustedProjectedColumns); + final TableRef projectedTableRef = new TableRef(projectedTable, targetTableRef.getLowerBoundTimeStamp(), targetTableRef.getTimeStamp()); + + QueryPlan bestPlanToBe = dataPlan; + for (QueryPlan plan : queryPlans) { + PTable planTable = plan.getTableRef().getTable(); + if (planTable.getIndexState() != PIndexState.BUILDING) { + bestPlanToBe = plan; + break; + } + } + final QueryPlan bestPlan = bestPlanToBe; + final List<TableRef>otherTableRefs = Lists.newArrayListWithExpectedSize(immutableIndexes.size()); + for (PTable index : immutableIndexes) { + if (!bestPlan.getTableRef().getTable().equals(index)) { + otherTableRefs.add(new TableRef(index, targetTableRef.getLowerBoundTimeStamp(), targetTableRef.getTimeStamp())); + } + } + + if (!bestPlan.getTableRef().getTable().equals(targetTableRef.getTable())) { + otherTableRefs.add(projectedTableRef); + } + final StatementContext context = bestPlan.getContext(); + return new MutationPlan() { + @Override + public ParameterMetaData getParameterMetaData() { + return context.getBindManager().getParameterMetaData(); + } - @Override - public Operation getOperation() { - return operation; - } + @Override + public StatementContext getContext() { + return context; + } - @Override - public MutationState execute() throws SQLException { - ResultIterator iterator = plan.iterator(); - try { - if (!hasLimit) { - Tuple tuple; - long totalRowCount = 0; - if (parallelIteratorFactory2 != null) { - parallelIteratorFactory2.setRowProjector(plan.getProjector()); - parallelIteratorFactory2.setTargetTableRef(tableRef); - parallelIteratorFactory2.setSourceTableRef(plan.getTableRef()); - parallelIteratorFactory2.setIndexTargetTableRefs(immutableIndexRefs); - } - while ((tuple=iterator.next()) != null) {// Runs query - Cell kv = tuple.getValue(0); - totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault()); - } - // Return total number of rows that have been delete. In the case of auto commit being off - // the mutations will all be in the mutation state of the current connection. - MutationState state = new MutationState(maxSize, maxSizeBytes, connection, totalRowCount); + @Override + public TableRef getTargetRef() { + return targetTableRef; + } - // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed. - state.setReadMetricQueue(plan.getContext().getReadMetricsQueue()); + @Override + public Set<TableRef> getSourceRefs() { + return dataPlan.getSourceRefs(); + } - return state; - } else { - return deleteRows(plan.getContext(), tableRef, immutableIndexRefs, iterator, plan.getProjector(), plan.getTableRef()); + @Override + public Operation getOperation() { + return operation; + } + + @Override + public MutationState execute() throws SQLException { + ResultIterator iterator = bestPlan.iterator(); + try { + if (!hasPreOrPostProcessing) { + Tuple tuple; + long totalRowCount = 0; + if (parallelIteratorFactory != null) { + parallelIteratorFactory.setQueryPlan(bestPlan); + parallelIteratorFactory.setOtherTableRefs(otherTableRefs); + parallelIteratorFactory.setProjectedTableRef(projectedTableRef); } - } finally { - iterator.close(); + while ((tuple=iterator.next()) != null) {// Runs query + Cell kv = tuple.getValue(0); + totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault()); + } + // Return total number of rows that have been deleted from the table. In the case of auto commit being off + // the mutations will all be in the mutation state of the current connection. We need to divide by the + // total number of tables we updated as otherwise the client will get an unexpected result + MutationState state = new MutationState(maxSize, maxSizeBytes, connection, totalRowCount / ((bestPlan.getTableRef().getTable().getIndexType() == IndexType.LOCAL && !otherTableRefs.isEmpty() ? 0 : 1) + otherTableRefs.size())); + + // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed. + state.setReadMetricQueue(context.getReadMetricsQueue()); + + return state; + } else { + return deleteRows(context, iterator, bestPlan, projectedTableRef, otherTableRefs); } + } finally { + iterator.close(); } - - @Override - public ExplainPlan getExplainPlan() throws SQLException { - List<String> queryPlanSteps = plan.getExplainPlan().getPlanSteps(); - List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); - planSteps.add("DELETE ROWS"); - planSteps.addAll(queryPlanSteps); - return new ExplainPlan(planSteps); - } + } - @Override - public Long getEstimatedRowsToScan() throws SQLException { - return plan.getEstimatedRowsToScan(); - } + @Override + public ExplainPlan getExplainPlan() throws SQLException { + List<String> queryPlanSteps = bestPlan.getExplainPlan().getPlanSteps(); + List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); + planSteps.add("DELETE ROWS"); + planSteps.addAll(queryPlanSteps); + return new ExplainPlan(planSteps); + } - @Override - public Long getEstimatedBytesToScan() throws SQLException { - return plan.getEstimatedBytesToScan(); - } + @Override + public Long getEstimatedRowsToScan() throws SQLException { + return bestPlan.getEstimatedRowsToScan(); + } - @Override - public Long getEstimateInfoTimestamp() throws SQLException { - return plan.getEstimateInfoTimestamp(); - } - }); - } + @Override + public Long getEstimatedBytesToScan() throws SQLException { + return bestPlan.getEstimatedBytesToScan(); + } + + @Override + public Long getEstimateInfoTimestamp() throws SQLException { + return bestPlan.getEstimateInfoTimestamp(); + } + }; } - return mutationPlans.size() == 1 ? mutationPlans.get(0) : new MultiDeleteMutationPlan(mutationPlans); } } \ No newline at end of file
