Repository: phoenix Updated Branches: refs/heads/calcite 7cfe49114 -> f344e3355
PHOENIX-2197 Support DML in Phoenix/Calcite integration (part 3: implement DELETE) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f344e335 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f344e335 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f344e335 Branch: refs/heads/calcite Commit: f344e3355ea1257144edde8306a1fe76675f5ab6 Parents: 7cfe491 Author: maryannxue <[email protected]> Authored: Thu Jun 9 10:51:27 2016 -0400 Committer: maryannxue <[email protected]> Committed: Thu Jun 9 10:51:27 2016 -0400 ---------------------------------------------------------------------- .../apache/phoenix/calcite/CalciteDMLIT.java | 16 +- .../phoenix/calcite/rel/PhoenixTableModify.java | 156 ++++++++++++++++++- 2 files changed, 170 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/f344e335/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java index fc58968..4c1bd0d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java @@ -101,12 +101,26 @@ public class CalciteDMLIT extends BaseCalciteIT { .close(); } + @Ignore // CALCITE-1278 @Test public void testDelete() throws Exception { + start(PROPS).sql("upsert into atable(organization_id, entity_id) values('1', '1')") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixTableModify(table=[[phoenix, ATABLE]], operation=[INSERT], updateColumnList=[[]], flattened=[false])\n" + + " PhoenixClientProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[null], B_STRING=[null], A_INTEGER=[null], A_DATE=[null], A_TIME=[null], A_TIMESTAMP=[null], X_DECIMAL=[null], X_LONG=[null], X_INTEGER=[null], Y_INTEGER=[null], A_BYTE=[null], A_SHORT=[null], A_FLOAT=[null], A_DOUBLE=[null], A_UNSIGNED_FLOAT=[null], A_UNSIGNED_DOUBLE=[null])\n" + + " PhoenixValues(tuples=[[{ '1 ', '1 ' }]])\n") + .executeUpdate() + .close(); + start(PROPS).sql("select organization_id, entity_id from aTable where organization_id = '1'") + .resultIs(0, new Object[][] {{"1 ", "1 "}}) + .close(); start(PROPS).sql("delete from atable where organization_id = '1' and entity_id = '1'") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixTableModify(table=[[phoenix, ATABLE]], operation=[DELETE], updateColumnList=[[]], flattened=[false])\n" + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[AND(=($0, CAST('1'):CHAR(15) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL), =($1, CAST('1'):CHAR(15) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL))])\n") - //.executeUpdate() + .executeUpdate() + .close(); + start(PROPS).sql("select * from aTable where organization_id = '1'") + .resultIs(new Object[][] {}) .close(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f344e335/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java index 196f839..1b83be2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java @@ -1,8 +1,11 @@ package org.apache.phoenix.calcite.rel; +import static org.apache.phoenix.execute.MutationState.RowTimestampColInfo.NULL_ROWTIMESTAMP_INFO; + import java.sql.ParameterMetaData; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; @@ -40,11 +43,15 @@ import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import com.google.common.collect.Lists; @@ -85,7 +92,7 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel { } // delete - throw new UnsupportedOperationException(); + return delete(connection, targetTable, targetTableRef, queryPlan, projector); } private static MutationPlan upsert(final PhoenixConnection connection, @@ -238,4 +245,151 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel { mutation.put(ptr, new RowMutationState(columnValues, connection.getStatementExecutionCounter(), rowTsColInfo)); } + private static MutationPlan delete(final PhoenixConnection connection, + final PhoenixTable targetTable, final TableRef targetTableRef, + final QueryPlan queryPlan, final RowProjector projector) { + final StatementContext context = queryPlan.getContext(); + // TODO + final boolean deleteFromImmutableIndexToo = false; + return new MutationPlan() { + @Override + public ParameterMetaData getParameterMetaData() { + return context.getBindManager().getParameterMetaData(); + } + + @Override + public StatementContext getContext() { + return context; + } + + @Override + public TableRef getTargetRef() { + return targetTableRef; + } + + @Override + public Set<TableRef> getSourceRefs() { + // TODO dataPlan.getSourceRefs(); + return queryPlan.getSourceRefs(); + } + + @Override + public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() { + return org.apache.phoenix.jdbc.PhoenixStatement.Operation.DELETE; + } + + @Override + public MutationState execute() throws SQLException { + ResultIterator iterator = queryPlan.iterator(); + try { + // TODO hasLimit?? + return deleteRows(context, targetTableRef, deleteFromImmutableIndexToo ? queryPlan.getTableRef() : null, iterator, projector, queryPlan.getTableRef()); + } finally { + iterator.close(); + } + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps(); + List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); + planSteps.add("DELETE ROWS"); + planSteps.addAll(queryPlanSteps); + return new ExplainPlan(planSteps); + } + }; + } + + private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException { + PTable table = targetTableRef.getTable(); + PhoenixStatement statement = childContext.getStatement(); + PhoenixConnection connection = statement.getConnection(); + PName tenantId = connection.getTenantId(); + byte[] tenantIdBytes = null; + if (tenantId != null) { + tenantIdBytes = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), table.getBucketNum() != null, tenantId); + } + final boolean isAutoCommit = connection.getAutoCommit(); + ConnectionQueryServices services = connection.getQueryServices(); + final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); + Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize); + Map<ImmutableBytesPtr,RowMutationState> indexMutations = null; + // If indexTableRef is set, we're deleting the rows from both the index table and + // the data table through a single query to save executing an additional one. + if (indexTableRef != null) { + indexMutations = Maps.newHashMapWithExpectedSize(batchSize); + } + List<PColumn> pkColumns = table.getPKColumns(); + boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null; + boolean isSharedViewIndex = table.getViewIndexId() != null; + int offset = (table.getBucketNum() == null ? 0 : 1); + byte[][] values = new byte[pkColumns.size()][]; + if (isMultiTenant) { + values[offset++] = tenantIdBytes; + } + if (isSharedViewIndex) { + values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId()); + } + try (PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) { + int rowCount = 0; + while (rs.next()) { + ImmutableBytesPtr ptr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map + // Use tuple directly, as projector would not have all the PK columns from + // our index table inside of our projection. Since the tables are equal, + // there's no transation required. + if (sourceTableRef.equals(targetTableRef)) { + rs.getCurrentRow().getKey(ptr); + } else { + for (int i = offset; i < values.length; i++) { + byte[] byteValue = rs.getBytes(i+1-offset); + // The ResultSet.getBytes() call will have inverted it - we need to invert it back. + // TODO: consider going under the hood and just getting the bytes + if (pkColumns.get(i).getSortOrder() == SortOrder.DESC) { + byte[] tempByteValue = Arrays.copyOf(byteValue, byteValue.length); + byteValue = SortOrder.invert(byteValue, 0, tempByteValue, 0, byteValue.length); + } + values[i] = byteValue; + } + table.newKey(ptr, values); + } + // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the + // row key will already have its value. + mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO)); + if (indexTableRef != null) { + ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map + rs.getCurrentRow().getKey(indexPtr); + indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO)); + } + if (mutations.size() > maxSize) { + throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize); + } + rowCount++; + // Commit a batch if auto commit is true and we're at our batch size + if (isAutoCommit && rowCount % batchSize == 0) { + MutationState state = new MutationState(targetTableRef, mutations, 0, maxSize, connection); + connection.getMutationState().join(state); + if (indexTableRef != null) { + MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection); + connection.getMutationState().join(indexState); + } + connection.getMutationState().send(); + mutations.clear(); + if (indexMutations != null) { + indexMutations.clear(); + } + } + } + + // If auto commit is true, this last batch will be committed upon return + int nCommittedRows = rowCount / batchSize * batchSize; + MutationState state = new MutationState(targetTableRef, mutations, nCommittedRows, maxSize, connection); + if (indexTableRef != null) { + // To prevent the counting of these index rows, we have a negative for remainingRows. + MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection); + state.join(indexState); + } + return state; + } + } }
