This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new bf764c20a9 PHOENIX-7434 Extend atomic support to single row delete
with condition on non-pk columns (#2008)
bf764c20a9 is described below
commit bf764c20a9f5ba4ff54d4155bbedad3ebdef37f0
Author: Viraj Jasani <[email protected]>
AuthorDate: Fri Dec 13 13:14:38 2024 -0800
PHOENIX-7434 Extend atomic support to single row delete with condition on
non-pk columns (#2008)
---
.../org/apache/phoenix/compile/DeleteCompiler.java | 67 +++++-
.../BaseScannerRegionObserverConstants.java | 1 +
.../org/apache/phoenix/jdbc/PhoenixStatement.java | 60 ++++-
.../UngroupedAggregateRegionObserver.java | 42 +++-
.../UngroupedAggregateRegionScanner.java | 81 ++++++-
.../apache/phoenix/end2end/OnDuplicateKey2IT.java | 258 ++++++++++++++-------
6 files changed, 399 insertions(+), 110 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 476d40dc26..f92b34d1c7 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -29,7 +29,10 @@ import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -87,6 +90,7 @@ import org.apache.phoenix.schema.ReadOnlyTableException;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
import org.apache.phoenix.util.ByteUtil;
@@ -468,6 +472,11 @@ public class DeleteCompiler {
}
public MutationPlan compile(DeleteStatement delete) throws SQLException {
+ return compile(delete, null);
+ }
+
+ public MutationPlan compile(DeleteStatement delete,
MutationState.ReturnResult returnResult)
+ throws SQLException {
final PhoenixConnection connection = statement.getConnection();
final boolean isAutoCommit = connection.getAutoCommit();
final boolean hasPostProcessing = delete.getLimit() != null;
@@ -612,6 +621,11 @@ public class DeleteCompiler {
final StatementContext context = dataPlan.getContext();
Scan scan = context.getScan();
scan.setAttribute(BaseScannerRegionObserverConstants.DELETE_AGG,
QueryConstants.TRUE);
+ if (context.getScanRanges().getPointLookupCount() == 1 &&
+ returnResult == MutationState.ReturnResult.ROW) {
+
scan.setAttribute(BaseScannerRegionObserverConstants.SINGLE_ROW_DELETE,
+ QueryConstants.TRUE);
+ }
// Build an ungrouped aggregate query: select COUNT(*) from
<table> where <where>
// The coprocessor will delete each row returned from the scan
@@ -833,14 +847,24 @@ public class DeleteCompiler {
}
ResultIterator iterator = aggPlan.iterator();
try {
- Tuple row = iterator.next();
- final long mutationCount = (Long)
projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
- return new MutationState(maxSize, maxSizeBytes,
connection) {
- @Override
- public long getUpdateCount() {
- return mutationCount;
- }
- };
+ byte[] singleRowDelete =
+ context.getScan().getAttribute(
+
BaseScannerRegionObserverConstants.SINGLE_ROW_DELETE);
+ boolean isSingleRowDelete = singleRowDelete != null &&
+ Bytes.compareTo(PDataType.TRUE_BYTES,
singleRowDelete) == 0;
+ if (isSingleRowDelete) {
+ return deleteRowAndGetMutationState(table);
+ } else {
+ Tuple row = iterator.next();
+ final long mutationCount = (Long)
projector.getColumnProjector(0)
+ .getValue(row, PLong.INSTANCE, ptr);
+ return new MutationState(maxSize, maxSizeBytes,
connection) {
+ @Override
+ public long getUpdateCount() {
+ return mutationCount;
+ }
+ };
+ }
} finally {
iterator.close();
}
@@ -851,6 +875,33 @@ public class DeleteCompiler {
}
}
+ /**
+ * Initiate server side single row delete operation atomically and
return the Result only
+ * if the row is deleted.
+ *
+ * @param table PTable object.
+ * @return Mutation state.
+ * @throws SQLException If something goes wrong with server side
operation.
+ */
+ private MutationState deleteRowAndGetMutationState(PTable table)
throws SQLException {
+ Table hTable =
+ connection.getQueryServices()
+ .getTable(table.getTableName().getBytes());
+ try (ResultScanner scanner = hTable.getScanner(
+ new Scan(context.getScan()))) {
+ Result res = scanner.next();
+ Result result = res != null ? res : Result.EMPTY_RESULT;
+ return new MutationState(maxSize, maxSizeBytes, connection) {
+ @Override
+ public Result getResult() {
+ return result;
+ }
+ };
+ } catch (IOException e) {
+ throw new SQLException(e);
+ }
+ }
+
@Override
public ExplainPlan getExplainPlan() throws SQLException {
ExplainPlan explainPlan = aggPlan.getExplainPlan();
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
index 212592dbe2..df23020744 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
@@ -58,6 +58,7 @@ public class BaseScannerRegionObserverConstants {
public static final String TOPN = "_TopN";
public static final String UNGROUPED_AGG = "_UngroupedAgg";
public static final String DELETE_AGG = "_DeleteAgg";
+ public static final String SINGLE_ROW_DELETE = "_SingleRowDelete";
public static final String UPSERT_SELECT_TABLE = "_UpsertSelectTable";
public static final String UPSERT_SELECT_EXPRS = "_UpsertSelectExprs";
public static final String DELETE_CQ = "_DeleteCQ";
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index c361868405..fd778dfc92 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -623,9 +623,19 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
throw new UpgradeRequiredException();
}
state = connection.getMutationState();
- plan = stmt.compilePlan(PhoenixStatement.this,
Sequence.ValueOp.VALIDATE_SEQUENCE);
isUpsert = stmt instanceof
ExecutableUpsertStatement;
isDelete = stmt instanceof
ExecutableDeleteStatement;
+ if (isDelete && connection.getAutoCommit() &&
+ returnResult == ReturnResult.ROW) {
+ // used only if single row deletion needs
to atomically
+ // return row that is deleted.
+ plan = ((ExecutableDeleteStatement)
stmt).compilePlan(
+ PhoenixStatement.this,
+
Sequence.ValueOp.VALIDATE_SEQUENCE, returnResult);
+ } else {
+ plan =
stmt.compilePlan(PhoenixStatement.this,
+
Sequence.ValueOp.VALIDATE_SEQUENCE);
+ }
isAtomicUpsert = isUpsert &&
((ExecutableUpsertStatement)stmt).getOnDupKeyPairs() != null;
if (plan.getTargetRef() != null &&
plan.getTargetRef().getTable() != null) {
if
(!Strings.isNullOrEmpty(plan.getTargetRef().getTable().getPhysicalName().toString()))
{
@@ -647,7 +657,9 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
lastState.getUpdateCount());
Result result = null;
if (connection.getAutoCommit()) {
- if (isSingleRowUpdatePlan(isUpsert,
isDelete, plan)) {
+ boolean singleRowUpdate =
isSingleRowUpdatePlan(isUpsert,
+ isDelete, plan);
+ if (singleRowUpdate) {
state.setReturnResult(returnResult);
}
connection.commit();
@@ -657,6 +669,8 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
}
result =
connection.getMutationState().getResult();
connection.getMutationState().clearResult();
+ result = getResult(singleRowUpdate,
isDelete, plan,
+ lastState, result);
}
setLastQueryPlan(null);
setLastUpdateCount(lastUpdateCount);
@@ -758,6 +772,25 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
}
}
+ /**
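+ * Choose the Result to return: for a single row DELETE whose plan is a
+ * ServerSelectDeleteMutationPlan, use the Result held by the given MutationState;
+ * otherwise keep the Result obtained so far.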
+ *
+ * @param singleRowUpdate True if this is single row Upsert/Delete.
+ * @param isDelete True if this is Delete and not Upsert.
+ * @param plan Mutation Plan.
+ * @param mutationState Mutation State.
+ * @param result Result obtained so far.
+ * @return Result for the atomically updated row.
+ */
+ private Result getResult(boolean singleRowUpdate, boolean isDelete,
MutationPlan plan,
+ MutationState mutationState, Result result) {
+ if (singleRowUpdate && isDelete &&
+ plan instanceof DeleteCompiler.ServerSelectDeleteMutationPlan)
{
+ result = mutationState.getResult();
+ }
+ return result;
+ }
+
private static boolean isSingleRowUpdatePlan(boolean isUpsert, boolean
isDelete,
MutationPlan plan) {
boolean isSingleRowUpdate = false;
@@ -1161,6 +1194,29 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
plan.getContext().getSequenceManager().validateSequences(seqAction);
return plan;
}
+
+ /**
+ * Compile Plan for single row delete with additional condition on
non-pk columns. New
+ * plan compilation is used for returning the deleted row atomically
only if it is deleted
+ * by the given DELETE statement.
+ *
+ * @param stmt JDBC Phoenix Statement object.
+ * @param seqAction Sequence operation passed to the plan's sequence validation.
+ * @param returnResult ReturnResult object.
+ * @return The compiled MutationPlan.
+ * @throws SQLException If something fails during plan compilation.
+ */
+ public MutationPlan compilePlan(PhoenixStatement stmt,
+ Sequence.ValueOp seqAction,
+ ReturnResult returnResult) throws
SQLException {
+ if (!getUdfParseNodes().isEmpty()) {
+ stmt.throwIfUnallowedUserDefinedFunctions(getUdfParseNodes());
+ }
+ DeleteCompiler compiler = new DeleteCompiler(stmt,
this.getOperation());
+ MutationPlan plan = compiler.compile(this, returnResult);
+
plan.getContext().getSequenceManager().validateSequences(seqAction);
+ return plan;
+ }
}
private static class ExecutableCreateTableStatement extends
CreateTableStatement implements CompilableStatement {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index c19e1959e2..74c965345a 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -32,11 +32,9 @@ import java.security.PrivilegedExceptionAction;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.GuardedBy;
@@ -50,6 +48,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -61,7 +60,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import
org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
-import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -94,7 +92,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.mapreduce.index.IndexTool;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
@@ -124,7 +121,6 @@ import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.ServerUtil.ConnectionType;
-import org.apache.phoenix.util.MetaDataUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -299,15 +295,16 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
}
}
- private void commitBatchWithTable(Table table, List<Mutation> mutations)
throws IOException {
+ private Object[] commitBatchWithTable(Table table, List<Mutation>
mutations)
+ throws IOException {
if (mutations.isEmpty()) {
- return;
+ return null;
}
-
LOGGER.debug("Committing batch of " + mutations.size() + " mutations
for " + table);
try {
Object[] results = new Object[mutations.size()];
table.batch(mutations, results);
+ return results;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
@@ -524,6 +521,35 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
remoteRegionMutations.clear();
}
+ /**
+ * Commit single row delete Mutation atomically and return result only if
the row is
+ * deleted.
+ *
+ * @param mutations List of Mutations.
+ * @param indexUUID IndexUUID.
+ * @param indexMaintainersPtr Serialized Index Maintainer.
+ * @param txState Transaction State.
+ * @param targetHTable HTable on which the Mutation is executed.
+ * @param useIndexProto True if IdxProtoMD is to be used.
+ * @param clientVersionBytes Client version.
+ * @return Result for the atomic single row deletion.
+ * @throws IOException If something goes wrong with the execution.
+ */
+ Result commitWithResultReturned(List<Mutation> mutations,
+ byte[] indexUUID,
+ byte[] indexMaintainersPtr, byte[] txState,
+ final Table targetHTable, boolean
useIndexProto,
+ byte[] clientVersionBytes)
+ throws IOException {
+ setIndexAndTransactionProperties(mutations, indexUUID,
indexMaintainersPtr, txState,
+ clientVersionBytes, useIndexProto);
+ Object[] results = commitBatchWithTable(targetHTable, mutations);
+ if (results != null && results.length == 1) {
+ return (Result) results[0];
+ }
+ return Result.EMPTY_RESULT;
+ }
+
private void handleIndexWriteException(final List<Mutation>
localRegionMutations, IOException origIOE,
MutateCommand mutateCommand) throws
IOException {
long serverTimestamp =
ClientUtil.parseTimestampFromRemoteException(origIOE);
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index dc574c015e..32a837c080 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -80,6 +81,7 @@ import
org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexBuilderHelper;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.memory.InsufficientMemoryException;
import org.apache.phoenix.memory.MemoryManager;
@@ -165,6 +167,12 @@ public class UngroupedAggregateRegionScanner extends
BaseRegionScanner {
private byte[] indexMaintainersPtr;
private boolean useIndexProto;
+ /**
+ * Single row atomic delete that requires returning result (row) back to
client
+ * only if the row is successfully deleted by the given thread.
+ */
+ private boolean isSingleRowDelete = false;
+
public UngroupedAggregateRegionScanner(final
ObserverContext<RegionCoprocessorEnvironment> c,
final RegionScanner innerScanner,
final Region region, final Scan scan,
final RegionCoprocessorEnvironment
env,
@@ -240,6 +248,17 @@ public class UngroupedAggregateRegionScanner extends
BaseRegionScanner {
} else {
byte[] isDeleteAgg =
scan.getAttribute(BaseScannerRegionObserverConstants.DELETE_AGG);
isDelete = isDeleteAgg != null &&
Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
+ byte[] singleRowDelete =
+
scan.getAttribute(BaseScannerRegionObserverConstants.SINGLE_ROW_DELETE);
+ isSingleRowDelete = singleRowDelete != null &&
+ Bytes.compareTo(PDataType.TRUE_BYTES, singleRowDelete) ==
0;
+ if (isSingleRowDelete) {
+ //The Connection is a singleton. It MUST NOT be closed.
+ targetHTable = ServerUtil.ConnectionFactory.getConnection(
+
ServerUtil.ConnectionType.DEFAULT_SERVER_CONNECTION,
+ env)
+ .getTable(region.getRegionInfo().getTable());
+ }
if (!isDelete) {
deleteCF =
scan.getAttribute(BaseScannerRegionObserverConstants.DELETE_CF);
deleteCQ =
scan.getAttribute(BaseScannerRegionObserverConstants.DELETE_CQ);
@@ -450,6 +469,7 @@ public class UngroupedAggregateRegionScanner extends
BaseRegionScanner {
}
result.setKeyValues(results);
}
+
void deleteRow(List<Cell> results,
UngroupedAggregateRegionObserver.MutationList mutations) {
Cell firstKV = results.get(0);
Delete delete = new Delete(firstKV.getRowArray(),
@@ -462,7 +482,10 @@ public class UngroupedAggregateRegionScanner extends
BaseRegionScanner {
if (sourceOperationBytes != null) {
delete.setAttribute(SOURCE_OPERATION_ATTRIB, sourceOperationBytes);
}
-
+ if (isSingleRowDelete) {
+ delete.setAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT,
+ PhoenixIndexBuilderHelper.RETURN_RESULT_ROW);
+ }
mutations.add(delete);
}
@@ -593,6 +616,7 @@ public class UngroupedAggregateRegionScanner extends
BaseRegionScanner {
|| (deleteCQ != null && deleteCF != null) || emptyCF !=
null || buildLocalIndex) {
mutations = new
UngroupedAggregateRegionObserver.MutationList(Ints.saturatedCast(maxBatchSize +
maxBatchSize / 10));
}
+ Result atomicSingleRowDeleteResult = null;
region.startRegionOperation();
try {
synchronized (innerScanner) {
@@ -640,7 +664,12 @@ public class UngroupedAggregateRegionScanner extends
BaseRegionScanner {
insertEmptyKeyValue(results, mutations);
}
if (ServerUtil.readyToCommit(mutations.size(),
mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
- annotateAndCommit(mutations);
+ if (!isSingleRowDelete) {
+ annotateAndCommit(mutations);
+ } else {
+ atomicSingleRowDeleteResult =
+
annotateCommitAndReturnResult(mutations);
+ }
}
// Commit in batches based on
UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
@@ -654,7 +683,11 @@ public class UngroupedAggregateRegionScanner extends
BaseRegionScanner {
}
} while (hasMore &&
(EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeMs);
if (!mutations.isEmpty()) {
- annotateAndCommit(mutations);
+ if (!isSingleRowDelete) {
+ annotateAndCommit(mutations);
+ } else {
+ atomicSingleRowDeleteResult =
annotateCommitAndReturnResult(mutations);
+ }
}
if (!indexMutations.isEmpty()) {
ungroupedAggregateRegionObserver.commitBatch(region,
indexMutations, blockingMemStoreSize);
@@ -674,7 +707,13 @@ public class UngroupedAggregateRegionScanner extends
BaseRegionScanner {
}
Cell keyValue;
if (hasAny) {
- byte[] value = aggregators.toBytes(rowAggregators);
+ final byte[] value;
+ if (isSingleRowDelete && atomicSingleRowDeleteResult != null) {
+
resultsToReturn.addAll(atomicSingleRowDeleteResult.listCells());
+ return hasMore;
+ } else {
+ value = aggregators.toBytes(rowAggregators);
+ }
if (pageSizeMs == Long.MAX_VALUE) {
byte[] rowKey;
final boolean isIncompatibleClient =
@@ -699,13 +738,41 @@ public class UngroupedAggregateRegionScanner extends
BaseRegionScanner {
}
private void
annotateAndCommit(UngroupedAggregateRegionObserver.MutationList mutations)
throws IOException {
+ annotateMutations(mutations);
+ ungroupedAggregateRegionObserver.commit(region, mutations, indexUUID,
blockingMemStoreSize, indexMaintainersPtr, txState,
+ targetHTable, useIndexProto, isPKChanging, clientVersionBytes);
+ mutations.clear();
+ }
+
+ /**
+ * Similar to {@link
#annotateAndCommit(UngroupedAggregateRegionObserver.MutationList)} but
+ * only meant for single row atomic delete mutation that requires
returning the result if
+ * the row is deleted atomically.
+ *
+ * @param mutations Mutation list.
+ * @return Result to be returned.
+ * @throws IOException If something goes wrong with the operation.
+ */
+ private Result annotateCommitAndReturnResult(
+ UngroupedAggregateRegionObserver.MutationList mutations) throws
IOException {
+ annotateMutations(mutations);
+ Result result =
ungroupedAggregateRegionObserver.commitWithResultReturned(mutations,
+ indexUUID, indexMaintainersPtr, txState, targetHTable,
+ useIndexProto, clientVersionBytes);
+ mutations.clear();
+ return result;
+ }
+
+ /**
+ * Annotate the given mutations as per the scan attributes.
+ *
+ * @param mutations The mutations that need to be annotated.
+ */
+ private void
annotateMutations(UngroupedAggregateRegionObserver.MutationList mutations) {
annotateDataMutations(mutations, scan);
if (isDelete || isUpsert) {
annotateDataMutationsWithExternalSchemaId(mutations, scan);
}
- ungroupedAggregateRegionObserver.commit(region, mutations, indexUUID,
blockingMemStoreSize, indexMaintainersPtr, txState,
- targetHTable, useIndexProto, isPKChanging, clientVersionBytes);
- mutations.clear();
}
@Override
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
index 064659ac26..22c020e434 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.ExplainPlanAttributes;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -141,7 +143,7 @@ public class OnDuplicateKey2IT extends
ParallelStatsDisabledIT {
}
@Test
- public void testReturnRowResult() throws Exception {
+ public void testReturnRowResult1() throws Exception {
Assume.assumeTrue("Set correct result to RegionActionResult on hbase
versions " +
"2.4.18+, 2.5.9+, and 2.6.0+",
isSetCorrectResultEnabledOnHBase());
@@ -160,45 +162,7 @@ public class OnDuplicateKey2IT extends
ParallelStatsDisabledIT {
conn.createStatement().execute(ddl);
createIndex(conn, tableName);
- String upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3,
COUNTER1, COL3, COL4)"
- + " VALUES('pk000', -123.98, 'pk003', 1011.202, ?, 123) ON
DUPLICATE KEY " +
- "IGNORE";
- validateReturnedRowAfterUpsert(conn, upsertSql, tableName,
1011.202, null, true,
- bsonDocument1, bsonDocument1, 123);
-
- upsertSql =
- "UPSERT INTO " + tableName + " (PK1, PK2, PK3, COUNTER1) "
- + "VALUES('pk000', -123.98, 'pk003', 0) ON
DUPLICATE"
- + " KEY IGNORE";
- validateReturnedRowAfterUpsert(conn, upsertSql, tableName,
1011.202, null, false,
- null, bsonDocument1, 123);
-
- upsertSql =
- "UPSERT INTO " + tableName
- + " (PK1, PK2, PK3, COUNTER1, COUNTER2)
VALUES('pk000', -123.98, "
- + "'pk003', 234, 'col2_000')";
- validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 234d,
"col2_000", true,
- null, bsonDocument1, 123);
-
- upsertSql = "UPSERT INTO " + tableName
- + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON
DUPLICATE KEY UPDATE "
- + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 +
1999.99 ELSE COUNTER1"
- + " END, "
- + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN
'col2_001' ELSE COUNTER2 "
- + "END, "
- + "COL3 = ?, "
- + "COL4 = 234";
- validateReturnedRowAfterUpsert(conn, upsertSql, tableName,
2233.99, "col2_001", true,
- bsonDocument2, bsonDocument2, 234);
-
- upsertSql = "UPSERT INTO " + tableName
- + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON
DUPLICATE KEY UPDATE "
- + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 +
1999.99 ELSE COUNTER1"
- + " END,"
- + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN
'col2_001' ELSE COUNTER2 "
- + "END";
- validateReturnedRowAfterUpsert(conn, upsertSql, tableName,
2233.99, "col2_001", false
- , null, bsonDocument2, 234);
+ validateAtomicUpsertReturnRow(tableName, conn, bsonDocument1,
bsonDocument2);
PhoenixPreparedStatement ps =
conn.prepareStatement(
@@ -212,37 +176,162 @@ public class OnDuplicateKey2IT extends
ParallelStatsDisabledIT {
validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99,
"col2_001", true, false,
bsonDocument2, 234);
- addRows(tableName, conn);
+ validateMultiRowDelete(tableName, conn, bsonDocument2);
+ }
+ }
+
+ @Test
+ public void testReturnRowResult2() throws Exception {
+ Assume.assumeTrue("Set correct result to RegionActionResult on hbase
versions " +
+ "2.4.18+, 2.5.9+, and 2.6.0+",
isSetCorrectResultEnabledOnHBase());
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String sample1 = getJsonString("json/sample_01.json");
+ String sample2 = getJsonString("json/sample_02.json");
+ BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
+ BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ String tableName = generateUniqueName();
+ String ddl = "CREATE TABLE " + tableName
+ + "(PK1 VARCHAR, PK2 DOUBLE NOT NULL, PK3 VARCHAR,
COUNTER1 DOUBLE,"
+ + " COUNTER2 VARCHAR,"
+ + " COL3 BSON, COL4 INTEGER, CONSTRAINT pk PRIMARY
KEY(PK1, PK2, PK3))";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+
+ validateAtomicUpsertReturnRow(tableName, conn, bsonDocument1,
bsonDocument2);
+
+ verifyIndexRow(conn, tableName, false);
- ps = conn.prepareStatement(
- "DELETE FROM " + tableName + " WHERE PK1 = ? AND PK2 = ?")
+ PhoenixPreparedStatement ps =
+ conn.prepareStatement(
+ "DELETE FROM " + tableName + " WHERE PK1 =
? AND PK2 " +
+ "= ? AND PK3 = ? AND COL4 = ?")
+ .unwrap(PhoenixPreparedStatement.class);
+ ps.setString(1, "pk000");
+ ps.setDouble(2, -123.98);
+ ps.setString(3, "pk003");
+ ps.setInt(4, 235);
+ validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99,
"col2_001", true, false,
+ bsonDocument2, 234);
+
+ ps = conn.prepareStatement("DELETE FROM " + tableName
+ + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ? AND COL4
= ?")
.unwrap(PhoenixPreparedStatement.class);
- ps.setString(1, "pk001");
- ps.setDouble(2, 122.34);
- validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99,
"col2_001", false, false,
+ ps.setString(1, "pk000");
+ ps.setDouble(2, -123.98);
+ ps.setString(3, "pk003");
+ ps.setInt(4, 234);
+ validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99,
"col2_001", true, true,
bsonDocument2, 234);
- ps = conn.prepareStatement(
- "DELETE FROM " +
tableName).unwrap(PhoenixPreparedStatement.class);
- validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99,
"col2_001", false, false,
+ verifyIndexRow(conn, tableName, true);
+ validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99,
"col2_001", true, false,
bsonDocument2, 234);
- ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM
" + tableName);
- assertFalse(rs.next());
+ validateMultiRowDelete(tableName, conn, bsonDocument2);
+ }
+ }
- addRows(tableName, conn);
+ private void verifyIndexRow(Connection conn, String tableName, boolean
deleted) throws SQLException {
+ PreparedStatement preparedStatement =
+ conn.prepareStatement("SELECT COUNTER2 FROM " + tableName + "
WHERE COUNTER1 " +
+ "= ?");
+ preparedStatement.setDouble(1, 2233.99);
- ps = conn.prepareStatement(
- "DELETE FROM " + tableName + " WHERE PK1 IN (?)
AND PK2 IN (?) AND " +
- "PK3 IN (?, ?)")
- .unwrap(PhoenixPreparedStatement.class);
- ps.setString(1, "pk001");
- ps.setDouble(2, 122.34);
- ps.setString(3, "pk004");
- ps.setString(4, "pk005");
- validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99,
"col2_001", false, false,
- bsonDocument2, 234);
+ ResultSet resultSet = preparedStatement.executeQuery();
+ if (!deleted) {
+ assertTrue(resultSet.next());
+ assertEquals("col2_001", resultSet.getString(1));
}
+ assertFalse(resultSet.next());
+
+ ExplainPlan plan =
+
preparedStatement.unwrap(PhoenixPreparedStatement.class).optimizeQuery()
+ .getExplainPlan();
+ ExplainPlanAttributes explainPlanAttributes =
plan.getPlanStepsAsAttributes();
+ assertEquals(indexDDL.contains("index") ? (indexDDL.contains("local
index") ?
+ tableName + "_IDX(" + tableName + ")" : tableName +
"_IDX") :
+ tableName, explainPlanAttributes.getTableName());
+ }
+
+ private static void validateMultiRowDelete(String tableName, Connection
conn,
+ BsonDocument bsonDocument2)
+ throws SQLException {
+ addRows(tableName, conn);
+
+ PhoenixPreparedStatement ps = conn.prepareStatement(
+ "DELETE FROM " + tableName + " WHERE PK1 = ? AND PK2 =
?")
+ .unwrap(PhoenixPreparedStatement.class);
+ ps.setString(1, "pk001");
+ ps.setDouble(2, 122.34);
+ validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99,
"col2_001", false, false,
+ bsonDocument2, 234);
+
+ ps = conn.prepareStatement(
+ "DELETE FROM " +
tableName).unwrap(PhoenixPreparedStatement.class);
+ validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99,
"col2_001", false, false,
+ bsonDocument2, 234);
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " +
tableName);
+ assertFalse(rs.next());
+
+ addRows(tableName, conn);
+
+ ps = conn.prepareStatement(
+ "DELETE FROM " + tableName + " WHERE PK1 IN (?) AND
PK2 IN (?) AND " +
+ "PK3 IN (?, ?)")
+ .unwrap(PhoenixPreparedStatement.class);
+ ps.setString(1, "pk001");
+ ps.setDouble(2, 122.34);
+ ps.setString(3, "pk004");
+ ps.setString(4, "pk005");
+ validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99,
"col2_001", false, false,
+ bsonDocument2, 234);
+ }
+
+ private static void validateAtomicUpsertReturnRow(String tableName,
Connection conn, BsonDocument bsonDocument1,
+ BsonDocument bsonDocument2) throws
SQLException {
+ String upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3,
COUNTER1, COL3, COL4)"
+ + " VALUES('pk000', -123.98, 'pk003', 1011.202, ?, 123) ON
DUPLICATE KEY " +
+ "IGNORE";
+ validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 1011.202,
null, true,
+ bsonDocument1, bsonDocument1, 123);
+
+ upsertSql =
+ "UPSERT INTO " + tableName + " (PK1, PK2, PK3, COUNTER1) "
+ + "VALUES('pk000', -123.98, 'pk003', 0) ON DUPLICATE"
+ + " KEY IGNORE";
+ validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 1011.202,
null, false,
+ null, bsonDocument1, 123);
+
+ upsertSql =
+ "UPSERT INTO " + tableName
+ + " (PK1, PK2, PK3, COUNTER1, COUNTER2)
VALUES('pk000', -123.98, "
+ + "'pk003', 234, 'col2_000')";
+ validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 234d,
"col2_000", true,
+ null, bsonDocument1, 123);
+
+ upsertSql = "UPSERT INTO " + tableName
+ + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON
DUPLICATE KEY UPDATE "
+ + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 +
1999.99 ELSE COUNTER1"
+ + " END, "
+ + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001'
ELSE COUNTER2 "
+ + "END, "
+ + "COL3 = ?, "
+ + "COL4 = 234";
+ validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 2233.99,
"col2_001", true,
+ bsonDocument2, bsonDocument2, 234);
+
+ upsertSql = "UPSERT INTO " + tableName
+ + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON
DUPLICATE KEY UPDATE "
+ + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 +
1999.99 ELSE COUNTER1"
+ + " END,"
+ + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001'
ELSE COUNTER2 "
+ + "END";
+ validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 2233.99,
"col2_001", false
+ , null, bsonDocument2, 234);
}
private static void addRows(String tableName, Connection conn) throws
SQLException {
@@ -292,31 +381,13 @@ public class OnDuplicateKey2IT extends
ParallelStatsDisabledIT {
return;
}
- validateReturnedRowResult(col1, col2, expectedDoc, col4, table, cell,
result);
+ validateReturnedRowResult(col2, expectedDoc, col4, table, result);
}
- private static void validateReturnedRowResult(Double col1, String col2,
+ private static void validateReturnedRowResult(String col2,
BsonDocument expectedDoc,
Integer col4,
- PTable table, Cell cell,
ResultTuple result) {
- ImmutableBytesPtr ptr = new ImmutableBytesPtr();
- table.getRowKeySchema().iterator(cell.getRowArray(), ptr, 1);
- assertEquals("pk000",
- PVarchar.INSTANCE.toObject(ptr.get(), ptr.getOffset(),
ptr.getLength()));
-
- ptr = new ImmutableBytesPtr();
- table.getRowKeySchema().iterator(cell.getRowArray(), ptr, 2);
- assertEquals(-123.98,
- PDouble.INSTANCE.toObject(ptr.get(), ptr.getOffset(),
ptr.getLength()));
-
- ptr = new ImmutableBytesPtr();
- table.getRowKeySchema().iterator(cell.getRowArray(), ptr, 3);
- assertEquals("pk003",
- PVarchar.INSTANCE.toObject(ptr.get(), ptr.getOffset(),
ptr.getLength()));
-
- assertEquals(col1,
- PDouble.INSTANCE.toObject(cell.getValueArray(),
cell.getValueOffset(),
- cell.getValueLength()));
- cell = result.getResult()
+ PTable table, ResultTuple
result) {
+ Cell cell = result.getResult()
.getColumnLatestCell(table.getColumns().get(4).getFamilyName().getBytes(),
table.getColumns().get(4).getColumnQualifierBytes());
if (col2 != null) {
@@ -378,7 +449,24 @@ public class OnDuplicateKey2IT extends
ParallelStatsDisabledIT {
result.getResult()
.getColumnLatestCell(table.getColumns().get(3).getFamilyName().getBytes(),
table.getColumns().get(3).getColumnQualifierBytes());
- validateReturnedRowResult(col1, col2, expectedDoc, col4, table, cell,
result);
+ ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+ table.getRowKeySchema().iterator(cell.getRowArray(), ptr, 1);
+ assertEquals("pk000",
+ PVarchar.INSTANCE.toObject(ptr.get(), ptr.getOffset(),
ptr.getLength()));
+
+ ptr = new ImmutableBytesPtr();
+ table.getRowKeySchema().iterator(cell.getRowArray(), ptr, 2);
+ assertEquals(-123.98,
+ PDouble.INSTANCE.toObject(ptr.get(), ptr.getOffset(),
ptr.getLength()));
+
+ ptr = new ImmutableBytesPtr();
+ table.getRowKeySchema().iterator(cell.getRowArray(), ptr, 3);
+ assertEquals("pk003",
+ PVarchar.INSTANCE.toObject(ptr.get(), ptr.getOffset(),
ptr.getLength()));
+ assertEquals(col1,
+ PDouble.INSTANCE.toObject(cell.getValueArray(),
cell.getValueOffset(),
+ cell.getValueLength()));
+ validateReturnedRowResult(col2, expectedDoc, col4, table, result);
}
@Test