Repository: phoenix Updated Branches: refs/heads/txn 9270db150 -> ea523e7c5
Add transaction logic for Delete (with failing test for now) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ea523e7c Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ea523e7c Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ea523e7c Branch: refs/heads/txn Commit: ea523e7c5d3a788838974ae581cde74c7b10de75 Parents: 9270db1 Author: James Taylor <[email protected]> Authored: Mon Apr 20 11:13:23 2015 -0700 Committer: James Taylor <[email protected]> Committed: Mon Apr 20 11:13:23 2015 -0700 ---------------------------------------------------------------------- .../phoenix/transactions/TransactionIT.java | 46 ++++++++++++++++++++ .../apache/phoenix/compile/DeleteCompiler.java | 26 +++++++---- 2 files changed, 64 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea523e7c/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java index 31adcb9..8babaae 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java @@ -141,6 +141,52 @@ public class TransactionIT extends BaseHBaseManagedTimeIT { } } + @Test + public void testDelete() throws Exception { + String selectSQL = "SELECT * FROM " + FULL_TABLE_NAME; + Connection conn1 = DriverManager.getConnection(getUrl()); + Connection conn2 = DriverManager.getConnection(getUrl()); + try { + conn1.setAutoCommit(false); + ResultSet rs = conn1.createStatement().executeQuery(selectSQL); + assertFalse(rs.next()); + + String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, 
char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn1.prepareStatement(upsert); + // upsert two rows + setRowKeyColumns(stmt, 1); + stmt.execute(); + conn1.commit(); + + setRowKeyColumns(stmt, 2); + stmt.execute(); + + // delete all rows on conn1; a count of 2 shows the still-uncommitted second upsert is visible to the delete + int rowsDeleted = conn1.createStatement().executeUpdate("DELETE FROM " + FULL_TABLE_NAME); + assertEquals(2, rowsDeleted); + + // Delete and second upsert not committed yet, so there should be one row. + // FIXME: aggregate queries don't appear to honor the transaction information + // rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME); + rs = conn2.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME); + assertTrue(rs.next()); + // FIXME: (see above) + // assertEquals(1, rs.getInt(1)); + assertFalse(rs.next()); + + conn1.commit(); + + // verify rows are deleted after commit + // FIXME: this is failing, I think because Tephra isn't handling deletes like we need it to + // TODO: confirm this works once we get the patch from Gary. 
+ rs = conn1.createStatement().executeQuery(selectSQL); + assertFalse(rs.next()); + } + finally { + conn1.close(); + } + } + @Test public void testAutoCommitQuerySingleTable() throws Exception { Connection conn = DriverManager.getConnection(getUrl()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea523e7c/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 0778f75..408c622 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -291,12 +291,13 @@ public class DeleteCompiler { boolean noQueryReqd = false; boolean runOnServer = false; SelectStatement select = null; + ColumnResolver resolverToBe = null; Set<PTable> immutableIndex = Collections.emptySet(); DeletingParallelIteratorFactory parallelIteratorFactory = null; while (true) { try { - ColumnResolver resolver = FromCompiler.getResolverForMutation(delete, connection); - tableRefToBe = resolver.getTables().get(0); + resolverToBe = FromCompiler.getResolverForMutation(delete, connection); + tableRefToBe = resolverToBe.getTables().get(0); PTable table = tableRefToBe.getTable(); // Cannot update: // - read-only VIEW @@ -332,17 +333,17 @@ public class DeleteCompiler { Collections.<ParseNode>emptyList(), null, delete.getOrderBy(), delete.getLimit(), delete.getBindCount(), false, false); - select = StatementNormalizer.normalize(select, resolver); - SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolver, connection); + select = StatementNormalizer.normalize(select, resolverToBe); + SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolverToBe, connection); if (transformedSelect != select) { - resolver 
= FromCompiler.getResolverForQuery(transformedSelect, connection); - select = StatementNormalizer.normalize(transformedSelect, resolver); + resolverToBe = FromCompiler.getResolverForQuery(transformedSelect, connection); + select = StatementNormalizer.normalize(transformedSelect, resolverToBe); } parallelIteratorFactory = hasLimit ? null : new DeletingParallelIteratorFactory(connection); QueryOptimizer optimizer = new QueryOptimizer(services); queryPlans = Lists.newArrayList(mayHaveImmutableIndexes - ? optimizer.getApplicablePlans(statement, select, resolver, Collections.<PColumn>emptyList(), parallelIteratorFactory) - : optimizer.getBestPlan(statement, select, resolver, Collections.<PColumn>emptyList(), parallelIteratorFactory)); + ? optimizer.getApplicablePlans(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory) + : optimizer.getBestPlan(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory)); if (mayHaveImmutableIndexes) { // FIXME: this is ugly // Lookup the table being deleted from in the cache, as it's possible that the // optimizer updated the cache if it found indexes that were out of date. @@ -367,6 +368,7 @@ public class DeleteCompiler { } break; } + final ColumnResolver resolver = resolverToBe; final boolean hasImmutableIndexes = !immutableIndex.isEmpty(); // tableRefs is parallel with queryPlans TableRef[] tableRefs = new TableRef[hasImmutableIndexes ? immutableIndex.size() : 1]; @@ -559,6 +561,14 @@ public class DeleteCompiler { @Override public MutationState execute() throws SQLException { + // Repeated from PhoenixStatement.executeQuery which this call bypasses. + // Send mutations to hbase, so they are visible to subsequent reads. + // Use original plan for data table so that data and immutable indexes will be sent. 
+ boolean isTransactional = connection.getMutationState().startTransaction(resolver.getTables().iterator()); + if (isTransactional) { + // Use real query plan so that we have the right context object. + plan.getContext().setTransaction(connection.getMutationState().getTransaction()); + } ResultIterator iterator = plan.iterator(); if (!hasLimit) { Tuple tuple;
