This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new bf764c20a9 PHOENIX-7434 Extend atomic support to single row delete with condition on non-pk columns (#2008)
bf764c20a9 is described below

commit bf764c20a9f5ba4ff54d4155bbedad3ebdef37f0
Author: Viraj Jasani <[email protected]>
AuthorDate: Fri Dec 13 13:14:38 2024 -0800

    PHOENIX-7434 Extend atomic support to single row delete with condition on non-pk columns (#2008)
---
 .../org/apache/phoenix/compile/DeleteCompiler.java |  67 +++++-
 .../BaseScannerRegionObserverConstants.java        |   1 +
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  |  60 ++++-
 .../UngroupedAggregateRegionObserver.java          |  42 +++-
 .../UngroupedAggregateRegionScanner.java           |  81 ++++++-
 .../apache/phoenix/end2end/OnDuplicateKey2IT.java  | 258 ++++++++++++++-------
 6 files changed, 399 insertions(+), 110 deletions(-)
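
For context, a minimal client-side sketch of the statement shape this commit targets, using
only standard JDBC; the connection URL, table, columns, and values below are illustrative,
not taken from the patch. A plain executeUpdate() behaves as before; the deleted row itself
is only surfaced when the statement runs in Phoenix's ReturnResult.ROW mode (whose public
entry point is not part of this diff), with auto-commit enabled and a WHERE clause that
resolves to a single point lookup plus a condition on a non-pk column:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    public class AtomicConditionalDeleteSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical Phoenix JDBC URL and table; adjust for a real cluster.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.setAutoCommit(true); // the atomic single-row path requires auto-commit
                // Full primary key (point lookup) plus a non-pk condition (COL4):
                // the row is deleted atomically on the server only if COL4 = 234.
                try (PreparedStatement ps = conn.prepareStatement(
                        "DELETE FROM MY_TABLE WHERE PK1 = ? AND COL4 = ?")) {
                    ps.setString(1, "pk000");
                    ps.setInt(2, 234);
                    int deleted = ps.executeUpdate(); // 1 if the condition matched, else 0
                    System.out.println("rows deleted: " + deleted);
                }
            }
        }
    }
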

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 476d40dc26..f92b34d1c7 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -29,7 +29,10 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -87,6 +90,7 @@ import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
 import org.apache.phoenix.util.ByteUtil;
@@ -468,6 +472,11 @@ public class DeleteCompiler {
     }
 
     public MutationPlan compile(DeleteStatement delete) throws SQLException {
+        return compile(delete, null);
+    }
+
+    public MutationPlan compile(DeleteStatement delete, MutationState.ReturnResult returnResult)
+            throws SQLException {
         final PhoenixConnection connection = statement.getConnection();
         final boolean isAutoCommit = connection.getAutoCommit();
         final boolean hasPostProcessing = delete.getLimit() != null;
@@ -612,6 +621,11 @@ public class DeleteCompiler {
             final StatementContext context = dataPlan.getContext();
             Scan scan = context.getScan();
             scan.setAttribute(BaseScannerRegionObserverConstants.DELETE_AGG, QueryConstants.TRUE);
+            if (context.getScanRanges().getPointLookupCount() == 1 &&
+                    returnResult == MutationState.ReturnResult.ROW) {
+                scan.setAttribute(BaseScannerRegionObserverConstants.SINGLE_ROW_DELETE,
+                        QueryConstants.TRUE);
+            }
 
             // Build an ungrouped aggregate query: select COUNT(*) from <table> where <where>
             // The coprocessor will delete each row returned from the scan
@@ -833,14 +847,24 @@ public class DeleteCompiler {
                 }
                 ResultIterator iterator = aggPlan.iterator();
                 try {
-                    Tuple row = iterator.next();
-                    final long mutationCount = (Long) projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
-                    return new MutationState(maxSize, maxSizeBytes, connection) {
-                        @Override
-                        public long getUpdateCount() {
-                            return mutationCount;
-                        }
-                    };
+                    byte[] singleRowDelete =
+                            context.getScan().getAttribute(
+                                    BaseScannerRegionObserverConstants.SINGLE_ROW_DELETE);
+                    boolean isSingleRowDelete = singleRowDelete != null &&
+                            Bytes.compareTo(PDataType.TRUE_BYTES, singleRowDelete) == 0;
+                    if (isSingleRowDelete) {
+                        return deleteRowAndGetMutationState(table);
+                    } else {
+                        Tuple row = iterator.next();
+                        final long mutationCount = (Long) projector.getColumnProjector(0)
+                                .getValue(row, PLong.INSTANCE, ptr);
+                        return new MutationState(maxSize, maxSizeBytes, connection) {
+                            @Override
+                            public long getUpdateCount() {
+                                return mutationCount;
+                            }
+                        };
+                    }
                 } finally {
                     iterator.close();
                 }
@@ -851,6 +875,33 @@ public class DeleteCompiler {
             }
         }
 
+        /**
+         * Atomically initiate a server-side single-row delete operation and return the Result only
+         * if the row is deleted.
+         *
+         * @param table PTable object.
+         * @return Mutation state.
+         * @throws SQLException If something goes wrong with the server-side operation.
+         */
+        private MutationState deleteRowAndGetMutationState(PTable table) throws SQLException {
+            Table hTable =
+                    connection.getQueryServices()
+                            .getTable(table.getTableName().getBytes());
+            try (ResultScanner scanner = hTable.getScanner(
+                    new Scan(context.getScan()))) {
+                Result res = scanner.next();
+                Result result = res != null ? res : Result.EMPTY_RESULT;
+                return new MutationState(maxSize, maxSizeBytes, connection) {
+                    @Override
+                    public Result getResult() {
+                        return result;
+                    }
+                };
+            } catch (IOException e) {
+                throw new SQLException(e);
+            }
+        }
+
         @Override
         public ExplainPlan getExplainPlan() throws SQLException {
             ExplainPlan explainPlan = aggPlan.getExplainPlan();
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
index 212592dbe2..df23020744 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
@@ -58,6 +58,7 @@ public class BaseScannerRegionObserverConstants {
     public static final String TOPN = "_TopN";
     public static final String UNGROUPED_AGG = "_UngroupedAgg";
     public static final String DELETE_AGG = "_DeleteAgg";
+    public static final String SINGLE_ROW_DELETE = "_SingleRowDelete";
     public static final String UPSERT_SELECT_TABLE = "_UpsertSelectTable";
     public static final String UPSERT_SELECT_EXPRS = "_UpsertSelectExprs";
     public static final String DELETE_CQ = "_DeleteCQ";
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index c361868405..fd778dfc92 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -623,9 +623,19 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
                                     throw new UpgradeRequiredException();
                                 }
                                 state = connection.getMutationState();
-                                plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE);
                                 isUpsert = stmt instanceof ExecutableUpsertStatement;
                                 isDelete = stmt instanceof ExecutableDeleteStatement;
+                                if (isDelete && connection.getAutoCommit() &&
+                                        returnResult == ReturnResult.ROW) {
+                                    // used only if single row deletion needs to atomically
+                                    // return the row that is deleted.
+                                    plan = ((ExecutableDeleteStatement) stmt).compilePlan(
+                                            PhoenixStatement.this,
+                                            Sequence.ValueOp.VALIDATE_SEQUENCE, returnResult);
+                                } else {
+                                    plan = stmt.compilePlan(PhoenixStatement.this,
+                                            Sequence.ValueOp.VALIDATE_SEQUENCE);
+                                }
                                 isAtomicUpsert = isUpsert && ((ExecutableUpsertStatement)stmt).getOnDupKeyPairs() != null;
                                 if (plan.getTargetRef() != null && plan.getTargetRef().getTable() != null) {
                                     if (!Strings.isNullOrEmpty(plan.getTargetRef().getTable().getPhysicalName().toString())) {
@@ -647,7 +657,9 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
                                         lastState.getUpdateCount());
                                 Result result = null;
                                 if (connection.getAutoCommit()) {
-                                    if (isSingleRowUpdatePlan(isUpsert, isDelete, plan)) {
+                                    boolean singleRowUpdate = isSingleRowUpdatePlan(isUpsert,
+                                            isDelete, plan);
+                                            isDelete, plan);
+                                    if (singleRowUpdate) {
                                         state.setReturnResult(returnResult);
                                     }
                                     connection.commit();
@@ -657,6 +669,8 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
                                     }
                                     result = connection.getMutationState().getResult();
                                     connection.getMutationState().clearResult();
+                                    result = getResult(singleRowUpdate, isDelete, plan,
+                                            lastState, result);
                                 }
                                 setLastQueryPlan(null);
                                 setLastUpdateCount(lastUpdateCount);
@@ -758,6 +772,25 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
         }
     }
 
+    /**
+     * Get the Result for the row if it was atomically deleted.
+     *
+     * @param singleRowUpdate True if this is single row Upsert/Delete.
+     * @param isDelete True if this is Delete and not Upsert.
+     * @param plan Mutation Plan.
+     * @param mutationState Mutation State.
+     * @param result Result obtained so far.
+     * @return Result for the atomically updated row.
+     */
+    private Result getResult(boolean singleRowUpdate, boolean isDelete, MutationPlan plan,
+                             MutationState mutationState, Result result) {
+        if (singleRowUpdate && isDelete &&
+                plan instanceof DeleteCompiler.ServerSelectDeleteMutationPlan) {
+                result = mutationState.getResult();
+        }
+        return result;
+    }
+
     private static boolean isSingleRowUpdatePlan(boolean isUpsert, boolean isDelete,
                                                  MutationPlan plan) {
         boolean isSingleRowUpdate = false;
@@ -1161,6 +1194,29 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
             plan.getContext().getSequenceManager().validateSequences(seqAction);
             return plan;
         }
+
+        /**
+         * Compile the plan for a single-row delete with an additional condition on non-pk
+         * columns. The new plan compilation is used to atomically return the deleted row, only
+         * if it is deleted by the given DELETE statement.
+         *
+         * @param stmt JDBC Phoenix Statement object.
+         * @param seqAction Sequence statement validation.
+         * @param returnResult ReturnResult object.
+         * @return The compiled MutationPlan.
+         * @throws SQLException If something fails during plan compilation.
+         */
+        public MutationPlan compilePlan(PhoenixStatement stmt,
+                                        Sequence.ValueOp seqAction,
+                                        ReturnResult returnResult) throws SQLException {
+            if (!getUdfParseNodes().isEmpty()) {
+                stmt.throwIfUnallowedUserDefinedFunctions(getUdfParseNodes());
+            }
+            DeleteCompiler compiler = new DeleteCompiler(stmt, this.getOperation());
+            MutationPlan plan = compiler.compile(this, returnResult);
+            plan.getContext().getSequenceManager().validateSequences(seqAction);
+            return plan;
+        }
     }
     
    private static class ExecutableCreateTableStatement extends CreateTableStatement implements CompilableStatement {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index c19e1959e2..74c965345a 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -32,11 +32,9 @@ import java.security.PrivilegedExceptionAction;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
 
 import javax.annotation.concurrent.GuardedBy;
 
@@ -50,6 +48,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -61,7 +60,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
-import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -94,7 +92,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.mapreduce.index.IndexTool;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
@@ -124,7 +121,6 @@ import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
-import org.apache.phoenix.util.MetaDataUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -299,15 +295,16 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
     }
 
-    private void commitBatchWithTable(Table table, List<Mutation> mutations) throws IOException {
+    private Object[] commitBatchWithTable(Table table, List<Mutation> mutations)
+            throws IOException {
         if (mutations.isEmpty()) {
-            return;
+            return null;
         }
-
         LOGGER.debug("Committing batch of " + mutations.size() + " mutations for " + table);
         try {
             Object[] results = new Object[mutations.size()];
             table.batch(mutations, results);
+            return results;
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new RuntimeException(e);
@@ -524,6 +521,35 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         remoteRegionMutations.clear();
     }
 
+    /**
+     * Commit a single-row delete Mutation atomically and return the result only if the row
+     * is deleted.
+     *
+     * @param mutations List of Mutations.
+     * @param indexUUID IndexUUID.
+     * @param indexMaintainersPtr Serialized Index Maintainer.
+     * @param txState Transaction State.
+     * @param targetHTable HTable on which the Mutation is executed.
+     * @param useIndexProto True if IdxProtoMD is to be used.
+     * @param clientVersionBytes Client version.
+     * @return Result for the atomic single row deletion.
+     * @throws IOException If something goes wrong with the execution.
+     */
+    Result commitWithResultReturned(List<Mutation> mutations,
+                                    byte[] indexUUID,
+                                    byte[] indexMaintainersPtr, byte[] txState,
+                                    final Table targetHTable, boolean useIndexProto,
+                                    byte[] clientVersionBytes)
+            throws IOException {
+        setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState,
+                clientVersionBytes, useIndexProto);
+        Object[] results = commitBatchWithTable(targetHTable, mutations);
+        if (results != null && results.length == 1) {
+            return (Result) results[0];
+        }
+        return Result.EMPTY_RESULT;
+    }
+
     private void handleIndexWriteException(final List<Mutation> localRegionMutations, IOException origIOE,
                                            MutateCommand mutateCommand) throws IOException {
         long serverTimestamp = ClientUtil.parseTimestampFromRemoteException(origIOE);
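
A side note on the cast in commitWithResultReturned() above: HBase's Table.batch(actions,
results) fills the results array with one entry per action (a Result for a successful
operation, or an error object on failure), which is why a one-mutation batch can surface
results[0] as a Result. A hedged standalone sketch of that contract; the table name and row
key are hypothetical, and it is the Phoenix coprocessor that arranges for the deleted row's
cells to come back when RETURN_RESULT is set on the Delete:

    import java.util.Collections;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BatchResultSketch {
        static Result deleteAndReturn(Connection hbaseConn) throws Exception {
            try (Table table = hbaseConn.getTable(TableName.valueOf("MY_TABLE"))) {
                Delete delete = new Delete(Bytes.toBytes("row-key"));
                Object[] results = new Object[1];
                // batch() populates one slot per submitted mutation.
                table.batch(Collections.singletonList(delete), results);
                return results[0] instanceof Result ? (Result) results[0] : Result.EMPTY_RESULT;
            }
        }
    }
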
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index dc574c015e..32a837c080 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -80,6 +81,7 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexBuilderHelper;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.memory.InsufficientMemoryException;
 import org.apache.phoenix.memory.MemoryManager;
@@ -165,6 +167,12 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
     private byte[] indexMaintainersPtr;
     private boolean useIndexProto;
 
+    /**
+     * Single-row atomic delete that requires returning the result (row) back to the client
+     * only if the row is successfully deleted by the given thread.
+     */
+    private boolean isSingleRowDelete = false;
+
     public UngroupedAggregateRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
                                            final RegionScanner innerScanner, final Region region, final Scan scan,
                                            final RegionCoprocessorEnvironment env,
@@ -240,6 +248,17 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
         } else {
             byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserverConstants.DELETE_AGG);
             isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
+            byte[] singleRowDelete =
+                    scan.getAttribute(BaseScannerRegionObserverConstants.SINGLE_ROW_DELETE);
+            isSingleRowDelete = singleRowDelete != null &&
+                    Bytes.compareTo(PDataType.TRUE_BYTES, singleRowDelete) == 0;
+            if (isSingleRowDelete) {
+                //The Connection is a singleton. It MUST NOT be closed.
+                targetHTable = ServerUtil.ConnectionFactory.getConnection(
+                                ServerUtil.ConnectionType.DEFAULT_SERVER_CONNECTION,
+                                env)
+                        .getTable(region.getRegionInfo().getTable());
+            }
             if (!isDelete) {
                 deleteCF = scan.getAttribute(BaseScannerRegionObserverConstants.DELETE_CF);
                 deleteCQ = scan.getAttribute(BaseScannerRegionObserverConstants.DELETE_CQ);
@@ -450,6 +469,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
         }
         result.setKeyValues(results);
     }
+
     void deleteRow(List<Cell> results, UngroupedAggregateRegionObserver.MutationList mutations) {
         Cell firstKV = results.get(0);
         Delete delete = new Delete(firstKV.getRowArray(),
@@ -462,7 +482,10 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
         if (sourceOperationBytes != null) {
             delete.setAttribute(SOURCE_OPERATION_ATTRIB, sourceOperationBytes);
         }
-
+        if (isSingleRowDelete) {
+            delete.setAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT,
+                    PhoenixIndexBuilderHelper.RETURN_RESULT_ROW);
+        }
         mutations.add(delete);
     }
 
@@ -593,6 +616,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
                     || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
                 mutations = new UngroupedAggregateRegionObserver.MutationList(Ints.saturatedCast(maxBatchSize + maxBatchSize / 10));
             }
+            Result atomicSingleRowDeleteResult = null;
             region.startRegionOperation();
             try {
                 synchronized (innerScanner) {
@@ -640,7 +664,12 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
                                 insertEmptyKeyValue(results, mutations);
                             }
                             if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                                annotateAndCommit(mutations);
+                                if (!isSingleRowDelete) {
+                                    annotateAndCommit(mutations);
+                                } else {
+                                    atomicSingleRowDeleteResult =
+                                            annotateCommitAndReturnResult(mutations);
+                                }
                             }
                             // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
 
@@ -654,7 +683,11 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
                         }
                     } while (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeMs);
                     if (!mutations.isEmpty()) {
-                        annotateAndCommit(mutations);
+                        if (!isSingleRowDelete) {
+                            annotateAndCommit(mutations);
+                        } else {
+                            atomicSingleRowDeleteResult = annotateCommitAndReturnResult(mutations);
+                        }
                     }
                     if (!indexMutations.isEmpty()) {
                         ungroupedAggregateRegionObserver.commitBatch(region, indexMutations, blockingMemStoreSize);
@@ -674,7 +707,13 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
             }
             Cell keyValue;
             if (hasAny) {
-                byte[] value = aggregators.toBytes(rowAggregators);
+                final byte[] value;
+                if (isSingleRowDelete && atomicSingleRowDeleteResult != null) {
+                    resultsToReturn.addAll(atomicSingleRowDeleteResult.listCells());
+                    return hasMore;
+                } else {
+                    value = aggregators.toBytes(rowAggregators);
+                }
                 if (pageSizeMs == Long.MAX_VALUE) {
                     byte[] rowKey;
                     final boolean isIncompatibleClient =
@@ -699,13 +738,41 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
     }
 
     private void annotateAndCommit(UngroupedAggregateRegionObserver.MutationList mutations) throws IOException {
+        annotateMutations(mutations);
+        ungroupedAggregateRegionObserver.commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
+            targetHTable, useIndexProto, isPKChanging, clientVersionBytes);
+        mutations.clear();
+    }
+
+    /**
+     * Similar to {@link #annotateAndCommit(UngroupedAggregateRegionObserver.MutationList)} but
+     * only meant for a single-row atomic delete mutation that requires returning the result if
+     * the row is deleted atomically.
+     *
+     * @param mutations Mutation list.
+     * @return Result to be returned.
+     * @throws IOException If something goes wrong with the operation.
+     */
+    private Result annotateCommitAndReturnResult(
+            UngroupedAggregateRegionObserver.MutationList mutations) throws IOException {
+        annotateMutations(mutations);
+        Result result = ungroupedAggregateRegionObserver.commitWithResultReturned(mutations,
+                indexUUID, indexMaintainersPtr, txState, targetHTable,
+                useIndexProto, clientVersionBytes);
+        mutations.clear();
+        return result;
+    }
+
+    /**
+     * Annotate the given mutations as per the scan attributes.
+     *
+     * @param mutations The mutations that need to be annotated.
+     */
+    private void annotateMutations(UngroupedAggregateRegionObserver.MutationList mutations) {
         annotateDataMutations(mutations, scan);
         if (isDelete || isUpsert) {
             annotateDataMutationsWithExternalSchemaId(mutations, scan);
         }
-        ungroupedAggregateRegionObserver.commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
-            targetHTable, useIndexProto, isPKChanging, clientVersionBytes);
-        mutations.clear();
     }
 
     @Override
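
The client/server handshake in this patch rides on Scan attributes: DeleteCompiler tags the
scan, and UngroupedAggregateRegionScanner reads the tag back before choosing a commit path.
A self-contained round-trip sketch follows; it uses HBase's Bytes boolean helpers for
brevity, where the patch itself encodes the flag with QueryConstants.TRUE and compares it
against PDataType.TRUE_BYTES:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanAttributeRoundTrip {
        public static void main(String[] args) {
            // "Client" side: mark the scan, as DeleteCompiler does for a
            // point-lookup DELETE executed with ReturnResult.ROW.
            Scan scan = new Scan();
            scan.setAttribute("_SingleRowDelete", Bytes.toBytes(true));

            // "Server" side: the coprocessor sees the same Scan and recovers
            // the flag before deciding how to commit the delete.
            byte[] flag = scan.getAttribute("_SingleRowDelete");
            boolean isSingleRowDelete = flag != null && Bytes.toBoolean(flag);
            System.out.println("single-row delete path: " + isSingleRowDelete);
        }
    }
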
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
index 064659ac26..22c020e434 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.ExplainPlanAttributes;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -141,7 +143,7 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT {
     }
 
     @Test
-    public void testReturnRowResult() throws Exception {
+    public void testReturnRowResult1() throws Exception {
-        Assume.assumeTrue("Set correct result to RegionActionResult on hbase versions " +
-                "2.4.18+, 2.5.9+, and 2.6.0+", isSetCorrectResultEnabledOnHBase());
 
@@ -160,45 +162,7 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT {
             conn.createStatement().execute(ddl);
             createIndex(conn, tableName);
 
-            String upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3, COUNTER1, COL3, COL4)"
-                    + " VALUES('pk000', -123.98, 'pk003', 1011.202, ?, 123) ON DUPLICATE KEY " +
-                    "IGNORE";
-            validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 1011.202, null, true,
-                    bsonDocument1, bsonDocument1, 123);
-
-            upsertSql =
-                    "UPSERT INTO " + tableName + " (PK1, PK2, PK3, COUNTER1) "
-                            + "VALUES('pk000', -123.98, 'pk003', 0) ON DUPLICATE"
-                            + " KEY IGNORE";
-            validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 1011.202, null, false,
-                    null, bsonDocument1, 123);
-
-            upsertSql =
-                    "UPSERT INTO " + tableName
-                            + " (PK1, PK2, PK3, COUNTER1, COUNTER2) VALUES('pk000', -123.98, "
-                            + "'pk003', 234, 'col2_000')";
-            validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 234d, "col2_000", true,
-                    null, bsonDocument1, 123);
-
-            upsertSql = "UPSERT INTO " + tableName
-                    + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON DUPLICATE KEY UPDATE "
-                    + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 + 1999.99 ELSE COUNTER1"
-                    + " END, "
-                    + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001' ELSE COUNTER2 "
-                    + "END, "
-                    + "COL3 = ?, "
-                    + "COL4 = 234";
-            validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 2233.99, "col2_001", true,
-                    bsonDocument2, bsonDocument2, 234);
-
-            upsertSql = "UPSERT INTO " + tableName
-                    + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON DUPLICATE KEY UPDATE "
-                    + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 + 1999.99 ELSE COUNTER1"
-                    + " END,"
-                    + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001' ELSE COUNTER2 "
-                    + "END";
-            validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 2233.99, "col2_001", false
-                    , null, bsonDocument2, 234);
+            validateAtomicUpsertReturnRow(tableName, conn, bsonDocument1, bsonDocument2);
 
             PhoenixPreparedStatement ps =
                     conn.prepareStatement(
@@ -212,37 +176,162 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT {
             validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99, "col2_001", true, false,
                     bsonDocument2, 234);
 
-            addRows(tableName, conn);
+            validateMultiRowDelete(tableName, conn, bsonDocument2);
+        }
+    }
+
+    @Test
+    public void testReturnRowResult2() throws Exception {
+        Assume.assumeTrue("Set correct result to RegionActionResult on hbase versions " +
+                "2.4.18+, 2.5.9+, and 2.6.0+", isSetCorrectResultEnabledOnHBase());
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String sample1 = getJsonString("json/sample_01.json");
+        String sample2 = getJsonString("json/sample_02.json");
+        BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
+        BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            String tableName = generateUniqueName();
+            String ddl = "CREATE TABLE " + tableName
+                    + "(PK1 VARCHAR, PK2 DOUBLE NOT NULL, PK3 VARCHAR, COUNTER1 DOUBLE,"
+                    + " COUNTER2 VARCHAR,"
+                    + " COL3 BSON, COL4 INTEGER, CONSTRAINT pk PRIMARY KEY(PK1, PK2, PK3))";
+            conn.createStatement().execute(ddl);
+            createIndex(conn, tableName);
+
+            validateAtomicUpsertReturnRow(tableName, conn, bsonDocument1, bsonDocument2);
+
+            verifyIndexRow(conn, tableName, false);
 
-            ps = conn.prepareStatement(
-                    "DELETE FROM " + tableName + " WHERE PK1 = ? AND PK2 = ?")
+            PhoenixPreparedStatement ps =
+                    conn.prepareStatement(
+                                    "DELETE FROM " + tableName + " WHERE PK1 = ? AND PK2 " +
+                                            "= ? AND PK3 = ? AND COL4 = ?")
+                            .unwrap(PhoenixPreparedStatement.class);
+            ps.setString(1, "pk000");
+            ps.setDouble(2, -123.98);
+            ps.setString(3, "pk003");
+            ps.setInt(4, 235);
+            validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99, "col2_001", true, false,
+                    bsonDocument2, 234);
+
+            ps = conn.prepareStatement("DELETE FROM " + tableName
+                            + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ? AND COL4 = ?")
                     .unwrap(PhoenixPreparedStatement.class);
-            ps.setString(1, "pk001");
-            ps.setDouble(2, 122.34);
-            validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99, "col2_001", false, false,
+            ps.setString(1, "pk000");
+            ps.setDouble(2, -123.98);
+            ps.setString(3, "pk003");
+            ps.setInt(4, 234);
+            validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99, "col2_001", true, true,
                     bsonDocument2, 234);
 
-            ps = conn.prepareStatement(
-                            "DELETE FROM " + tableName).unwrap(PhoenixPreparedStatement.class);
-            validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99, "col2_001", false, false,
+            verifyIndexRow(conn, tableName, true);
+            validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99, "col2_001", true, false,
                     bsonDocument2, 234);
 
-            ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
-            assertFalse(rs.next());
+            validateMultiRowDelete(tableName, conn, bsonDocument2);
+        }
+    }
 
-            addRows(tableName, conn);
+    private void verifyIndexRow(Connection conn, String tableName, boolean deleted) throws SQLException {
+        PreparedStatement preparedStatement =
+                conn.prepareStatement("SELECT COUNTER2 FROM " + tableName + " WHERE COUNTER1 " +
+                        "= ?");
+        preparedStatement.setDouble(1, 2233.99);
 
-            ps = conn.prepareStatement(
-                            "DELETE FROM " + tableName + " WHERE PK1 IN (?) AND PK2 IN (?) AND " +
-                                    "PK3 IN (?, ?)")
-                    .unwrap(PhoenixPreparedStatement.class);
-            ps.setString(1, "pk001");
-            ps.setDouble(2, 122.34);
-            ps.setString(3, "pk004");
-            ps.setString(4, "pk005");
-            validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99, "col2_001", false, false,
-                    bsonDocument2, 234);
+        ResultSet resultSet = preparedStatement.executeQuery();
+        if (!deleted) {
+            assertTrue(resultSet.next());
+            assertEquals("col2_001", resultSet.getString(1));
         }
+        assertFalse(resultSet.next());
+
+        ExplainPlan plan =
+                preparedStatement.unwrap(PhoenixPreparedStatement.class).optimizeQuery()
+                        .getExplainPlan();
+        ExplainPlanAttributes explainPlanAttributes = plan.getPlanStepsAsAttributes();
+        assertEquals(indexDDL.contains("index") ? (indexDDL.contains("local index") ?
+                        tableName + "_IDX(" + tableName + ")" : tableName + "_IDX") :
+                        tableName, explainPlanAttributes.getTableName());
+    }
+
+    private static void validateMultiRowDelete(String tableName, Connection conn,
+                                               BsonDocument bsonDocument2)
+            throws SQLException {
+        addRows(tableName, conn);
+
+        PhoenixPreparedStatement ps = conn.prepareStatement(
+                        "DELETE FROM " + tableName + " WHERE PK1 = ? AND PK2 = ?")
+                .unwrap(PhoenixPreparedStatement.class);
+        ps.setString(1, "pk001");
+        ps.setDouble(2, 122.34);
+        validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99, "col2_001", false, false,
+                bsonDocument2, 234);
+
+        ps = conn.prepareStatement(
+                "DELETE FROM " + tableName).unwrap(PhoenixPreparedStatement.class);
+        validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99, "col2_001", false, false,
+                bsonDocument2, 234);
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+        assertFalse(rs.next());
+
+        addRows(tableName, conn);
+
+        ps = conn.prepareStatement(
+                        "DELETE FROM " + tableName + " WHERE PK1 IN (?) AND PK2 IN (?) AND " +
+                                "PK3 IN (?, ?)")
+                .unwrap(PhoenixPreparedStatement.class);
+        ps.setString(1, "pk001");
+        ps.setDouble(2, 122.34);
+        ps.setString(3, "pk004");
+        ps.setString(4, "pk005");
+        validateReturnedRowAfterDelete(conn, ps, tableName, 2233.99, "col2_001", false, false,
+                bsonDocument2, 234);
+    }
+
+    private static void validateAtomicUpsertReturnRow(String tableName, Connection conn, BsonDocument bsonDocument1,
+                                  BsonDocument bsonDocument2) throws SQLException {
+        String upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3, COUNTER1, COL3, COL4)"
+                + " VALUES('pk000', -123.98, 'pk003', 1011.202, ?, 123) ON DUPLICATE KEY " +
+                "IGNORE";
+        validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 1011.202, null, true,
+                bsonDocument1, bsonDocument1, 123);
+
+        upsertSql =
+                "UPSERT INTO " + tableName + " (PK1, PK2, PK3, COUNTER1) "
+                        + "VALUES('pk000', -123.98, 'pk003', 0) ON DUPLICATE"
+                        + " KEY IGNORE";
+        validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 1011.202, null, false,
+                null, bsonDocument1, 123);
+
+        upsertSql =
+                "UPSERT INTO " + tableName
+                        + " (PK1, PK2, PK3, COUNTER1, COUNTER2) VALUES('pk000', -123.98, "
+                        + "'pk003', 234, 'col2_000')";
+        validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 234d, "col2_000", true,
+                null, bsonDocument1, 123);
+
+        upsertSql = "UPSERT INTO " + tableName
+                + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON DUPLICATE KEY UPDATE "
+                + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 + 1999.99 ELSE COUNTER1"
+                + " END, "
+                + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001' ELSE COUNTER2 "
+                + "END, "
+                + "COL3 = ?, "
+                + "COL4 = 234";
+        validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 2233.99, "col2_001", true,
+                bsonDocument2, bsonDocument2, 234);
+
+        upsertSql = "UPSERT INTO " + tableName
+                + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON DUPLICATE KEY UPDATE "
+                + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 + 1999.99 ELSE COUNTER1"
+                + " END,"
+                + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001' ELSE COUNTER2 "
+                + "END";
+        validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 2233.99, "col2_001", false
+                , null, bsonDocument2, 234);
     }
 
     private static void addRows(String tableName, Connection conn) throws SQLException {
@@ -292,31 +381,13 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT {
             return;
         }
 
-        validateReturnedRowResult(col1, col2, expectedDoc, col4, table, cell, result);
+        validateReturnedRowResult(col2, expectedDoc, col4, table, result);
     }
 
-    private static void validateReturnedRowResult(Double col1, String col2,
+    private static void validateReturnedRowResult(String col2,
                                                   BsonDocument expectedDoc, Integer col4,
-                                                  PTable table, Cell cell, ResultTuple result) {
-        ImmutableBytesPtr ptr = new ImmutableBytesPtr();
-        table.getRowKeySchema().iterator(cell.getRowArray(), ptr, 1);
-        assertEquals("pk000",
-                PVarchar.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()));
-
-        ptr = new ImmutableBytesPtr();
-        table.getRowKeySchema().iterator(cell.getRowArray(), ptr, 2);
-        assertEquals(-123.98,
-                PDouble.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()));
-
-        ptr = new ImmutableBytesPtr();
-        table.getRowKeySchema().iterator(cell.getRowArray(), ptr, 3);
-        assertEquals("pk003",
-                PVarchar.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()));
-
-        assertEquals(col1,
-                PDouble.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(),
-                        cell.getValueLength()));
-        cell = result.getResult()
+                                                  PTable table, ResultTuple result) {
+        Cell cell = result.getResult()
                 .getColumnLatestCell(table.getColumns().get(4).getFamilyName().getBytes(),
                         table.getColumns().get(4).getColumnQualifierBytes());
         if (col2 != null) {
@@ -378,7 +449,24 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT {
                 result.getResult()
                         .getColumnLatestCell(table.getColumns().get(3).getFamilyName().getBytes(),
                                 table.getColumns().get(3).getColumnQualifierBytes());
-        validateReturnedRowResult(col1, col2, expectedDoc, col4, table, cell, result);
+        ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+        table.getRowKeySchema().iterator(cell.getRowArray(), ptr, 1);
+        assertEquals("pk000",
+                PVarchar.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()));
+
+        ptr = new ImmutableBytesPtr();
+        table.getRowKeySchema().iterator(cell.getRowArray(), ptr, 2);
+        assertEquals(-123.98,
+                PDouble.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()));
+
+        ptr = new ImmutableBytesPtr();
+        table.getRowKeySchema().iterator(cell.getRowArray(), ptr, 3);
+        assertEquals("pk003",
+                PVarchar.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()));
+        assertEquals(col1,
+                PDouble.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(),
+                        cell.getValueLength()));
+        validateReturnedRowResult(col2, expectedDoc, col4, table, result);
     }
 
     @Test
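
Finally, for readers wiring a client to this feature: a hedged sketch of interpreting the
Result that PhoenixStatement copies out of the MutationState after the atomic DELETE, based
on the Result.EMPTY_RESULT convention visible in deleteRowAndGetMutationState() and
commitWithResultReturned() above. The class, method, and variable names are illustrative:

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Result;

    public class DeletedRowSketch {
        static void inspectDeletedRow(Result result) {
            if (result == null || result.isEmpty()) {
                // Result.EMPTY_RESULT: the non-pk condition did not match,
                // so no row was deleted.
                return;
            }
            for (Cell cell : result.listCells()) {
                // Each cell carries one column of the row that was deleted,
                // per the commit message's single-row atomic delete semantics.
                System.out.println(cell);
            }
        }
    }
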

