This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 205acd4562 PHOENIX-7398 New PhoenixStatement API to return row for Atomic/Conditional Upserts (#1967)
205acd4562 is described below

commit 205acd4562cb6908edd5ab446f13e33bcae2ba13
Author: Viraj Jasani <[email protected]>
AuthorDate: Tue Sep 10 17:13:46 2024 -0700

    PHOENIX-7398 New PhoenixStatement API to return row for Atomic/Conditional Upserts (#1967)
---
 .../apache/phoenix/exception/SQLExceptionCode.java |   2 +
 .../org/apache/phoenix/execute/MutationState.java  |  34 +++
 .../phoenix/index/PhoenixIndexBuilderHelper.java   |   4 +
 .../phoenix/jdbc/PhoenixPreparedStatement.java     |  30 ++-
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  |  69 ++++-
 .../phoenix/hbase/index/IndexRegionObserver.java   | 215 ++++++++++++----
 .../hbase/index/builder/BaseIndexBuilder.java      |   5 +
 .../hbase/index/builder/IndexBuildManager.java     |   5 +
 .../phoenix/hbase/index/builder/IndexBuilder.java  |   8 +
 .../apache/phoenix/index/PhoenixIndexBuilder.java  |   6 +-
 .../iterate/NonAggregateRegionScannerFactory.java  |  35 ++-
 .../java/org/apache/phoenix/end2end/Bson4IT.java   | 285 +++++++++++++++++++--
 .../apache/phoenix/end2end/OnDuplicateKey2IT.java  | 197 ++++++++++++++
 13 files changed, 806 insertions(+), 89 deletions(-)
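
For context: the new API lets a client on an auto-commit connection run an atomic UPSERT and read the resulting row back in the same call. A minimal client-side sketch, assuming a local Phoenix URL and a hypothetical table T (only executeUpdateReturnRow below comes from this commit; the rest is standard JDBC):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.phoenix.jdbc.PhoenixStatement;
    import org.apache.phoenix.schema.tuple.Tuple;

    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
        conn.setAutoCommit(true); // required, else AUTO_COMMIT_NOT_ENABLED (916) is thrown
        PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
        Pair<Integer, Tuple> result = stmt.executeUpdateReturnRow(
                "UPSERT INTO T VALUES ('k1', 'v1') ON DUPLICATE KEY IGNORE");
        int updated = result.getFirst(); // 1 if the row was updated, 0 otherwise
        Tuple row = result.getSecond();  // current state of the row, as cells
    }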

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index c060d4d20f..ee83869611 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -632,6 +632,8 @@ public enum SQLExceptionCode {
     STALE_METADATA_CACHE_EXCEPTION(915, "43M26", "Stale metadata cache exception",
         info -> new StaleMetadataCacheException(info.getMessage())),
 
+    AUTO_COMMIT_NOT_ENABLED(916, "43M27", "Connection does not have auto-commit enabled"),
+
     //SQLCode for testing exceptions
     FAILED_KNOWINGLY_FOR_TEST(7777, "TEST", "Exception was thrown to test something");
 
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
index 529b6cd60e..e5a0c3ede2 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -187,6 +187,15 @@ public class MutationState implements SQLCloseable {
 
     private final boolean indexRegionObserverEnabledAllTables;
 
+    /**
+     * Return the result back to the client. Used when the client needs to read back the whole
+     * row or specific attributes of the row. As of PHOENIX-7398, returning the whole row is
+     * supported. This allows the client to set a Mutation attribute that the server reads in
+     * order to atomically update the row and return it back.
+     */
+    private ReturnResult returnResult;
+    private Result result;
+
     public static void resetAllMutationState(){
         allDeletesMutations = true;
         allUpsertsMutations = true;
@@ -482,6 +491,10 @@ public class MutationState implements SQLCloseable {
         return numRows;
     }
 
+    public Result getResult() {
+        return this.result;
+    }
+
     private MultiRowMutationState getLastMutationBatch(Map<TableRef, List<MultiRowMutationState>> mutations, TableRef tableRef) {
         List<MultiRowMutationState> mutationBatches = mutations.get(tableRef);
         if (mutationBatches == null || mutationBatches.isEmpty()) {
@@ -847,6 +860,12 @@ public class MutationState implements SQLCloseable {
                     if (onDupKeyBytes != null) {
                         mutation.setAttribute(PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB, onDupKeyBytes);
                     }
+                    if (this.returnResult != null) {
+                        if (this.returnResult == ReturnResult.ROW) {
+                            mutation.setAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT,
+                                    PhoenixIndexBuilderHelper.RETURN_RESULT_ROW);
+                        }
+                    }
                 }
                 rowMutationsPertainingToIndex = rowMutations;
             }
@@ -1127,6 +1146,10 @@ public class MutationState implements SQLCloseable {
         return batchCount;
     }
 
+    public void setReturnResult(ReturnResult returnResult) {
+        this.returnResult = returnResult;
+    }
+
     public static final class MutationBytes {
 
         private long deleteMutationCounter;
@@ -1550,6 +1573,11 @@ public class MutationState implements SQLCloseable {
                                 numUpdatedRowsForAutoCommit = PInteger.INSTANCE.getCodec()
                                         .decodeInt(cell.getValueArray(), cell.getValueOffset(),
                                                 SortOrder.getDefault());
+                                if (this.returnResult != null) {
+                                    if (this.returnResult == ReturnResult.ROW) {
+                                        this.result = result;
+                                    }
+                                }
                             } else {
                                 numUpdatedRowsForAutoCommit = 1;
                             }
@@ -2005,6 +2033,7 @@ public class MutationState implements SQLCloseable {
         estimatedSize = 0;
         this.mutationsMap.clear();
         phoenixTransactionContext = PhoenixTransactionContext.NULL_CONTEXT;
+        this.returnResult = null;
     }
 
     private void resetTransactionalState() {
@@ -2418,4 +2447,9 @@ public class MutationState implements SQLCloseable {
     public boolean isEmpty() {
         return mutationsMap != null ? mutationsMap.isEmpty() : true;
     }
+
+    public enum ReturnResult {
+        ROW
+    }
+
 }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilderHelper.java b/phoenix-core-client/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilderHelper.java
index 81bda56408..6ef1d9e38a 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilderHelper.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilderHelper.java
@@ -38,6 +38,10 @@ public final class PhoenixIndexBuilderHelper {
     private static final byte[] ON_DUP_KEY_IGNORE_BYTES = new byte[] {1}; // boolean true
     private static final int ON_DUP_KEY_HEADER_BYTE_SIZE = Bytes.SIZEOF_SHORT + Bytes.SIZEOF_BOOLEAN;
     public static final String ATOMIC_OP_ATTRIB = "_ATOMIC_OP_ATTRIB";
+
+    public static final String RETURN_RESULT = "_RETURN_RESULT";
+    public static final byte[] RETURN_RESULT_ROW = new byte[]{0};
+
     public static byte[] serializeOnDupKeyIgnore() {
         return ON_DUP_KEY_IGNORE_BYTES;
     }
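
The two constants above form the client/server contract for this feature. A hedged sketch of how a data-table Put gets tagged (this mirrors what MutationState does in the hunk above; markForResultReturn is a hypothetical helper, not part of this commit):

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.phoenix.index.PhoenixIndexBuilderHelper;

    // Hypothetical helper: tag a Put so that IndexRegionObserver reads the row
    // under its row lock and ships it back through the OperationStatus Result.
    static Put markForResultReturn(Put put) {
        put.setAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT,
                PhoenixIndexBuilderHelper.RETURN_RESULT_ROW);
        return put;
    }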
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index 9a00f9bbe5..036020df39 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -46,15 +46,18 @@ import java.util.Calendar;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.BindManager;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementPlan;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.schema.ExecuteQueryNotApplicableException;
 import org.apache.phoenix.schema.ExecuteUpdateNotApplicableException;
 import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.SQLCloseable;
@@ -197,6 +200,11 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Phoeni
 
     @Override
     public int executeUpdate() throws SQLException {
+        preExecuteUpdate();
+        return executeMutation(statement, createAuditQueryLogger(statement,query));
+    }
+
+    private void preExecuteUpdate() throws SQLException {
         throwIfUnboundParameters();
         if (!statement.getOperation().isMutation()) {
             throw new ExecuteUpdateNotApplicableException(statement.getOperation());
@@ -205,7 +213,27 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Phoeni
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
             .build().buildException();
         }
-        return executeMutation(statement, createAuditQueryLogger(statement,query));
+    }
+
+    /**
+     * Executes the given SQL statement similar to the JDBC API executeUpdate(), but also
+     * returns the updated or non-updated row as a Result object back to the client. This must
+     * be used with an auto-commit Connection, which makes the operation atomic.
+     * If the row is successfully updated, the updated row is returned; otherwise, if the row
+     * cannot be updated, the non-updated row is returned.
+     *
+     * @return A pair of int and Tuple, where the int is 1 for a successful row update and 0
+     * for a non-successful row update, and the Tuple represents the state of the row.
+     * @throws SQLException If the statement cannot be executed.
+     */
+    public Pair<Integer, Tuple> executeUpdateReturnRow() throws SQLException {
+        if (!connection.getAutoCommit()) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.AUTO_COMMIT_NOT_ENABLED).build()
+                    .buildException();
+        }
+        preExecuteUpdate();
+        return executeMutation(statement, createAuditQueryLogger(statement, query),
+                MutationState.ReturnResult.ROW);
     }
 
     public QueryPlan optimizeQuery() throws SQLException {
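
A hedged usage sketch for the prepared-statement variant added above (the table, column names, and URL are hypothetical; the unwrap call is the standard JDBC mechanism for reaching the Phoenix-specific API):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
    import org.apache.phoenix.schema.tuple.Tuple;

    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
        conn.setAutoCommit(true);
        PreparedStatement ps = conn.prepareStatement(
                "UPSERT INTO T VALUES (?, ?) ON DUPLICATE KEY UPDATE C1 = C1 || '!'");
        ps.setString(1, "k1");
        ps.setString(2, "v1");
        Pair<Integer, Tuple> result =
                ps.unwrap(PhoenixPreparedStatement.class).executeUpdateReturnRow();
        // result.getFirst() is 1 on update, 0 otherwise;
        // result.getSecond() holds the row state as cells
    }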
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 92e2aa07ee..b9f895b777 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.client.Consistency;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.call.CallRunner;
@@ -107,6 +108,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.exception.StaleMetadataCacheException;
 import org.apache.phoenix.exception.UpgradeRequiredException;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.MutationState.ReturnResult;
 import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
@@ -209,6 +211,7 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.StatisticsCollectionScope;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
@@ -576,11 +579,22 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
     }
 
 
-    protected int executeMutation(final CompilableStatement stmt, final AuditQueryLogger queryLogger) throws SQLException {
-        return executeMutation(stmt, true, queryLogger);
+    protected int executeMutation(final CompilableStatement stmt,
+                                  final AuditQueryLogger queryLogger) throws SQLException {
+        return executeMutation(stmt, true, queryLogger, null).getFirst();
     }
 
-    private int executeMutation(final CompilableStatement stmt, final boolean doRetryOnMetaNotFoundError, final AuditQueryLogger queryLogger) throws SQLException {
+    Pair<Integer, Tuple> executeMutation(final CompilableStatement stmt,
+                                   final AuditQueryLogger queryLogger,
+                                   final ReturnResult returnResult) throws SQLException {
+        return executeMutation(stmt, true, queryLogger, returnResult);
+    }
+
+    private Pair<Integer, Tuple> executeMutation(final CompilableStatement stmt,
+                                                  final boolean doRetryOnMetaNotFoundError,
+                                                  final AuditQueryLogger queryLogger,
+                                                  final ReturnResult returnResult)
+            throws SQLException {
         if (connection.isReadOnly()) {
             throw new SQLExceptionInfo.Builder(
                 SQLExceptionCode.READ_ONLY_CONNECTION).
@@ -590,9 +604,9 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
         try {
             return CallRunner
                    .run(
-                        new CallRunner.CallableThrowable<Integer, SQLException>() {
+                           new CallRunner.CallableThrowable<Pair<Integer, Tuple>, SQLException>() {
                         @Override
-                            public Integer call() throws SQLException {
+                            public Pair<Integer, Tuple> call() throws SQLException {
                             boolean success = false;
                             String tableName = null;
                             boolean isUpsert = false;
@@ -631,12 +645,15 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
                                 // just max out at Integer.MAX_VALUE
                                 int lastUpdateCount = (int) Math.min(Integer.MAX_VALUE,
                                         lastState.getUpdateCount());
+                                Result result = null;
                                 if (connection.getAutoCommit()) {
+                                    state.setReturnResult(returnResult);
                                     connection.commit();
                                     if (isAtomicUpsert) {
                                         lastUpdateCount = connection.getMutationState()
                                                 .getNumUpdatedRowsForAutoCommit();
                                     }
+                                    result = connection.getMutationState().getResult();
                                 }
                                 setLastQueryPlan(null);
                                 setLastUpdateCount(lastUpdateCount);
@@ -651,7 +668,7 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
                                 }
 
                                 success = true;
-                                return lastUpdateCount;
+                                return new Pair<>(lastUpdateCount, new ResultTuple(result));
                             }
                             //Force update cache and retry if meta not found error occurs
                             catch (MetaDataEntityNotFoundException e) {
@@ -661,7 +678,8 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
                                     }
                                     if (new MetaDataClient(connection).updateCache(connection.getTenantId(),
                                         e.getSchemaName(), e.getTableName(), true).wasUpdated()) {
-                                        return executeMutation(stmt, false, queryLogger);
+                                        return executeMutation(stmt, false, queryLogger,
+                                                returnResult);
                                     }
                                 }
                                 throw e;
@@ -2376,17 +2394,46 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
 
     @Override
     public int executeUpdate(String sql) throws SQLException {
+        CompilableStatement stmt = preExecuteUpdate(sql);
+        int updateCount = executeMutation(stmt, createAuditQueryLogger(stmt, sql));
+        flushIfNecessary();
+        return updateCount;
+    }
+
+    /**
+     * Executes the given SQL statement similar to the JDBC API executeUpdate(), but also
+     * returns the updated or non-updated row as a Result object back to the client. This must
+     * be used with an auto-commit Connection, which makes the operation atomic.
+     * If the row is successfully updated, the updated row is returned; otherwise, if the row
+     * cannot be updated, the non-updated row is returned.
+     *
+     * @param sql The SQL DML statement, UPSERT or DELETE for Phoenix.
+     * @return A pair of int and Tuple, where the int is 1 for a successful row update and 0
+     * for a non-successful row update, and the Tuple represents the state of the row.
+     * @throws SQLException If the statement cannot be executed.
+     */
+    public Pair<Integer, Tuple> executeUpdateReturnRow(String sql) throws SQLException {
+        if (!connection.getAutoCommit()) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.AUTO_COMMIT_NOT_ENABLED).build()
+                    .buildException();
+        }
+        CompilableStatement stmt = preExecuteUpdate(sql);
+        Pair<Integer, Tuple> result =
+                executeMutation(stmt, createAuditQueryLogger(stmt, sql), ReturnResult.ROW);
+        flushIfNecessary();
+        return result;
+    }
+
+    private CompilableStatement preExecuteUpdate(String sql) throws SQLException {
         CompilableStatement stmt = parseStatement(sql);
         if (!stmt.getOperation().isMutation) {
             throw new ExecuteUpdateNotApplicableException(sql);
         }
         if (!batch.isEmpty()) {
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
-            .build().buildException();
+                    .build().buildException();
         }
-        int updateCount = executeMutation(stmt, createAuditQueryLogger(stmt, sql));
-        flushIfNecessary();
-        return updateCount;
+        return stmt;
     }
 
     private void flushIfNecessary() throws SQLException {
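
Both entry points reject connections without auto-commit up front. A small sketch of that error path (the error code comes from the AUTO_COMMIT_NOT_ENABLED entry added to SQLExceptionCode in this commit; table and URL are hypothetical):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import org.apache.phoenix.jdbc.PhoenixStatement;

    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
        // Phoenix connections default to auto-commit off unless configured otherwise
        PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
        try {
            stmt.executeUpdateReturnRow("UPSERT INTO T VALUES ('k1', 'v1')");
        } catch (SQLException e) {
            // expected: AUTO_COMMIT_NOT_ENABLED, SQLCODE 916 / SQLSTATE 43M27
            System.out.println(e.getErrorCode() + " " + e.getSQLState());
        }
    }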
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index b2e3b5ff9d..8be7419780 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -141,6 +141,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew;
 import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.removeColumn;
 import static org.apache.phoenix.index.PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB;
+import static org.apache.phoenix.index.PhoenixIndexBuilderHelper.RETURN_RESULT;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
 
 /**
@@ -257,6 +258,12 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
       // The latches of the threads waiting for this batch to complete
       private List<CountDownLatch> waitList = null;
       private Map<ImmutableBytesPtr, MultiMutation> multiMutationMap;
+      // store current cells into a map where the key is ColumnReference of the column family and
+      // column qualifier, and value is a pair of cell and a boolean. The value of the boolean
+      // will be true if the expression is CaseExpression and Else-clause is evaluated to be
+      // true, will be null if there is no expression on this column, otherwise false.
+      // This is only initialized for single row atomic mutation.
+      private Map<ColumnReference, Pair<Cell, Boolean>> currColumnCellExprMap;
 
       //list containing the original mutations from the MiniBatchOperationInProgress. Contains
       // any annotations we were sent by the client, and can be used in hooks that don't get
@@ -268,6 +275,8 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
       private boolean hasGlobalIndex;
       private boolean hasLocalIndex;
       private boolean hasTransform;
+      private boolean returnResult;
+
       public BatchMutateContext() {
           this.clientVersion = 0;
       }
@@ -516,23 +525,14 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
         "Somehow didn't return an index update but also didn't propagate the failure to the client!");
   }
 
-  private void ignoreAtomicOperations (MiniBatchOperationInProgress<Mutation> miniBatchOp) {
-      for (int i = 0; i < miniBatchOp.size(); i++) {
-          Mutation m = miniBatchOp.getOperation(i);
-          if (this.builder.isAtomicOp(m)) {
-              miniBatchOp.setOperationStatus(i, IGNORE);
-          }
-      }
-  }
-
   private void populateRowsToLock(MiniBatchOperationInProgress<Mutation> miniBatchOp,
           BatchMutateContext context) {
       for (int i = 0; i < miniBatchOp.size(); i++) {
           Mutation m = miniBatchOp.getOperation(i);
-          if (this.builder.isAtomicOp(m) || this.builder.isEnabled(m)) {
+          if (this.builder.isAtomicOp(m) || this.builder.returnResult(m) ||
+                  this.builder.isEnabled(m)) {
               ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
               context.rowsToLock.add(row);
-
           }
       }
   }
@@ -572,8 +572,8 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
                                         BatchMutateContext context) throws IOException {
       for (int i = 0; i < miniBatchOp.size(); i++) {
           Mutation m = miniBatchOp.getOperation(i);
-          if (this.builder.isAtomicOp(m) && m instanceof Put) {
-              List<Mutation> mutations = generateOnDupMutations(context, (Put)m);
+          if ((this.builder.isAtomicOp(m) || this.builder.returnResult(m)) && m instanceof Put) {
+              List<Mutation> mutations = generateOnDupMutations(context, (Put)m, miniBatchOp);
               if (!mutations.isEmpty()) {
                   addOnDupMutationsToBatch(miniBatchOp, i, mutations);
               } else {
@@ -583,11 +583,19 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
                   // them the new value is the same as the old value in the ELSE-clause (empty
                   // cell timestamp will NOT be updated)
                   byte[] retVal = PInteger.INSTANCE.toBytes(0);
-                  Cell cell = PhoenixKeyValueUtil.newKeyValue(m.getRow(), Bytes.toBytes(UPSERT_CF),
-                          Bytes.toBytes(UPSERT_STATUS_CQ), 0, retVal, 0, retVal.length);
+                  List<Cell> cells = new ArrayList<>();
+                  cells.add(PhoenixKeyValueUtil.newKeyValue(m.getRow(), Bytes.toBytes(UPSERT_CF),
+                          Bytes.toBytes(UPSERT_STATUS_CQ), 0, retVal, 0, retVal.length));
+
+                  if (context.returnResult) {
+                      context.currColumnCellExprMap.forEach(
+                              (key, value) -> cells.add(value.getFirst()));
+                      cells.sort(CellComparator.getInstance());
+                  }
+
                   // put Result in OperationStatus for returning update status from conditional
                   // upserts, where 0 represents the row is not updated
-                  Result result = Result.create(new ArrayList<>(Arrays.asList(cell)));
+                  Result result = Result.create(cells);
                   miniBatchOp.setOperationStatus(i,
                           new OperationStatus(SUCCESS, result));
               }
@@ -647,7 +655,8 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
             // skip this mutation if we aren't enabling indexing or not an atomic op
             // or if it is an atomic op and its timestamp is already set(not LATEST)
             if (!builder.isEnabled(m) &&
-                !(builder.isAtomicOp(m) && IndexUtil.getMaxTimestamp(m) == HConstants.LATEST_TIMESTAMP)) {
+                    !((builder.isAtomicOp(m) || builder.returnResult(m)) &&
+                            IndexUtil.getMaxTimestamp(m) == HConstants.LATEST_TIMESTAMP)) {
                 continue;
             }
             setTimestampOnMutation(m, ts);
@@ -1146,14 +1155,18 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
                                               BatchMutateContext context) {
         for (int i = 0; i < miniBatchOp.size(); i++) {
             Mutation m = miniBatchOp.getOperation(i);
-            if (this.builder.isAtomicOp(m)) {
+            if (this.builder.returnResult(m) && miniBatchOp.size() == 1) {
+                context.returnResult = true;
+            }
+            if (this.builder.isAtomicOp(m) || this.builder.returnResult(m)) {
                 context.hasAtomic = true;
                 if (context.hasDelete) {
                     return;
                 }
-            } else if (m instanceof Delete)
+            } else if (m instanceof Delete) {
                 context.hasDelete = true;
-            if (context.hasAtomic) {
+            }
+            if (context.hasAtomic || context.returnResult) {
                 return;
             }
         }
@@ -1284,20 +1297,21 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
         lockRows(context);
         long onDupCheckTime = 0;
 
-        if (context.hasAtomic || context.hasGlobalIndex || context.hasUncoveredIndex || context.hasTransform) {
+        if (context.hasAtomic || context.returnResult || context.hasGlobalIndex ||
+                context.hasUncoveredIndex || context.hasTransform) {
             // Retrieve the current row states from the data table while holding the lock.
             // This is needed for both atomic mutations and global indexes
             long start = EnvironmentEdgeManager.currentTimeMillis();
             context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put, Put>>(context.rowsToLock.size());
             if (context.hasGlobalIndex || context.hasTransform || context.hasAtomic ||
-                    context.hasDelete ||  (context.hasUncoveredIndex &&
+                    context.returnResult || context.hasDelete || (context.hasUncoveredIndex &&
                     isPartialUncoveredIndexMutation(indexMetaData, miniBatchOp))) {
                 getCurrentRowStates(c, context);
             }
             onDupCheckTime += (EnvironmentEdgeManager.currentTimeMillis() - start);
         }
 
-        if (context.hasAtomic) {
+        if (context.hasAtomic || context.returnResult) {
             long start = EnvironmentEdgeManager.currentTimeMillis();
             // add the mutations for conditional updates to the mini batch
             addOnDupMutationsToBatch(miniBatchOp, context);
@@ -1359,7 +1373,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
                 continue;
             }
             Mutation m = miniBatchOp.getOperation(i);
-            if (!this.builder.isAtomicOp(m)) {
+            if (!this.builder.isAtomicOp(m) && !this.builder.returnResult(m)) {
                 continue;
             }
             ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
@@ -1440,13 +1454,18 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
       try {
           if (success) {
               context.currentPhase = BatchMutatePhase.POST;
-              if(context.hasAtomic && miniBatchOp.size() == 1) {
+              if ((context.hasAtomic || context.returnResult) && miniBatchOp.size() == 1) {
                   if (!isAtomicOperationComplete(miniBatchOp.getOperationStatus(0))) {
                       byte[] retVal = PInteger.INSTANCE.toBytes(1);
                       Cell cell = PhoenixKeyValueUtil.newKeyValue(
                               miniBatchOp.getOperation(0).getRow(), Bytes.toBytes(UPSERT_CF),
                               Bytes.toBytes(UPSERT_STATUS_CQ), 0, retVal, 0, retVal.length);
-                      Result result = Result.create(new ArrayList<>(Arrays.asList(cell)));
+                      List<Cell> cells = new ArrayList<>();
+                      cells.add(cell);
+
+                      addCellsIfResultReturned(miniBatchOp, context, cells);
+
+                      Result result = Result.create(cells);
                       miniBatchOp.setOperationStatus(0,
                               new OperationStatus(SUCCESS, result));
                   }
@@ -1470,6 +1489,62 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
        }
   }
 
+    /**
+     * If the result needs to be returned for the given update operation, identify the updated
+     * row cells and add them to the input list of cells.
+     *
+     * @param miniBatchOp Batch of mutations getting applied to the region.
+     * @param context The BatchMutateContext object shared during coproc hook execution as part
+     * of the batch mutate life cycle.
+     * @param cells The list of cells to be returned back to the client.
+     */
+    private static void addCellsIfResultReturned(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                                 BatchMutateContext context, List<Cell> cells) {
+        if (context.returnResult) {
+            Map<ColumnReference, Pair<Cell, Boolean>> currColumnCellExprMap =
+                    context.currColumnCellExprMap;
+            Mutation mutation = miniBatchOp.getOperation(0);
+            updateColumnCellExprMap(mutation, currColumnCellExprMap);
+            Mutation[] mutations = miniBatchOp.getOperationsFromCoprocessors(0);
+            if (mutations != null) {
+                for (Mutation m : mutations) {
+                    updateColumnCellExprMap(m, currColumnCellExprMap);
+                }
+            }
+            for (Pair<Cell, Boolean> cellPair : currColumnCellExprMap.values()) {
+                cells.add(cellPair.getFirst());
+            }
+            cells.sort(CellComparator.getInstance());
+        }
+    }
+
+    /**
+     * Update the contents of {@code currColumnCellExprMap} based on the mutation that was
+     * successfully applied to the row.
+     *
+     * @param mutation The Mutation object which is applied to the row.
+     * @param currColumnCellExprMap The map of column to cell reference.
+     */
+    private static void updateColumnCellExprMap(Mutation mutation,
+                                                Map<ColumnReference, Pair<Cell, Boolean>>
+                                                        currColumnCellExprMap) {
+        if (mutation != null) {
+            for (Map.Entry<byte[], List<Cell>> entry :
+                    mutation.getFamilyCellMap().entrySet()) {
+                for (Cell entryCell : entry.getValue()) {
+                    byte[] family = CellUtil.cloneFamily(entryCell);
+                    byte[] qualifier = CellUtil.cloneQualifier(entryCell);
+                    ColumnReference colRef = new ColumnReference(family, qualifier);
+                    if (mutation instanceof Put) {
+                        currColumnCellExprMap.put(colRef, new Pair<>(entryCell, null));
+                    } else if (mutation instanceof Delete) {
+                        currColumnCellExprMap.remove(colRef);
+                    }
+                }
+            }
+        }
+    }
+
   private void doPost(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) throws IOException {
       long start = EnvironmentEdgeManager.currentTimeMillis();
 
@@ -1586,12 +1661,21 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
      * Otherwise, we will generate one Put mutation and optionally one Delete mutation (with
      * DeleteColumn type cells for all columns set to null).
      */
-  private List<Mutation> generateOnDupMutations(BatchMutateContext context, Put atomicPut) throws IOException {
+    private List<Mutation> generateOnDupMutations(BatchMutateContext context,
+                                                  Put atomicPut,
+                                                  MiniBatchOperationInProgress<Mutation> miniBatchOp)
+            throws IOException {
       List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2);
       byte[] opBytes = atomicPut.getAttribute(ATOMIC_OP_ATTRIB);
-      if (opBytes == null) { // Unexpected
-          return null;
-      }
+        byte[] returnResult = atomicPut.getAttribute(RETURN_RESULT);
+        if ((opBytes == null && returnResult == null) ||
+                (opBytes == null && miniBatchOp.size() != 1)) {
+            // Unexpected
+            // Either the mutation should be atomic by providing non-null ON DUPLICATE KEY, or
+            // if the result needs to be returned, only a single row must be updated as part of
+            // the batch mutation.
+            return null;
+        }
       Put put = null;
       Delete delete = null;
 
@@ -1599,16 +1683,40 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
       // later these timestamps will be updated by the IndexRegionObserver#setTimestamps() function
       long ts = HConstants.LATEST_TIMESTAMP;
 
-      byte[] rowKey = atomicPut.getRow();
+        // store current cells into a map where the key is ColumnReference of the column family and
+        // column qualifier, and value is a pair of cell and a boolean. The value of the boolean
+        // will be true if the expression is CaseExpression and Else-clause is evaluated to be
+        // true, will be null if there is no expression on this column, otherwise false
+        Map<ColumnReference, Pair<Cell, Boolean>> currColumnCellExprMap = new HashMap<>();
+
+        byte[] rowKey = atomicPut.getRow();
       ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(rowKey);
       // Get the latest data row state
       Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
       Put currentDataRowState = dataRowState != null ? dataRowState.getFirst() : null;
 
+        // if result needs to be returned but the DML does not have ON DUPLICATE KEY present,
+        // perform the mutation and return the result.
+        if (opBytes == null) {
+            mutations.add(atomicPut);
+            updateCurrColumnCellExpr(currentDataRowState != null ? currentDataRowState : atomicPut,
+                    currColumnCellExprMap);
+            if (context.returnResult) {
+                context.currColumnCellExprMap = currColumnCellExprMap;
+            }
+            return mutations;
+        }
+
       if (PhoenixIndexBuilderHelper.isDupKeyIgnore(opBytes)) {
           if (currentDataRowState == null) {
               // new row
               mutations.add(atomicPut);
+              updateCurrColumnCellExpr(atomicPut, currColumnCellExprMap);
+          } else {
+              updateCurrColumnCellExpr(currentDataRowState, currColumnCellExprMap);
+          }
+          if (context.returnResult) {
+              context.currColumnCellExprMap = currColumnCellExprMap;
           }
           return mutations;
       }
@@ -1631,19 +1739,17 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
       // read the column values requested in the get from the current data row
       List<Cell> cells = IndexUtil.readColumnsFromRow(currentDataRowState, colsReadInExpr);
 
-      // store current cells into a map where the key is ColumnReference of the column family and
-      // column qualifier, and value is a pair of cell and a boolean. The value of the boolean
-      // will be true if the expression is CaseExpression and Else-clause is evaluated to be
-      // true, will be null if there is no expression on this column, otherwise false
-      Map<ColumnReference, Pair<Cell, Boolean>> currColumnCellExprMap = new HashMap<>();
-
       if (currentDataRowState == null) { // row doesn't exist
+          updateCurrColumnCellExpr(atomicPut, currColumnCellExprMap);
           if (skipFirstOp) {
               if (operations.size() <= 1 && repeat <= 1) {
                   // early exit since there is only one ON DUPLICATE KEY UPDATE
                   // clause which is ignored because the row doesn't exist so
                   // simply use the values in UPSERT VALUES
                   mutations.add(atomicPut);
+                  if (context.returnResult) {
+                      context.currColumnCellExprMap = currColumnCellExprMap;
+                  }
                   return mutations;
               }
               // If there are multiple ON DUPLICATE KEY UPDATE on a new row,
@@ -1656,17 +1762,13 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
           // Base current state off of existing row
           flattenedCells = cells;
           // store all current cells from currentDataRowState
-          for (Map.Entry<byte[], List<Cell>> entry :
-                  currentDataRowState.getFamilyCellMap().entrySet()) {
-              for (Cell cell : new ArrayList<>(entry.getValue())) {
-                  byte[] family = CellUtil.cloneFamily(cell);
-                  byte[] qualifier = CellUtil.cloneQualifier(cell);
-                  ColumnReference colRef = new ColumnReference(family, qualifier);
-                  currColumnCellExprMap.put(colRef, new Pair<>(cell, null));
-              }
-          }
+          updateCurrColumnCellExpr(currentDataRowState, currColumnCellExprMap);
       }
 
+        if (context.returnResult) {
+            context.currColumnCellExprMap = currColumnCellExprMap;
+        }
+
       MultiKeyValueTuple tuple = new MultiKeyValueTuple(flattenedCells);
       ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 
@@ -1764,6 +1866,27 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
       return mutations;
   }
 
+    /**
+     * Create or Update ColumnRef to Cell map based on the Put mutation.
+     *
+     * @param put The Put mutation representing the current or new/updated state of the row.
+     * @param currColumnCellExprMap ColumnRef to Cell mapping for all the cells involved in the
+     * given mutation.
+     */
+    private static void updateCurrColumnCellExpr(Put put,
+                                                 Map<ColumnReference, Pair<Cell, Boolean>>
+                                                         currColumnCellExprMap) {
+        for (Map.Entry<byte[], List<Cell>> entry :
+                put.getFamilyCellMap().entrySet()) {
+            for (Cell cell : entry.getValue()) {
+                byte[] family = CellUtil.cloneFamily(cell);
+                byte[] qualifier = CellUtil.cloneQualifier(cell);
+                ColumnReference colRef = new ColumnReference(family, qualifier);
+                currColumnCellExprMap.put(colRef, new Pair<>(cell, null));
+            }
+        }
+    }
+
     private void addEmptyKVCellToPut(Put put, MultiKeyValueTuple tuple, PTable table) throws IOException {
         byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table);
         byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
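
On the server side, the row travels back to the client inside the batch OperationStatus. A condensed sketch of the pattern used in the hooks above (returnRow is a hypothetical distillation for illustration, not a method in this commit):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellComparator;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
    import org.apache.hadoop.hbase.regionserver.OperationStatus;

    // Hypothetical distillation: pack the 1/0 upsert-status cell plus the current
    // row cells into a Result and attach it to the single operation's status so
    // that the client-side MutationState can pick it up after commit().
    static void returnRow(MiniBatchOperationInProgress<Mutation> miniBatchOp,
                          Cell statusCell, List<Cell> rowCells) {
        List<Cell> cells = new ArrayList<>(rowCells);
        cells.add(statusCell);
        cells.sort(CellComparator.getInstance());
        miniBatchOp.setOperationStatus(0, new OperationStatus(
                HConstants.OperationStatusCode.SUCCESS, Result.create(cells)));
    }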
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index eec363fb5a..bbe94df7a1 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -132,6 +132,11 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
         return null;
     }
 
+    @Override
+    public boolean returnResult(Mutation m) {
+        return false;
+    }
+
     public RegionCoprocessorEnvironment getEnv() {
         return this.env;
     }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 72fcf37b0f..68076a4d79 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -180,4 +180,9 @@ public class IndexBuildManager implements Stoppable {
   public ReplayWrite getReplayWrite(Mutation m) throws IOException {
     return this.delegate.getReplayWrite(m);
   }
+
+  public boolean returnResult(Mutation m) {
+    return delegate.returnResult(m);
+  }
+
 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index 8846650b3f..23a642ed69 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -150,4 +150,12 @@ public interface IndexBuilder extends Stoppable {
   public List<Mutation> executeAtomicOp(Increment inc) throws IOException;
 
   public ReplayWrite getReplayWrite(Mutation m);
+
+  /**
+   * True if mutation needs to return result.
+   *
+   * @param m Mutation object.
+   * @return True if mutation needs to return result, False otherwise.
+   */
+  boolean returnResult(Mutation m);
 }
\ No newline at end of file
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 6720c6b5a2..97c8f7bb7d 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -274,5 +274,9 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
     public ReplayWrite getReplayWrite(Mutation m) {
         return PhoenixIndexMetaData.getReplayWrite(m.getAttributesMap());
     }
-    
+
+    @Override
+    public boolean returnResult(Mutation m) {
+        return m.getAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT) != null;
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index 72b7c7e5d5..9ec70a8fd8 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -226,20 +226,12 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
         if (serverParsedArrayFuncRefs != null) {
             Collections.addAll(resultList, serverParsedArrayFuncRefs);
         }
-        Expression[] serverParsedJsonValueFuncRefs = null;
-        if (scan.getAttribute(BaseScannerRegionObserverConstants.JSON_VALUE_FUNCTION) != null) {
-            serverParsedJsonValueFuncRefs =
-                    deserializeServerParsedPositionalExpressionInfoFromScan(scan,
-                            BaseScannerRegionObserverConstants.JSON_VALUE_FUNCTION, serverParsedKVRefs);
-        }
-        if (scan.getAttribute(BaseScannerRegionObserverConstants.BSON_VALUE_FUNCTION) != null) {
-            serverParsedJsonValueFuncRefs =
-                deserializeServerParsedPositionalExpressionInfoFromScan(scan,
-                    BaseScannerRegionObserverConstants.BSON_VALUE_FUNCTION, serverParsedKVRefs);
-        }
-        if (serverParsedJsonValueFuncRefs != null) {
-            Collections.addAll(resultList, serverParsedJsonValueFuncRefs);
-        }
+        deserializeAndAddComplexDataTypeFunctions(scan,
+                BaseScannerRegionObserverConstants.JSON_VALUE_FUNCTION, serverParsedKVRefs,
+                resultList);
+        deserializeAndAddComplexDataTypeFunctions(scan,
+                BaseScannerRegionObserverConstants.BSON_VALUE_FUNCTION, serverParsedKVRefs,
+                resultList);
         Expression[] serverParsedJsonQueryFuncRefs = null;
         if (scan.getAttribute(BaseScannerRegionObserverConstants.JSON_QUERY_FUNCTION) != null) {
             serverParsedJsonQueryFuncRefs =
@@ -252,6 +244,21 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
         return resultList;
     }
 
+    private void deserializeAndAddComplexDataTypeFunctions(Scan scan,
+                                                           String functionName,
+                                                           Set<KeyValueColumnExpression>
+                                                                   serverParsedKVRefs,
+                                                           List<Expression> resultList) {
+        if (scan.getAttribute(functionName) != null) {
+            Expression[] serverParsedJsonValueFuncRefs =
+                    deserializeServerParsedPositionalExpressionInfoFromScan(scan,
+                            functionName, serverParsedKVRefs);
+            if (serverParsedJsonValueFuncRefs != null) {
+                Collections.addAll(resultList, serverParsedJsonValueFuncRefs);
+            }
+        }
+    }
+
     @VisibleForTesting
     static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s,
                                                      boolean spoolingEnabled, long thresholdBytes) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
index dfa8fa3c4d..853fd0ffa8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
@@ -31,9 +31,18 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Properties;
 
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PBson;
+import org.apache.phoenix.schema.types.PDouble;
 import org.bson.BsonArray;
 import org.bson.BsonBinary;
 import org.bson.BsonDocument;
+import org.bson.BsonDouble;
 import org.bson.BsonNull;
 import org.bson.BsonString;
 import org.bson.RawBsonDocument;
@@ -134,22 +143,8 @@ public class Bson4IT extends ParallelStatsDisabledIT {
       BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
       BsonDocument bsonDocument3 = RawBsonDocument.parse(sample3);
 
-      PreparedStatement stmt =
-              conn.prepareStatement("UPSERT INTO " + tableName + " VALUES 
(?,?,?)");
-      stmt.setString(1, "pk0001");
-      stmt.setString(2, "0002");
-      stmt.setObject(3, bsonDocument1);
-      stmt.executeUpdate();
-
-      stmt.setString(1, "pk1010");
-      stmt.setString(2, "1010");
-      stmt.setObject(3, bsonDocument2);
-      stmt.executeUpdate();
-
-      stmt.setString(1, "pk1011");
-      stmt.setString(2, "1011");
-      stmt.setObject(3, bsonDocument3);
-      stmt.executeUpdate();
+      upsertRows(conn, tableName, bsonDocument1, bsonDocument2, bsonDocument3);
+      PreparedStatement stmt;
 
       conn.commit();
 
@@ -254,6 +249,264 @@ public class Bson4IT extends ParallelStatsDisabledIT {
     }
   }
 
+  @Test
+  public void testConditionalUpsertReturnRow() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    String tableName = generateUniqueName();
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      conn.setAutoCommit(true);
+      String ddl = "CREATE TABLE " + tableName
+              + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
+              + " CONSTRAINT pk PRIMARY KEY(PK1))";
+      conn.createStatement().execute(ddl);
+
+      String sample1 = getJsonString("json/sample_01.json");
+      String sample2 = getJsonString("json/sample_02.json");
+      String sample3 = getJsonString("json/sample_03.json");
+      BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
+      BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
+      BsonDocument bsonDocument3 = RawBsonDocument.parse(sample3);
+
+      upsertRows(conn, tableName, bsonDocument1, bsonDocument2, bsonDocument3);
+      PreparedStatement stmt;
+
+      String conditionExpression =
+              "press = :press AND track[0].shot[2][0].city.standard[50] = 
:softly";
+
+      BsonDocument conditionDoc = new BsonDocument();
+      conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+      conditionDoc.put("$VAL", new BsonDocument()
+              .append(":press", new BsonString("beat"))
+              .append(":softly", new BsonString("softly")));
+
+      String query = "SELECT * FROM " + tableName +
+              " WHERE PK1 = 'pk0001' AND C1 = '0002' AND NOT 
BSON_CONDITION_EXPRESSION(COL, '"
+              + conditionDoc.toJson() + "')";
+      ResultSet rs = conn.createStatement().executeQuery(query);
+
+      assertTrue(rs.next());
+      assertEquals("pk0001", rs.getString(1));
+      assertEquals("0002", rs.getString(2));
+      BsonDocument document1 = (BsonDocument) rs.getObject(3);
+      assertEquals(bsonDocument1, document1);
+
+      assertFalse(rs.next());
+
+      conditionExpression =
+              "press = :press AND track[0].shot[2][0].city.standard[5] = 
:softly";
+
+      conditionDoc = new BsonDocument();
+      conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+      conditionDoc.put("$VAL", new BsonDocument()
+              .append(":press", new BsonString("beat"))
+              .append(":softly", new BsonString("softly")));
+
+      query = "SELECT * FROM " + tableName +
+              " WHERE PK1 = 'pk0001' AND C1 = '0002' AND 
BSON_CONDITION_EXPRESSION(COL, '"
+              + conditionDoc.toJson() + "')";
+      rs = conn.createStatement().executeQuery(query);
+
+      assertTrue(rs.next());
+      assertEquals("pk0001", rs.getString(1));
+      assertEquals("0002", rs.getString(2));
+      document1 = (BsonDocument) rs.getObject(3);
+      assertEquals(bsonDocument1, document1);
+
+      assertFalse(rs.next());
+
+      BsonDocument updateExp = new BsonDocument()
+              .append("$SET", new BsonDocument()
+                      .append("browserling",
+                              new BsonBinary(PDouble.INSTANCE.toBytes(-505169340.54880095)))
+                      .append("track[0].shot[2][0].city.standard[5]", new BsonString("soft"))
+                      .append("track[0].shot[2][0].city.problem[2]",
+                              new BsonString("track[0].shot[2][0].city.problem[2] + 529.435")))
+              .append("$UNSET", new BsonDocument()
+                      .append("track[0].shot[2][0].city.flame", new BsonNull()));
+
+      stmt = conn.prepareStatement("UPSERT INTO " + tableName
+              + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+              + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + 
"')"
+              + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE 
COL END,"
+              + " C1 = ?");
+      stmt.setString(1, "pk0001");
+      stmt.setString(2, "0003");
+
+      // Conditional Upsert successful
+      assertReturnedRowResult(stmt, conn, tableName, "json/sample_updated_01.json", true);
+
+      updateExp = new BsonDocument()
+              .append("$ADD", new BsonDocument()
+                      .append("new_samples",
+                              new BsonDocument().append("$set",
+                                      new BsonArray(Arrays.asList(
+                                              new BsonBinary(Bytes.toBytes("Sample10")),
+                                              new BsonBinary(Bytes.toBytes("Sample12")),
+                                              new BsonBinary(Bytes.toBytes("Sample13")),
+                                              new BsonBinary(Bytes.toBytes("Sample14"))
+                                      )))))
+              .append("$DELETE_FROM_SET", new BsonDocument()
+                      .append("new_samples",
+                              new BsonDocument().append("$set",
+                                      new BsonArray(Arrays.asList(
+                                              new BsonBinary(Bytes.toBytes("Sample02")),
+                                              new BsonBinary(Bytes.toBytes("Sample03"))
+                                      )))))
+              .append("$SET", new BsonDocument()
+                      .append("newrecord", ((BsonArray) 
(document1.get("track"))).get(0)))
+              .append("$UNSET", new BsonDocument()
+                      .append("rather[3].outline.halfway.so[2][2]", new 
BsonNull()));
+
+      conditionExpression =
+              "field_not_exists(newrecord) AND 
field_exists(rather[3].outline.halfway.so[2][2])";
+
+      conditionDoc = new BsonDocument();
+      conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+      conditionDoc.put("$VAL", new BsonDocument());
+
+      stmt = conn.prepareStatement("UPSERT INTO " + tableName
+              + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+              + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + 
"')"
+              + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE 
COL END");
+
+      stmt.setString(1, "pk1010");
+
+      // Conditional Upsert successful
+      assertReturnedRowResult(stmt, conn, tableName, "json/sample_updated_02.json", true);
+
+      updateExp = new BsonDocument()
+              .append("$SET", new BsonDocument()
+                      .append("result[1].location.state", new 
BsonString("AK")))
+              .append("$UNSET", new BsonDocument()
+                      .append("result[4].emails[1]", new BsonNull()));
+
+      conditionExpression =
+              "result[2].location.coordinates.latitude > :latitude OR "
+                      + "(field_exists(result[1].location) AND 
result[1].location.state != :state" +
+                      " AND field_exists(result[4].emails[1]))";
+
+      conditionDoc = new BsonDocument();
+      conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+      conditionDoc.put("$VAL", new BsonDocument()
+              .append(":latitude", new BsonDouble(0))
+              .append(":state", new BsonString("AK")));
+
+      stmt = conn.prepareStatement("UPSERT INTO " + tableName
+              + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+              + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + 
"')"
+              + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE 
COL END");
+
+      stmt.setString(1, "pk1011");
+
+      // Conditional Upsert successful
+      assertReturnedRowResult(stmt, conn, tableName, "json/sample_updated_03.json", true);
+
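+      // Condition bound to incorrect placeholder values: the update should not apply,
+      // so the returned row is expected to still match sample_updated_01.json.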
+      conditionExpression =
+              "press = :press AND track[0].shot[2][0].city.standard[5] = :softly";
+
+      conditionDoc = new BsonDocument();
+      conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+      conditionDoc.put("$VAL", new BsonDocument()
+              .append(":press", new BsonString("incorrect_value"))
+              .append(":softly", new BsonString("incorrect_value")));
+
+      updateExp = new BsonDocument()
+              .append("$SET", new BsonDocument()
+                      .append("new_field1",
+                              new BsonBinary(PDouble.INSTANCE.toBytes(-505169340.54880095)))
+                      .append("track[0].shot[2][0].city.standard[5]", new BsonString(
+                              "soft_new_val"))
+                      .append("track[0].shot[2][0].city.problem[2]",
+                              new BsonString("track[0].shot[2][0].city.problem[2] + 123")));
+
+      stmt = conn.prepareStatement("UPSERT INTO " + tableName
+              + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+              + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')"
+              + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END");
+      stmt.setString(1, "pk0001");
+
+      // Conditional Upsert not successful
+      assertReturnedRowResult(stmt, conn, tableName, "json/sample_updated_01.json", false);
+
+      verifyRows(tableName, conn);
+    }
+  }
+
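+  // Seeds the three rows (pk0001, pk1010, pk1011) that the conditional upserts mutate.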
+  private static void upsertRows(Connection conn, String tableName, BsonDocument bsonDocument1,
+                                BsonDocument bsonDocument2, BsonDocument bsonDocument3)
+          throws SQLException {
+    PreparedStatement stmt =
+            conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?)");
+    stmt.setString(1, "pk0001");
+    stmt.setString(2, "0002");
+    stmt.setObject(3, bsonDocument1);
+    stmt.executeUpdate();
+
+    stmt.setString(1, "pk1010");
+    stmt.setString(2, "1010");
+    stmt.setObject(3, bsonDocument2);
+    stmt.executeUpdate();
+
+    stmt.setString(1, "pk1011");
+    stmt.setString(2, "1011");
+    stmt.setObject(3, bsonDocument3);
+    stmt.executeUpdate();
+  }
+
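+  // Scans the table back and verifies each document matches its expected post-update JSON.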
+  private static void verifyRows(String tableName, Connection conn)
+          throws SQLException, IOException {
+    String query;
+    ResultSet rs;
+    BsonDocument document1;
+    query = "SELECT * FROM " + tableName;
+    rs = conn.createStatement().executeQuery(query);
+
+    assertTrue(rs.next());
+    assertEquals("pk0001", rs.getString(1));
+    assertEquals("0003", rs.getString(2));
+    document1 = (BsonDocument) rs.getObject(3);
+
+    String updatedJson = getJsonString("json/sample_updated_01.json");
+    assertEquals(RawBsonDocument.parse(updatedJson), document1);
+
+    assertTrue(rs.next());
+    assertEquals("pk1010", rs.getString(1));
+    assertEquals("1010", rs.getString(2));
+    BsonDocument document2 = (BsonDocument) rs.getObject(3);
+
+    updatedJson = getJsonString("json/sample_updated_02.json");
+    assertEquals(RawBsonDocument.parse(updatedJson), document2);
+
+    assertTrue(rs.next());
+    assertEquals("pk1011", rs.getString(1));
+    assertEquals("1011", rs.getString(2));
+    BsonDocument document3 = (BsonDocument) rs.getObject(3);
+
+    updatedJson = getJsonString("json/sample_updated_03.json");
+    assertEquals(RawBsonDocument.parse(updatedJson), document3);
+
+    assertFalse(rs.next());
+  }
+
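+  // Executes the atomic upsert via executeUpdateReturnRow() and asserts both the update
+  // count (1 if the conditional update applied, 0 otherwise) and the BSON column of the
+  // row returned by the server.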
+  private static void assertReturnedRowResult(PreparedStatement stmt,
+                                              Connection conn,
+                                              String tableName,
+                                              String jsonPath,
+                                              boolean success)
+          throws SQLException, IOException {
+    Pair<Integer, Tuple> resultPair =
+            stmt.unwrap(PhoenixPreparedStatement.class).executeUpdateReturnRow();
+    assertEquals(success ? 1 : 0, resultPair.getFirst().intValue());
+    Tuple result = resultPair.getSecond();
+    PTable table = conn.unwrap(PhoenixConnection.class).getTable(tableName);
+
+    Cell cell = result.getValue(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+            table.getColumns().get(2).getColumnQualifierBytes());
+    assertEquals(RawBsonDocument.parse(getJsonString(jsonPath)),
+            PBson.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(),
+                    cell.getValueLength()));
+  }
+
   private static void validateIndexUsed(PreparedStatement ps, String indexName)
       throws SQLException {
     ExplainPlan plan = ps.unwrap(PhoenixPreparedStatement.class).optimizeQuery().getExplainPlan();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
index f7d72b868e..e5e2c116e4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
@@ -21,8 +21,13 @@ import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.Charset;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
@@ -33,21 +38,37 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PBson;
+import org.apache.phoenix.schema.types.PDouble;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.bson.BsonDocument;
+import org.bson.RawBsonDocument;
+import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -118,6 +139,177 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT {
         conn.close();
     }
 
+    @Test
+    public void testReturnRowResult() throws Exception {
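+        // Covers ON DUPLICATE KEY IGNORE, plain upserts, and ON DUPLICATE KEY UPDATE,
+        // asserting the server-returned row after each statement.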
+        Assume.assumeTrue("Set correct result to RegionActionResult on hbase versions " +
+                "2.4.18+, 2.5.9+, and 2.6.0+", isSetCorrectResultEnabledOnHBase());
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String sample1 = getJsonString("json/sample_01.json");
+        String sample2 = getJsonString("json/sample_02.json");
+        BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
+        BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            String tableName = generateUniqueName();
+            String ddl = "CREATE TABLE " + tableName
+                    + "(PK1 VARCHAR, PK2 DOUBLE NOT NULL, PK3 VARCHAR, COUNTER1 DOUBLE,"
+                    + " COUNTER2 VARCHAR,"
+                    + " COL3 BSON, COL4 INTEGER, CONSTRAINT pk PRIMARY KEY(PK1, PK2, PK3))";
+            conn.createStatement().execute(ddl);
+            createIndex(conn, tableName);
+
+            String upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3, COUNTER1, COL3, COL4)"
+                    + " VALUES('pk000', -123.98, 'pk003', 1011.202, ?, 123) ON DUPLICATE KEY " +
+                    "IGNORE";
+            validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 1011.202, null, true,
+                    bsonDocument1, bsonDocument1, 123);
+
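+            // Row already exists, so ON DUPLICATE KEY IGNORE is a no-op: update count 0
+            // and the existing row is returned unchanged.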
+            upsertSql =
+                    "UPSERT INTO " + tableName + " (PK1, PK2, PK3, COUNTER1) "
+                            + "VALUES('pk000', -123.98, 'pk003', 0) ON DUPLICATE"
+                            + " KEY IGNORE";
+            validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 1011.202, null, false,
+                    null, bsonDocument1, 123);
+
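+            // Plain UPSERT without an ON DUPLICATE KEY clause overwrites COUNTER1 and
+            // COUNTER2 and still returns the resulting row.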
+            upsertSql =
+                    "UPSERT INTO " + tableName
+                            + " (PK1, PK2, PK3, COUNTER1, COUNTER2) VALUES('pk000', -123.98, "
+                            + "'pk003', 234, 'col2_000')";
+            validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 234d, "col2_000", true,
+                    null, bsonDocument1, 123);
+
+            upsertSql = "UPSERT INTO " + tableName
+                    + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON DUPLICATE KEY UPDATE "
+                    + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 + 1999.99 ELSE COUNTER1"
+                    + " END, "
+                    + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001' ELSE COUNTER2 "
+                    + "END, "
+                    + "COL3 = ?, "
+                    + "COL4 = 234";
+            validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 2233.99, "col2_001", true,
+                    bsonDocument2, bsonDocument2, 234);
+
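+            // Both CASE conditions now evaluate to false (COUNTER1 >= 2000 and COUNTER2
+            // != 'col2_000'), so the row is unchanged and the update count is 0.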
+            upsertSql = "UPSERT INTO " + tableName
+                    + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON DUPLICATE KEY UPDATE "
+                    + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 + 1999.99 ELSE COUNTER1"
+                    + " END,"
+                    + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001' ELSE COUNTER2 "
+                    + "END";
+            validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 2233.99, "col2_001", false,
+                    null, bsonDocument2, 234);
+        }
+    }
+
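+    // Runs the upsert (binding the BSON document when provided), then decodes the primary
+    // key from the returned row and checks the COUNTER1, COUNTER2, COL3 and COL4 cells;
+    // a null expectation asserts that the cell is absent.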
+    private static void validateReturnedRowAfterUpsert(Connection conn,
+                                                       String upsertSql,
+                                                       String tableName,
+                                                       Double col1,
+                                                       String col2,
+                                                       boolean success,
+                                                       BsonDocument inputDoc,
+                                                       BsonDocument expectedDoc,
+                                                       Integer col4)
+            throws SQLException {
+        final Pair<Integer, Tuple> resultPair;
+        if (inputDoc != null) {
+            PhoenixPreparedStatement ps =
+                    conn.prepareStatement(upsertSql).unwrap(PhoenixPreparedStatement.class);
+            ps.setObject(1, inputDoc);
+            resultPair = ps.executeUpdateReturnRow();
+        } else {
+            resultPair = conn.createStatement().unwrap(PhoenixStatement.class)
+                    .executeUpdateReturnRow(upsertSql);
+        }
+        assertEquals(success ? 1 : 0, resultPair.getFirst().intValue());
+        ResultTuple result = (ResultTuple) resultPair.getSecond();
+        PTable table = conn.unwrap(PhoenixConnection.class).getTable(tableName);
+
+        Cell cell =
+                result.getResult()
+                        .getColumnLatestCell(table.getColumns().get(3).getFamilyName().getBytes(),
+                                table.getColumns().get(3).getColumnQualifierBytes());
+        ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+        table.getRowKeySchema().iterator(cell.getRowArray(), ptr, 1);
+        assertEquals("pk000",
+                PVarchar.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()));
+
+        ptr = new ImmutableBytesPtr();
+        table.getRowKeySchema().iterator(cell.getRowArray(), ptr, 2);
+        assertEquals(-123.98,
+                PDouble.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()));
+
+        ptr = new ImmutableBytesPtr();
+        table.getRowKeySchema().iterator(cell.getRowArray(), ptr, 3);
+        assertEquals("pk003",
+                PVarchar.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()));
+
+        assertEquals(col1,
+                PDouble.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(),
+                        cell.getValueLength()));
+        cell = result.getResult()
+                .getColumnLatestCell(table.getColumns().get(4).getFamilyName().getBytes(),
+                        table.getColumns().get(4).getColumnQualifierBytes());
+        if (col2 != null) {
+            assertEquals(col2,
+                    PVarchar.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(),
+                            cell.getValueLength()));
+        } else {
+            assertNull(cell);
+        }
+
+        cell = result.getResult()
+                .getColumnLatestCell(table.getColumns().get(5).getFamilyName().getBytes(),
+                        table.getColumns().get(5).getColumnQualifierBytes());
+        if (expectedDoc != null) {
+            assertEquals(expectedDoc,
+                    PBson.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(),
+                            cell.getValueLength()));
+        } else {
+            assertNull(cell);
+        }
+
+        cell = result.getResult()
+                .getColumnLatestCell(table.getColumns().get(6).getFamilyName().getBytes(),
+                        table.getColumns().get(6).getColumnQualifierBytes());
+        if (col4 != null) {
+            assertEquals(col4,
+                    PInteger.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(),
+                            cell.getValueLength()));
+        } else {
+            assertNull(cell);
+        }
+    }
+
+    @Test
+    public void testReturnRowResultWithoutAutoCommit() throws Exception {
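+        // executeUpdateReturnRow() requires auto-commit; without it the call is expected
+        // to fail with AUTO_COMMIT_NOT_ENABLED.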
+        Assume.assumeTrue("Set correct result to RegionActionResult on hbase versions " +
+                "2.4.18+, 2.5.9+, and 2.6.0+", isSetCorrectResultEnabledOnHBase());
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String sample1 = getJsonString("json/sample_01.json");
+        BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String tableName = generateUniqueName();
+            String ddl = "CREATE TABLE " + tableName
+                    + "(PK1 VARCHAR, PK2 DOUBLE NOT NULL, PK3 VARCHAR, COUNTER1 DOUBLE,"
+                    + " COUNTER2 VARCHAR,"
+                    + " COL3 BSON, COL4 INTEGER, CONSTRAINT pk PRIMARY KEY(PK1, PK2, PK3))";
+            conn.createStatement().execute(ddl);
+            createIndex(conn, tableName);
+
+            String upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3, COUNTER1, COL3, COL4)"
+                    + " VALUES('pk000', -123.98, 'pk003', 1011.202, ?, 123) ON DUPLICATE KEY " +
+                    "IGNORE";
+            validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 1011.202, null, true,
+                    bsonDocument1, bsonDocument1, 123);
+            throw new RuntimeException("Should not reach here");
+        } catch (SQLException e) {
+            Assert.assertEquals(SQLExceptionCode.AUTO_COMMIT_NOT_ENABLED.getErrorCode(),
+                    e.getErrorCode());
+        }
+    }
+
     @Test
     public void testColumnsTimestampUpdateWithAllCombinations() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -643,4 +835,9 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT {
         }
         return patchVersion >= 9;
     }
+
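+    // Reads a JSON fixture from the test classpath into a string.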
+    private static String getJsonString(String jsonFilePath) throws IOException {
+        URL fileUrl = OnDuplicateKey2IT.class.getClassLoader().getResource(jsonFilePath);
+        return FileUtils.readFileToString(new File(fileUrl.getFile()), Charset.defaultCharset());
+    }
 }
