Repository: phoenix
Updated Branches:
  refs/heads/master 2da5ff2ad -> 3ef8ecc18


PHOENIX-3056 Rows cannot be deleted from immutable table when in building state (James Taylor)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3ef8ecc1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3ef8ecc1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3ef8ecc1

Branch: refs/heads/master
Commit: 3ef8ecc180ae3556454397ba2e6e576d2cf58cac
Parents: 2da5ff2
Author: Thomas D'Silva <[email protected]>
Authored: Fri Jul 8 13:43:46 2016 -0700
Committer: Thomas D'Silva <[email protected]>
Committed: Fri Jul 8 14:52:20 2016 -0700

----------------------------------------------------------------------
 .../end2end/index/AsyncImmutableIndexIT.java    | 114 +++++++++++++++++++
 .../apache/phoenix/compile/DeleteCompiler.java  |  82 +++++++++----
 2 files changed, 174 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
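
For context, the scenario the new AsyncImmutableIndexIT exercises, distilled into a rough SQL sketch (table and index names are taken from the test; the sketch and its comments are explanatory only, not part of the patch):

    -- Immutable data table with a three-column primary key
    CREATE TABLE TEST_TABLE (
        pk1 VARCHAR NOT NULL,
        pk2 VARCHAR NOT NULL,
        pk3 VARCHAR
        CONSTRAINT PK PRIMARY KEY (pk1, pk2, pk3)
    ) IMMUTABLE_ROWS=true;

    UPSERT INTO TEST_TABLE (pk1, pk2, pk3) VALUES ('a', '1', '1');
    UPSERT INTO TEST_TABLE (pk1, pk2, pk3) VALUES ('b', '2', '2');

    -- Asynchronous index: it stays in BUILDING state until the IndexTool MR job runs
    CREATE INDEX TEST_INDEX ON TEST_TABLE (pk3, pk2) ASYNC;

    -- Before this fix, this delete failed while TEST_INDEX was still building; with the
    -- fix, DeleteCompiler also issues delete markers for each building immutable index
    -- (provided the index contains only data-table PK columns), so the rebuilt index
    -- stays consistent with the data table.
    DELETE FROM TEST_TABLE WHERE pk1 = 'a';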


http://git-wip-us.apache.org/repos/asf/phoenix/blob/3ef8ecc1/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/AsyncImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/AsyncImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/AsyncImmutableIndexIT.java
new file mode 100644
index 0000000..8c90b6e
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/AsyncImmutableIndexIT.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.IndexToolIT;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class AsyncImmutableIndexIT extends BaseOwnClusterHBaseManagedTimeIT {
+    
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
+        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
+            QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        setUpRealDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+            ReadOnlyProps.EMPTY_PROPS);
+    }
+    
+    @Test
+    public void testDeleteFromImmutable() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE TEST_TABLE (\n" + 
+                    "        pk1 VARCHAR NOT NULL,\n" + 
+                    "        pk2 VARCHAR NOT NULL,\n" + 
+                    "        pk3 VARCHAR\n" + 
+                    "        CONSTRAINT PK PRIMARY KEY \n" + 
+                    "        (\n" + 
+                    "        pk1,\n" + 
+                    "        pk2,\n" + 
+                    "        pk3\n" + 
+                    "        )\n" + 
+                    "        ) IMMUTABLE_ROWS=true");
+            conn.createStatement().execute("upsert into TEST_TABLE (pk1, pk2, pk3) values ('a', '1', '1')");
+            conn.createStatement().execute("upsert into TEST_TABLE (pk1, pk2, pk3) values ('b', '2', '2')");
+            conn.commit();
+            conn.createStatement().execute("CREATE INDEX TEST_INDEX ON TEST_TABLE (pk3, pk2) ASYNC");
+            
+            // this delete will be issued at a timestamp later than the above timestamp of the index table
+            conn.createStatement().execute("delete from TEST_TABLE where pk1 = 'a'");
+            conn.commit();
+
+            // run the index MR job
+            final IndexTool indexingTool = new IndexTool();
+            indexingTool.setConf(new Configuration(getUtility().getConfiguration()));
+            final String[] cmdArgs =
+                    IndexToolIT.getArgValues(null, "TEST_TABLE", "TEST_INDEX", true);
+            int status = indexingTool.run(cmdArgs);
+            assertEquals(0, status);
+
+            // upsert two more rows
+            conn.createStatement().execute(
+                "upsert into TEST_TABLE (pk1, pk2, pk3) values ('a', '3', '3')");
+            conn.createStatement().execute(
+                "upsert into TEST_TABLE (pk1, pk2, pk3) values ('b', '4', '4')");
+            conn.commit();
+
+            // validate that delete markers were issued correctly and only ('a', '1', '1') was
+            // deleted
+            String query = "SELECT pk3 from TEST_TABLE ORDER BY pk3";
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            String expectedPlan =
+                    "CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER TEST_INDEX\n"
+                            + "    SERVER FILTER BY FIRST KEY ONLY";
+            assertEquals("Wrong plan ", expectedPlan, QueryUtil.getExplainPlan(rs));
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("2", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("3", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("4", rs.getString(1));
+            assertFalse(rs.next());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3ef8ecc1/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 504f994..42efd68 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -21,6 +21,7 @@ import static org.apache.phoenix.execute.MutationState.RowTimestampColInfo.NULL_
 import java.sql.ParameterMetaData;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -80,13 +81,13 @@ import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import com.sun.istack.NotNull;
 
 public class DeleteCompiler {
@@ -100,7 +101,7 @@ public class DeleteCompiler {
         this.operation = operation;
     }
     
-    private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException {
+    private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, List<TableRef> indexTableRefs, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException {
         PTable table = targetTableRef.getTable();
         PhoenixStatement statement = childContext.getStatement();
         PhoenixConnection connection = statement.getConnection();
@@ -114,11 +115,14 @@ public class DeleteCompiler {
         final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
         final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
         Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize);
-        Map<ImmutableBytesPtr,RowMutationState> indexMutations = null;
+        List<Map<ImmutableBytesPtr,RowMutationState>> indexMutations = null;
         // If indexTableRef is set, we're deleting the rows from both the index table and
         // the data table through a single query to save executing an additional one.
-        if (indexTableRef != null) {
-            indexMutations = Maps.newHashMapWithExpectedSize(batchSize);
+        if (!indexTableRefs.isEmpty()) {
+            indexMutations = Lists.newArrayListWithExpectedSize(indexTableRefs.size());
+            for (int i = 0; i < indexTableRefs.size(); i++) {
+                indexMutations.add(Maps.<ImmutableBytesPtr,RowMutationState>newHashMapWithExpectedSize(batchSize));
+            }
         }
         List<PColumn> pkColumns = table.getPKColumns();
         boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null;
@@ -156,10 +160,10 @@ public class DeleteCompiler {
                 // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the
                 // row key will already have its value. 
                 mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
-                if (indexTableRef != null) {
+                for (int i = 0; i < indexTableRefs.size(); i++) {
                     ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
                     rs.getCurrentRow().getKey(indexPtr);
-                    indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
+                    indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
                 }
                 if (mutations.size() > maxSize) {
                     throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
@@ -169,8 +173,8 @@ public class DeleteCompiler {
                 if (isAutoCommit && rowCount % batchSize == 0) {
                     MutationState state = new MutationState(targetTableRef, mutations, 0, maxSize, connection);
                     connection.getMutationState().join(state);
-                    if (indexTableRef != null) {
-                        MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection);
+                    for (int i = 0; i < indexTableRefs.size(); i++) {
+                        MutationState indexState = new MutationState(indexTableRefs.get(i), indexMutations.get(i), 0, maxSize, connection);
                         connection.getMutationState().join(indexState);
                     }
                     connection.getMutationState().send();
@@ -184,9 +188,9 @@ public class DeleteCompiler {
             // If auto commit is true, this last batch will be committed upon return
             int nCommittedRows = isAutoCommit ? (rowCount / batchSize * batchSize) : 0;
             MutationState state = new MutationState(targetTableRef, mutations, nCommittedRows, maxSize, connection);
-            if (indexTableRef != null) {
+            for (int i = 0; i < indexTableRefs.size(); i++) {
                 // To prevent the counting of these index rows, we have a negative for remainingRows.
-                MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection);
+                MutationState indexState = new MutationState(indexTableRefs.get(i), indexMutations.get(i), 0, maxSize, connection);
                 state.join(indexState);
             }
             return state;
@@ -196,7 +200,7 @@ public class DeleteCompiler {
     private static class DeletingParallelIteratorFactory extends MutatingParallelIteratorFactory {
         private RowProjector projector;
         private TableRef targetTableRef;
-        private TableRef indexTableRef;
+        private List<TableRef> indexTableRefs;
         private TableRef sourceTableRef;
         
         private DeletingParallelIteratorFactory(PhoenixConnection connection) {
@@ -212,7 +216,7 @@ public class DeleteCompiler {
              * iterator being used for reading rows out.
              */
             StatementContext ctx = new StatementContext(statement, false);
-            MutationState state = deleteRows(ctx, targetTableRef, indexTableRef, iterator, projector, sourceTableRef);
+            MutationState state = deleteRows(ctx, targetTableRef, indexTableRefs, iterator, projector, sourceTableRef);
             return state;
         }
         
@@ -228,8 +232,8 @@ public class DeleteCompiler {
             this.projector = projector;
         }
 
-        public void setIndexTargetTableRef(TableRef indexTableRef) {
-            this.indexTableRef = indexTableRef;
+        public void setIndexTargetTableRefs(List<TableRef> indexTableRefs) {
+            this.indexTableRefs = indexTableRefs;
         }
         
     }
@@ -298,6 +302,17 @@ public class DeleteCompiler {
                }
     }
     
+    private static boolean hasNonPKIndexedColumns(Collection<PTable> immutableIndexes) {
+        for (PTable index : immutableIndexes) {
+            for (PColumn column : index.getPKColumns()) {
+                if (!IndexUtil.isDataPKColumn(column)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+    
     public MutationPlan compile(DeleteStatement delete) throws SQLException {
         final PhoenixConnection connection = statement.getConnection();
         final boolean isAutoCommit = connection.getAutoCommit();
@@ -393,8 +408,17 @@ public class DeleteCompiler {
             }
             break;
         }
-        final QueryPlan dataPlan = dataPlanToBe;
+        boolean isBuildingImmutable = false;
         final boolean hasImmutableIndexes = !immutableIndex.isEmpty();
+        if (hasImmutableIndexes) {
+            for (PTable index : immutableIndex.values()){
+                if (index.getIndexState() == PIndexState.BUILDING) {
+                    isBuildingImmutable = true;
+                    break;
+                }
+            }
+        }
+        final QueryPlan dataPlan = dataPlanToBe;
         // tableRefs is parallel with queryPlans
         TableRef[] tableRefs = new TableRef[hasImmutableIndexes ? immutableIndex.size() : 1];
         if (hasImmutableIndexes) {
@@ -406,7 +430,7 @@ public class DeleteCompiler {
                 if (table.getType() == PTableType.INDEX) { // index plans
                     tableRefs[i++] = plan.getTableRef();
                     immutableIndex.remove(table.getKey());
-                } else { // data plan
+                } else if (!isBuildingImmutable) { // data plan
                     /*
                      * If we have immutable indexes that we need to maintain, don't execute the data plan
                      * as we can save a query by piggy-backing on any of the other index queries, since the
@@ -421,10 +445,18 @@ public class DeleteCompiler {
              * immutable index.
              */
             if (!immutableIndex.isEmpty()) {
-                throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS).setSchemaName(tableRefToBe.getTable().getSchemaName().getString())
-                .setTableName(tableRefToBe.getTable().getTableName().getString()).build().buildException();
+                Collection<PTable> immutableIndexes = immutableIndex.values();
+                if (!isBuildingImmutable || hasNonPKIndexedColumns(immutableIndexes)) {
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS).setSchemaName(tableRefToBe.getTable().getSchemaName().getString())
+                    .setTableName(tableRefToBe.getTable().getTableName().getString()).build().buildException();
+                }
+                runOnServer = false;
             }
         }
+        List<TableRef> buildingImmutableIndexes = Lists.newArrayListWithExpectedSize(immutableIndex.values().size());
+        for (PTable index : immutableIndex.values()) {
+            buildingImmutableIndexes.add(new TableRef(index, dataPlan.getTableRef().getTimeStamp(), dataPlan.getTableRef().getLowerBoundTimeStamp()));
+        }
         
         // Make sure the first plan is targeting deletion from the data table
         // In the case of an immutable index, we'll also delete from the index.
@@ -586,7 +618,13 @@ public class DeleteCompiler {
                     }
                 });
             } else {
-                final boolean deleteFromImmutableIndexToo = hasImmutableIndexes && !plan.getTableRef().equals(tableRef);
+                List<TableRef> immutableIndexRefsToBe = Lists.newArrayListWithExpectedSize(dataPlan.getTableRef().getTable().getIndexes().size());
+                if (!buildingImmutableIndexes.isEmpty()) {
+                    immutableIndexRefsToBe = buildingImmutableIndexes;
+                } else if (hasImmutableIndexes && !plan.getTableRef().equals(tableRef)) {
+                    immutableIndexRefsToBe = Collections.singletonList(plan.getTableRef());
+                }
+                final List<TableRef> immutableIndexRefs = immutableIndexRefsToBe;
                 final DeletingParallelIteratorFactory parallelIteratorFactory2 = parallelIteratorFactory;
                 mutationPlans.add( new MutationPlan() {
                     @Override
@@ -625,7 +663,7 @@
                                     parallelIteratorFactory2.setRowProjector(plan.getProjector());
                                     parallelIteratorFactory2.setTargetTableRef(tableRef);
                                     parallelIteratorFactory2.setSourceTableRef(plan.getTableRef());
-                                    parallelIteratorFactory2.setIndexTargetTableRef(deleteFromImmutableIndexToo ? plan.getTableRef() : null);
+                                    parallelIteratorFactory2.setIndexTargetTableRefs(immutableIndexRefs);
                                 }
                                 while ((tuple=iterator.next()) != null) {// Runs query
                                     Cell kv = tuple.getValue(0);
@@ -640,7 +678,7 @@ public class DeleteCompiler {
 
                                 return state;
                             } else {
-                                return deleteRows(plan.getContext(), tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef());
+                                return deleteRows(plan.getContext(), tableRef, immutableIndexRefs, iterator, plan.getProjector(), plan.getTableRef());
                             }
                         } finally {
                             iterator.close();
