Repository: phoenix
Updated Branches:
  refs/heads/calcite 7cfe49114 -> f344e3355


PHOENIX-2197 Support DML in Phoenix/Calcite integration (part 3: implement 
DELETE)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f344e335
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f344e335
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f344e335

Branch: refs/heads/calcite
Commit: f344e3355ea1257144edde8306a1fe76675f5ab6
Parents: 7cfe491
Author: maryannxue <[email protected]>
Authored: Thu Jun 9 10:51:27 2016 -0400
Committer: maryannxue <[email protected]>
Committed: Thu Jun 9 10:51:27 2016 -0400

----------------------------------------------------------------------
 .../apache/phoenix/calcite/CalciteDMLIT.java    |  16 +-
 .../phoenix/calcite/rel/PhoenixTableModify.java | 156 ++++++++++++++++++-
 2 files changed, 170 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/f344e335/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java
index fc58968..4c1bd0d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java
@@ -101,12 +101,26 @@ public class CalciteDMLIT extends BaseCalciteIT {
             .close();
     }
     
+    // End-to-end DELETE check: upsert one row, verify it is visible, delete it,
+    // then verify the table no longer returns it. Ignored pending CALCITE-1278.
+    @Ignore // CALCITE-1278
     @Test public void testDelete() throws Exception {
+        start(PROPS).sql("upsert into atable(organization_id, entity_id) 
values('1', '1')")
+            .explainIs("PhoenixToEnumerableConverter\n" +
+                       "  PhoenixTableModify(table=[[phoenix, ATABLE]], 
operation=[INSERT], updateColumnList=[[]], flattened=[false])\n" +
+                       "    PhoenixClientProject(ORGANIZATION_ID=[$0], 
ENTITY_ID=[$1], A_STRING=[null], B_STRING=[null], A_INTEGER=[null], 
A_DATE=[null], A_TIME=[null], A_TIMESTAMP=[null], X_DECIMAL=[null], 
X_LONG=[null], X_INTEGER=[null], Y_INTEGER=[null], A_BYTE=[null], 
A_SHORT=[null], A_FLOAT=[null], A_DOUBLE=[null], A_UNSIGNED_FLOAT=[null], 
A_UNSIGNED_DOUBLE=[null])\n" +
+                       "      PhoenixValues(tuples=[[{ '1              ', '1   
           ' }]])\n")
+            .executeUpdate()
+            .close();
+        start(PROPS).sql("select organization_id, entity_id from aTable where 
organization_id = '1'")
+            .resultIs(0, new Object[][] {{"1              ", "1              
"}})
+            .close();
         start(PROPS).sql("delete from atable where organization_id = '1' and 
entity_id = '1'")
             .explainIs("PhoenixToEnumerableConverter\n" +
                        "  PhoenixTableModify(table=[[phoenix, ATABLE]], 
operation=[DELETE], updateColumnList=[[]], flattened=[false])\n" +
                        "    PhoenixTableScan(table=[[phoenix, ATABLE]], 
filter=[AND(=($0, CAST('1'):CHAR(15) CHARACTER SET \"ISO-8859-1\" COLLATE 
\"ISO-8859-1$en_US$primary\" NOT NULL), =($1, CAST('1'):CHAR(15) CHARACTER SET 
\"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL))])\n")
-            //.executeUpdate()
+            .executeUpdate()
+            .close();
+        start(PROPS).sql("select * from aTable where organization_id = '1'")
+            .resultIs(new Object[][] {})
             .close();
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f344e335/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
index 196f839..1b83be2 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
@@ -1,8 +1,11 @@
 package org.apache.phoenix.calcite.rel;
 
+import static 
org.apache.phoenix.execute.MutationState.RowTimestampColInfo.NULL_ROWTIMESTAMP_INFO;
+
 import java.sql.ParameterMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -40,11 +43,15 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
 import com.google.common.collect.Lists;
@@ -85,7 +92,7 @@ public class PhoenixTableModify extends TableModify 
implements PhoenixRel {
         }
         
         // delete
-        throw new UnsupportedOperationException();
+        return delete(connection, targetTable, targetTableRef, queryPlan, 
projector);
     }
     
     private static MutationPlan upsert(final PhoenixConnection connection,
@@ -238,4 +245,151 @@ public class PhoenixTableModify extends TableModify 
implements PhoenixRel {
         mutation.put(ptr, new RowMutationState(columnValues, 
connection.getStatementExecutionCounter(), rowTsColInfo));
     }
 
+    private static MutationPlan delete(final PhoenixConnection connection,
+            final PhoenixTable targetTable, final TableRef targetTableRef,
+            final QueryPlan queryPlan, final RowProjector projector) {
+        final StatementContext context = queryPlan.getContext();
+        // TODO
+        final boolean deleteFromImmutableIndexToo = false;
+        return new MutationPlan() {
+            @Override
+            public ParameterMetaData getParameterMetaData() {
+                return context.getBindManager().getParameterMetaData();
+            }
+
+            @Override
+            public StatementContext getContext() {
+                return context;
+            }
+
+            @Override
+            public TableRef getTargetRef() {
+                return targetTableRef;
+            }
+
+            @Override
+            public Set<TableRef> getSourceRefs() {
+                // TODO dataPlan.getSourceRefs();
+                return queryPlan.getSourceRefs();
+            }
+
+            @Override
+            public org.apache.phoenix.jdbc.PhoenixStatement.Operation 
getOperation() {
+                return 
org.apache.phoenix.jdbc.PhoenixStatement.Operation.DELETE;
+            }
+
+            @Override
+            public MutationState execute() throws SQLException {
+                ResultIterator iterator = queryPlan.iterator();
+                try {
+                    // TODO hasLimit??
+                    return deleteRows(context, targetTableRef, 
deleteFromImmutableIndexToo ? queryPlan.getTableRef() : null, iterator, 
projector, queryPlan.getTableRef());
+                } finally {
+                    iterator.close();
+                }
+            }
+
+            @Override
+            public ExplainPlan getExplainPlan() throws SQLException {
+                List<String> queryPlanSteps =  
queryPlan.getExplainPlan().getPlanSteps();
+                List<String> planSteps = 
Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+                planSteps.add("DELETE ROWS");
+                planSteps.addAll(queryPlanSteps);
+                return new ExplainPlan(planSteps);
+            }
+        };
+    }
+    
+    private static MutationState deleteRows(StatementContext childContext, 
TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, 
RowProjector projector, TableRef sourceTableRef) throws SQLException {
+        PTable table = targetTableRef.getTable();
+        PhoenixStatement statement = childContext.getStatement();
+        PhoenixConnection connection = statement.getConnection();
+        PName tenantId = connection.getTenantId();
+        byte[] tenantIdBytes = null;
+        if (tenantId != null) {
+            tenantIdBytes = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), 
table.getBucketNum() != null, tenantId);
+        }
+        final boolean isAutoCommit = connection.getAutoCommit();
+        ConnectionQueryServices services = connection.getQueryServices();
+        final int maxSize = 
services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+        final int batchSize = Math.min(connection.getMutateBatchSize(), 
maxSize);
+        Map<ImmutableBytesPtr,RowMutationState> mutations = 
Maps.newHashMapWithExpectedSize(batchSize);
+        Map<ImmutableBytesPtr,RowMutationState> indexMutations = null;
+        // If indexTableRef is set, we're deleting the rows from both the 
index table and
+        // the data table through a single query to save executing an 
additional one.
+        if (indexTableRef != null) {
+            indexMutations = Maps.newHashMapWithExpectedSize(batchSize);
+        }
+        List<PColumn> pkColumns = table.getPKColumns();
+        boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null;
+        boolean isSharedViewIndex = table.getViewIndexId() != null;
+        int offset = (table.getBucketNum() == null ? 0 : 1);
+        byte[][] values = new byte[pkColumns.size()][];
+        if (isMultiTenant) {
+            values[offset++] = tenantIdBytes;
+        }
+        if (isSharedViewIndex) {
+            values[offset++] = 
MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
+        }
+        try (PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, 
childContext)) {
+            int rowCount = 0;
+            while (rs.next()) {
+                ImmutableBytesPtr ptr = new ImmutableBytesPtr();  // allocate 
new as this is a key in a Map
+                // Use tuple directly, as projector would not have all the PK 
columns from
+                // our index table inside of our projection. Since the tables 
are equal,
+                // there's no transation required.
+                if (sourceTableRef.equals(targetTableRef)) {
+                    rs.getCurrentRow().getKey(ptr);
+                } else {
+                    for (int i = offset; i < values.length; i++) {
+                        byte[] byteValue = rs.getBytes(i+1-offset);
+                        // The ResultSet.getBytes() call will have inverted it 
- we need to invert it back.
+                        // TODO: consider going under the hood and just 
getting the bytes
+                        if (pkColumns.get(i).getSortOrder() == SortOrder.DESC) 
{
+                            byte[] tempByteValue = Arrays.copyOf(byteValue, 
byteValue.length);
+                            byteValue = SortOrder.invert(byteValue, 0, 
tempByteValue, 0, byteValue.length);
+                        }
+                        values[i] = byteValue;
+                    }
+                    table.newKey(ptr, values);
+                }
+                // When issuing deletes, we do not care about the row time 
ranges. Also, if the table had a row timestamp column, then the
+                // row key will already have its value. 
+                mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, 
statement.getConnection().getStatementExecutionCounter(), 
NULL_ROWTIMESTAMP_INFO));
+                if (indexTableRef != null) {
+                    ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // 
allocate new as this is a key in a Map
+                    rs.getCurrentRow().getKey(indexPtr);
+                    indexMutations.put(indexPtr, new 
RowMutationState(PRow.DELETE_MARKER, 
statement.getConnection().getStatementExecutionCounter(), 
NULL_ROWTIMESTAMP_INFO));
+                }
+                if (mutations.size() > maxSize) {
+                    throw new IllegalArgumentException("MutationState size of 
" + mutations.size() + " is bigger than max allowed size of " + maxSize);
+                }
+                rowCount++;
+                // Commit a batch if auto commit is true and we're at our 
batch size
+                if (isAutoCommit && rowCount % batchSize == 0) {
+                    MutationState state = new MutationState(targetTableRef, 
mutations, 0, maxSize, connection);
+                    connection.getMutationState().join(state);
+                    if (indexTableRef != null) {
+                        MutationState indexState = new 
MutationState(indexTableRef, indexMutations, 0, maxSize, connection);
+                        connection.getMutationState().join(indexState);
+                    }
+                    connection.getMutationState().send();
+                    mutations.clear();
+                    if (indexMutations != null) {
+                        indexMutations.clear();
+                    }
+                }
+            }
+
+            // If auto commit is true, this last batch will be committed upon 
return
+            int nCommittedRows = rowCount / batchSize * batchSize;
+            MutationState state = new MutationState(targetTableRef, mutations, 
nCommittedRows, maxSize, connection);
+            if (indexTableRef != null) {
+                // To prevent the counting of these index rows, we have a 
negative for remainingRows.
+                MutationState indexState = new MutationState(indexTableRef, 
indexMutations, 0, maxSize, connection);
+                state.join(indexState);
+            }
+            return state;
+        }
+    }
 }

Reply via email to