This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 205acd4562 PHOENIX-7398 New PhoenixStatement API to return row for
Atomic/Conditional Upserts (#1967)
205acd4562 is described below
commit 205acd4562cb6908edd5ab446f13e33bcae2ba13
Author: Viraj Jasani <[email protected]>
AuthorDate: Tue Sep 10 17:13:46 2024 -0700
PHOENIX-7398 New PhoenixStatement API to return row for Atomic/Conditional
Upserts (#1967)
---
.../apache/phoenix/exception/SQLExceptionCode.java | 2 +
.../org/apache/phoenix/execute/MutationState.java | 34 +++
.../phoenix/index/PhoenixIndexBuilderHelper.java | 4 +
.../phoenix/jdbc/PhoenixPreparedStatement.java | 30 ++-
.../org/apache/phoenix/jdbc/PhoenixStatement.java | 69 ++++-
.../phoenix/hbase/index/IndexRegionObserver.java | 215 ++++++++++++----
.../hbase/index/builder/BaseIndexBuilder.java | 5 +
.../hbase/index/builder/IndexBuildManager.java | 5 +
.../phoenix/hbase/index/builder/IndexBuilder.java | 8 +
.../apache/phoenix/index/PhoenixIndexBuilder.java | 6 +-
.../iterate/NonAggregateRegionScannerFactory.java | 35 ++-
.../java/org/apache/phoenix/end2end/Bson4IT.java | 285 +++++++++++++++++++--
.../apache/phoenix/end2end/OnDuplicateKey2IT.java | 197 ++++++++++++++
13 files changed, 806 insertions(+), 89 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index c060d4d20f..ee83869611 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -632,6 +632,8 @@ public enum SQLExceptionCode {
STALE_METADATA_CACHE_EXCEPTION(915, "43M26", "Stale metadata cache
exception",
info -> new StaleMetadataCacheException(info.getMessage())),
+ AUTO_COMMIT_NOT_ENABLED(916, "43M27", "Connection does not have
auto-commit enabled"),
+
//SQLCode for testing exceptions
FAILED_KNOWINGLY_FOR_TEST(7777, "TEST", "Exception was thrown to test
something");
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
index 529b6cd60e..e5a0c3ede2 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -187,6 +187,15 @@ public class MutationState implements SQLCloseable {
private final boolean indexRegionObserverEnabledAllTables;
+ /**
+ * Return result back to client. To be used when client needs to read the
whole row
+ * or some specific attributes of the row back. As of PHOENIX-7398,
returning whole
+ * row is supported. This is used to allow client to set Mutation
attribute that is read
+ * by server for it to atomically read the row and return it back.
+ */
+ private ReturnResult returnResult;
+ private Result result;
+
public static void resetAllMutationState(){
allDeletesMutations = true;
allUpsertsMutations = true;
@@ -482,6 +491,10 @@ public class MutationState implements SQLCloseable {
return numRows;
}
+ public Result getResult() {
+ return this.result;
+ }
+
private MultiRowMutationState getLastMutationBatch(Map<TableRef,
List<MultiRowMutationState>> mutations, TableRef tableRef) {
List<MultiRowMutationState> mutationBatches = mutations.get(tableRef);
if (mutationBatches == null || mutationBatches.isEmpty()) {
@@ -847,6 +860,12 @@ public class MutationState implements SQLCloseable {
if (onDupKeyBytes != null) {
mutation.setAttribute(PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB,
onDupKeyBytes);
}
+ if (this.returnResult != null) {
+ if (this.returnResult == ReturnResult.ROW) {
+
mutation.setAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT,
+
PhoenixIndexBuilderHelper.RETURN_RESULT_ROW);
+ }
+ }
}
rowMutationsPertainingToIndex = rowMutations;
}
@@ -1127,6 +1146,10 @@ public class MutationState implements SQLCloseable {
return batchCount;
}
+ public void setReturnResult(ReturnResult returnResult) {
+ this.returnResult = returnResult;
+ }
+
public static final class MutationBytes {
private long deleteMutationCounter;
@@ -1550,6 +1573,11 @@ public class MutationState implements SQLCloseable {
numUpdatedRowsForAutoCommit =
PInteger.INSTANCE.getCodec()
.decodeInt(cell.getValueArray(),
cell.getValueOffset(),
SortOrder.getDefault());
+ if (this.returnResult != null) {
+ if (this.returnResult == ReturnResult.ROW)
{
+ this.result = result;
+ }
+ }
} else {
numUpdatedRowsForAutoCommit = 1;
}
@@ -2005,6 +2033,7 @@ public class MutationState implements SQLCloseable {
estimatedSize = 0;
this.mutationsMap.clear();
phoenixTransactionContext = PhoenixTransactionContext.NULL_CONTEXT;
+ this.returnResult = null;
}
private void resetTransactionalState() {
@@ -2418,4 +2447,9 @@ public class MutationState implements SQLCloseable {
public boolean isEmpty() {
return mutationsMap != null ? mutationsMap.isEmpty() : true;
}
+
+ public enum ReturnResult {
+ ROW
+ }
+
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilderHelper.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilderHelper.java
index 81bda56408..6ef1d9e38a 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilderHelper.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilderHelper.java
@@ -38,6 +38,10 @@ public final class PhoenixIndexBuilderHelper {
private static final byte[] ON_DUP_KEY_IGNORE_BYTES = new byte[] {1}; //
boolean true
private static final int ON_DUP_KEY_HEADER_BYTE_SIZE = Bytes.SIZEOF_SHORT
+ Bytes.SIZEOF_BOOLEAN;
public static final String ATOMIC_OP_ATTRIB = "_ATOMIC_OP_ATTRIB";
+
+ public static final String RETURN_RESULT = "_RETURN_RESULT";
+ public static final byte[] RETURN_RESULT_ROW = new byte[]{0};
+
public static byte[] serializeOnDupKeyIgnore() {
return ON_DUP_KEY_IGNORE_BYTES;
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index 9a00f9bbe5..036020df39 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -46,15 +46,18 @@ import java.util.Calendar;
import java.util.Collections;
import java.util.List;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.BindManager;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementPlan;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.schema.ExecuteQueryNotApplicableException;
import org.apache.phoenix.schema.ExecuteUpdateNotApplicableException;
import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.SQLCloseable;
@@ -197,6 +200,11 @@ public class PhoenixPreparedStatement extends
PhoenixStatement implements Phoeni
@Override
public int executeUpdate() throws SQLException {
+ preExecuteUpdate();
+ return executeMutation(statement,
createAuditQueryLogger(statement,query));
+ }
+
+ private void preExecuteUpdate() throws SQLException {
throwIfUnboundParameters();
if (!statement.getOperation().isMutation()) {
throw new
ExecuteUpdateNotApplicableException(statement.getOperation());
@@ -205,7 +213,27 @@ public class PhoenixPreparedStatement extends
PhoenixStatement implements Phoeni
throw new
SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
.build().buildException();
}
- return executeMutation(statement,
createAuditQueryLogger(statement,query));
+ }
+
+ /**
+ * Executes the given SQL statement similar to JDBC API executeUpdate()
but also returns the
+ * updated or non-updated row as a Tuple (wrapping the HBase Result) back to the client. This
must be used with
+ * auto-commit Connection. This makes the operation atomic.
+ * If the row is successfully updated, return the updated row, otherwise
if the row
+ * cannot be updated, return non-updated row.
+ *
+ * @return The pair of int and Tuple, where int represents value 1 for
successful row update
+ * and 0 for non-successful row update, and Tuple represents the state of
the row.
+ * @throws SQLException If the statement cannot be executed.
+ */
+ public Pair<Integer, Tuple> executeUpdateReturnRow() throws SQLException {
+ if (!connection.getAutoCommit()) {
+ throw new
SQLExceptionInfo.Builder(SQLExceptionCode.AUTO_COMMIT_NOT_ENABLED).build()
+ .buildException();
+ }
+ preExecuteUpdate();
+ return executeMutation(statement, createAuditQueryLogger(statement,
query),
+ MutationState.ReturnResult.ROW);
}
public QueryPlan optimizeQuery() throws SQLException {
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 92e2aa07ee..b9f895b777 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.client.Consistency;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.call.CallRunner;
@@ -107,6 +108,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.exception.StaleMetadataCacheException;
import org.apache.phoenix.exception.UpgradeRequiredException;
import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.MutationState.ReturnResult;
import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
@@ -209,6 +211,7 @@ import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.stats.StatisticsCollectionScope;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PLong;
@@ -576,11 +579,22 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
}
- protected int executeMutation(final CompilableStatement stmt, final
AuditQueryLogger queryLogger) throws SQLException {
- return executeMutation(stmt, true, queryLogger);
+ protected int executeMutation(final CompilableStatement stmt,
+ final AuditQueryLogger queryLogger) throws
SQLException {
+ return executeMutation(stmt, true, queryLogger, null).getFirst();
}
- private int executeMutation(final CompilableStatement stmt, final boolean
doRetryOnMetaNotFoundError, final AuditQueryLogger queryLogger) throws
SQLException {
+ Pair<Integer, Tuple> executeMutation(final CompilableStatement stmt,
+ final AuditQueryLogger queryLogger,
+ final ReturnResult returnResult) throws
SQLException {
+ return executeMutation(stmt, true, queryLogger, returnResult);
+ }
+
+ private Pair<Integer, Tuple> executeMutation(final CompilableStatement
stmt,
+ final boolean
doRetryOnMetaNotFoundError,
+ final AuditQueryLogger
queryLogger,
+ final ReturnResult
returnResult)
+ throws SQLException {
if (connection.isReadOnly()) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.READ_ONLY_CONNECTION).
@@ -590,9 +604,9 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
try {
return CallRunner
.run(
- new CallRunner.CallableThrowable<Integer,
SQLException>() {
+ new CallRunner.CallableThrowable<Pair<Integer,
Tuple>, SQLException>() {
@Override
- public Integer call() throws SQLException {
+ public Pair<Integer, Tuple> call() throws
SQLException {
boolean success = false;
String tableName = null;
boolean isUpsert = false;
@@ -631,12 +645,15 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
// just max out at Integer.MAX_VALUE
int lastUpdateCount = (int)
Math.min(Integer.MAX_VALUE,
lastState.getUpdateCount());
+ Result result = null;
if (connection.getAutoCommit()) {
+ state.setReturnResult(returnResult);
connection.commit();
if (isAtomicUpsert) {
lastUpdateCount =
connection.getMutationState()
.getNumUpdatedRowsForAutoCommit();
}
+ result =
connection.getMutationState().getResult();
}
setLastQueryPlan(null);
setLastUpdateCount(lastUpdateCount);
@@ -651,7 +668,7 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
}
success = true;
- return lastUpdateCount;
+ return new Pair<>(lastUpdateCount, new
ResultTuple(result));
}
//Force update cache and retry if meta not found
error occurs
catch (MetaDataEntityNotFoundException e) {
@@ -661,7 +678,8 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
}
if (new
MetaDataClient(connection).updateCache(connection.getTenantId(),
e.getSchemaName(), e.getTableName(),
true).wasUpdated()) {
- return executeMutation(stmt, false,
queryLogger);
+ return executeMutation(stmt, false,
queryLogger,
+ returnResult);
}
}
throw e;
@@ -2376,17 +2394,46 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
@Override
public int executeUpdate(String sql) throws SQLException {
+ CompilableStatement stmt = preExecuteUpdate(sql);
+ int updateCount = executeMutation(stmt, createAuditQueryLogger(stmt,
sql));
+ flushIfNecessary();
+ return updateCount;
+ }
+
+ /**
+ * Executes the given SQL statement similar to JDBC API executeUpdate()
but also returns the
+ * updated or non-updated row as a Tuple (wrapping the HBase Result) back to the client. This
must be used with
+ * auto-commit Connection. This makes the operation atomic.
+ * If the row is successfully updated, return the updated row, otherwise
if the row
+ * cannot be updated, return non-updated row.
+ *
+ * @param sql The SQL DML statement, UPSERT or DELETE for Phoenix.
+ * @return The pair of int and Tuple, where int represents value 1 for
successful row
+ * update and 0 for non-successful row update, and Tuple represents the
state of the row.
+ * @throws SQLException If the statement cannot be executed.
+ */
+ public Pair<Integer, Tuple> executeUpdateReturnRow(String sql) throws
SQLException {
+ if (!connection.getAutoCommit()) {
+ throw new
SQLExceptionInfo.Builder(SQLExceptionCode.AUTO_COMMIT_NOT_ENABLED).build()
+ .buildException();
+ }
+ CompilableStatement stmt = preExecuteUpdate(sql);
+ Pair<Integer, Tuple> result =
+ executeMutation(stmt, createAuditQueryLogger(stmt, sql),
ReturnResult.ROW);
+ flushIfNecessary();
+ return result;
+ }
+
+ private CompilableStatement preExecuteUpdate(String sql) throws
SQLException {
CompilableStatement stmt = parseStatement(sql);
if (!stmt.getOperation().isMutation) {
throw new ExecuteUpdateNotApplicableException(sql);
}
if (!batch.isEmpty()) {
throw new
SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
- .build().buildException();
+ .build().buildException();
}
- int updateCount = executeMutation(stmt, createAuditQueryLogger(stmt,
sql));
- flushIfNecessary();
- return updateCount;
+ return stmt;
}
private void flushIfNecessary() throws SQLException {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index b2e3b5ff9d..8be7419780 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -141,6 +141,7 @@ import java.util.concurrent.TimeUnit;
import static
org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew;
import static
org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.removeColumn;
import static
org.apache.phoenix.index.PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB;
+import static org.apache.phoenix.index.PhoenixIndexBuilderHelper.RETURN_RESULT;
import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
/**
@@ -257,6 +258,12 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
// The latches of the threads waiting for this batch to complete
private List<CountDownLatch> waitList = null;
private Map<ImmutableBytesPtr, MultiMutation> multiMutationMap;
+ // store current cells into a map where the key is ColumnReference of
the column family and
+ // column qualifier, and value is a pair of cell and a boolean. The
value of the boolean
+ // will be true if the expression is CaseExpression and Else-clause is
evaluated to be
+ // true, will be null if there is no expression on this column,
otherwise false
+ // This is only initialized for single row atomic mutation.
+ private Map<ColumnReference, Pair<Cell, Boolean>> currColumnCellExprMap;
//list containing the original mutations from the
MiniBatchOperationInProgress. Contains
// any annotations we were sent by the client, and can be used in hooks
that don't get
@@ -268,6 +275,8 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
private boolean hasGlobalIndex;
private boolean hasLocalIndex;
private boolean hasTransform;
+ private boolean returnResult;
+
public BatchMutateContext() {
this.clientVersion = 0;
}
@@ -516,23 +525,14 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
"Somehow didn't return an index update but also didn't propagate the
failure to the client!");
}
- private void ignoreAtomicOperations (MiniBatchOperationInProgress<Mutation>
miniBatchOp) {
- for (int i = 0; i < miniBatchOp.size(); i++) {
- Mutation m = miniBatchOp.getOperation(i);
- if (this.builder.isAtomicOp(m)) {
- miniBatchOp.setOperationStatus(i, IGNORE);
- }
- }
- }
-
private void populateRowsToLock(MiniBatchOperationInProgress<Mutation>
miniBatchOp,
BatchMutateContext context) {
for (int i = 0; i < miniBatchOp.size(); i++) {
Mutation m = miniBatchOp.getOperation(i);
- if (this.builder.isAtomicOp(m) || this.builder.isEnabled(m)) {
+ if (this.builder.isAtomicOp(m) || this.builder.returnResult(m) ||
+ this.builder.isEnabled(m)) {
ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
context.rowsToLock.add(row);
-
}
}
}
@@ -572,8 +572,8 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
BatchMutateContext context) throws
IOException {
for (int i = 0; i < miniBatchOp.size(); i++) {
Mutation m = miniBatchOp.getOperation(i);
- if (this.builder.isAtomicOp(m) && m instanceof Put) {
- List<Mutation> mutations = generateOnDupMutations(context,
(Put)m);
+ if ((this.builder.isAtomicOp(m) || this.builder.returnResult(m)) &&
m instanceof Put) {
+ List<Mutation> mutations = generateOnDupMutations(context,
(Put)m, miniBatchOp);
if (!mutations.isEmpty()) {
addOnDupMutationsToBatch(miniBatchOp, i, mutations);
} else {
@@ -583,11 +583,19 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
// them the new value is the same as the old value in the
ELSE-clause (empty
// cell timestamp will NOT be updated)
byte[] retVal = PInteger.INSTANCE.toBytes(0);
- Cell cell = PhoenixKeyValueUtil.newKeyValue(m.getRow(),
Bytes.toBytes(UPSERT_CF),
- Bytes.toBytes(UPSERT_STATUS_CQ), 0, retVal, 0,
retVal.length);
+ List<Cell> cells = new ArrayList<>();
+ cells.add(PhoenixKeyValueUtil.newKeyValue(m.getRow(),
Bytes.toBytes(UPSERT_CF),
+ Bytes.toBytes(UPSERT_STATUS_CQ), 0, retVal, 0,
retVal.length));
+
+ if (context.returnResult) {
+ context.currColumnCellExprMap.forEach(
+ (key, value) -> cells.add(value.getFirst()));
+ cells.sort(CellComparator.getInstance());
+ }
+
// put Result in OperationStatus for returning update status
from conditional
// upserts, where 0 represents the row is not updated
- Result result = Result.create(new
ArrayList<>(Arrays.asList(cell)));
+ Result result = Result.create(cells);
miniBatchOp.setOperationStatus(i,
new OperationStatus(SUCCESS, result));
}
@@ -647,7 +655,8 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
// skip this mutation if we aren't enabling indexing or not an
atomic op
// or if it is an atomic op and its timestamp is already set(not
LATEST)
if (!builder.isEnabled(m) &&
- !(builder.isAtomicOp(m) && IndexUtil.getMaxTimestamp(m) ==
HConstants.LATEST_TIMESTAMP)) {
+ !((builder.isAtomicOp(m) || builder.returnResult(m)) &&
+ IndexUtil.getMaxTimestamp(m) ==
HConstants.LATEST_TIMESTAMP)) {
continue;
}
setTimestampOnMutation(m, ts);
@@ -1146,14 +1155,18 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
BatchMutateContext context) {
for (int i = 0; i < miniBatchOp.size(); i++) {
Mutation m = miniBatchOp.getOperation(i);
- if (this.builder.isAtomicOp(m)) {
+ if (this.builder.returnResult(m) && miniBatchOp.size() == 1) {
+ context.returnResult = true;
+ }
+ if (this.builder.isAtomicOp(m) || this.builder.returnResult(m)) {
context.hasAtomic = true;
if (context.hasDelete) {
return;
}
- } else if (m instanceof Delete)
+ } else if (m instanceof Delete) {
context.hasDelete = true;
- if (context.hasAtomic) {
+ }
+ if (context.hasAtomic || context.returnResult) {
return;
}
}
@@ -1284,20 +1297,21 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
lockRows(context);
long onDupCheckTime = 0;
- if (context.hasAtomic || context.hasGlobalIndex ||
context.hasUncoveredIndex || context.hasTransform) {
+ if (context.hasAtomic || context.returnResult ||
context.hasGlobalIndex ||
+ context.hasUncoveredIndex || context.hasTransform) {
// Retrieve the current row states from the data table while
holding the lock.
// This is needed for both atomic mutations and global indexes
long start = EnvironmentEdgeManager.currentTimeMillis();
context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put,
Put>>(context.rowsToLock.size());
if (context.hasGlobalIndex || context.hasTransform ||
context.hasAtomic ||
- context.hasDelete || (context.hasUncoveredIndex &&
+ context.returnResult || context.hasDelete ||
(context.hasUncoveredIndex &&
isPartialUncoveredIndexMutation(indexMetaData,
miniBatchOp))) {
getCurrentRowStates(c, context);
}
onDupCheckTime += (EnvironmentEdgeManager.currentTimeMillis() -
start);
}
- if (context.hasAtomic) {
+ if (context.hasAtomic || context.returnResult) {
long start = EnvironmentEdgeManager.currentTimeMillis();
// add the mutations for conditional updates to the mini batch
addOnDupMutationsToBatch(miniBatchOp, context);
@@ -1359,7 +1373,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
continue;
}
Mutation m = miniBatchOp.getOperation(i);
- if (!this.builder.isAtomicOp(m)) {
+ if (!this.builder.isAtomicOp(m) && !this.builder.returnResult(m)) {
continue;
}
ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
@@ -1440,13 +1454,18 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
try {
if (success) {
context.currentPhase = BatchMutatePhase.POST;
- if(context.hasAtomic && miniBatchOp.size() == 1) {
+ if ((context.hasAtomic || context.returnResult) &&
miniBatchOp.size() == 1) {
if
(!isAtomicOperationComplete(miniBatchOp.getOperationStatus(0))) {
byte[] retVal = PInteger.INSTANCE.toBytes(1);
Cell cell = PhoenixKeyValueUtil.newKeyValue(
miniBatchOp.getOperation(0).getRow(),
Bytes.toBytes(UPSERT_CF),
Bytes.toBytes(UPSERT_STATUS_CQ), 0, retVal, 0,
retVal.length);
- Result result = Result.create(new
ArrayList<>(Arrays.asList(cell)));
+ List<Cell> cells = new ArrayList<>();
+ cells.add(cell);
+
+ addCellsIfResultReturned(miniBatchOp, context, cells);
+
+ Result result = Result.create(cells);
miniBatchOp.setOperationStatus(0,
new OperationStatus(SUCCESS, result));
}
@@ -1470,6 +1489,62 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
}
+ /**
+ * If the result needs to be returned for the given update operation,
identify the updated row
+ * cells and add them to the input list of cells.
+ *
+ * @param miniBatchOp Batch of mutations getting applied to region.
+ * @param context The BatchMutateContext object shared during coproc hooks
execution as part of
+ * the batch mutate life cycle.
+ * @param cells The list of cells to be returned back to the client.
+ */
+ private static void
addCellsIfResultReturned(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ BatchMutateContext context,
List<Cell> cells) {
+ if (context.returnResult) {
+ Map<ColumnReference, Pair<Cell, Boolean>> currColumnCellExprMap =
+ context.currColumnCellExprMap;
+ Mutation mutation = miniBatchOp.getOperation(0);
+ updateColumnCellExprMap(mutation, currColumnCellExprMap);
+ Mutation[] mutations =
miniBatchOp.getOperationsFromCoprocessors(0);
+ if (mutations != null) {
+ for (Mutation m : mutations) {
+ updateColumnCellExprMap(m, currColumnCellExprMap);
+ }
+ }
+ for (Pair<Cell, Boolean> cellPair :
currColumnCellExprMap.values()) {
+ cells.add(cellPair.getFirst());
+ }
+ cells.sort(CellComparator.getInstance());
+ }
+ }
+
+ /**
+ * Update the contents of {@code currColumnCellExprMap} based on the
mutation that was
+ * successfully applied to the row.
+ *
+ * @param mutation The Mutation object which is applied to the row.
+ * @param currColumnCellExprMap The map of column to cell reference.
+ */
+ private static void updateColumnCellExprMap(Mutation mutation,
+ Map<ColumnReference,
Pair<Cell, Boolean>>
+ currColumnCellExprMap)
{
+ if (mutation != null) {
+ for (Map.Entry<byte[], List<Cell>> entry :
+ mutation.getFamilyCellMap().entrySet()) {
+ for (Cell entryCell : entry.getValue()) {
+ byte[] family = CellUtil.cloneFamily(entryCell);
+ byte[] qualifier = CellUtil.cloneQualifier(entryCell);
+ ColumnReference colRef = new ColumnReference(family,
qualifier);
+ if (mutation instanceof Put) {
+ currColumnCellExprMap.put(colRef, new
Pair<>(entryCell, null));
+ } else if (mutation instanceof Delete) {
+ currColumnCellExprMap.remove(colRef);
+ }
+ }
+ }
+ }
+ }
+
private void doPost(ObserverContext<RegionCoprocessorEnvironment> c,
BatchMutateContext context) throws IOException {
long start = EnvironmentEdgeManager.currentTimeMillis();
@@ -1586,12 +1661,21 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
* Otherwise, we will generate one Put mutation and optionally one Delete
mutation (with
* DeleteColumn type cells for all columns set to null).
*/
- private List<Mutation> generateOnDupMutations(BatchMutateContext context,
Put atomicPut) throws IOException {
+ private List<Mutation> generateOnDupMutations(BatchMutateContext context,
+ Put atomicPut,
+
MiniBatchOperationInProgress<Mutation> miniBatchOp)
+ throws IOException {
List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2);
byte[] opBytes = atomicPut.getAttribute(ATOMIC_OP_ATTRIB);
- if (opBytes == null) { // Unexpected
- return null;
- }
+ byte[] returnResult = atomicPut.getAttribute(RETURN_RESULT);
+ if ((opBytes == null && returnResult == null) ||
+ (opBytes == null && miniBatchOp.size() != 1)) {
+ // Unexpected
+ // Either mutation should be atomic by providing non-null ON
DUPLICATE KEY, or
+ // if the result needs to be returned, only single row must be
updated as part of
+ // the batch mutation.
+ return null;
+ }
Put put = null;
Delete delete = null;
@@ -1599,16 +1683,40 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
// later these timestamps will be updated by the
IndexRegionObserver#setTimestamps() function
long ts = HConstants.LATEST_TIMESTAMP;
- byte[] rowKey = atomicPut.getRow();
+ // store current cells into a map where the key is ColumnReference of
the column family and
+ // column qualifier, and value is a pair of cell and a boolean. The
value of the boolean
+ // will be true if the expression is CaseExpression and Else-clause is
evaluated to be
+ // true, will be null if there is no expression on this column,
otherwise false
+ Map<ColumnReference, Pair<Cell, Boolean>> currColumnCellExprMap = new
HashMap<>();
+
+ byte[] rowKey = atomicPut.getRow();
ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(rowKey);
// Get the latest data row state
Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
Put currentDataRowState = dataRowState != null ? dataRowState.getFirst()
: null;
+ // if result needs to be returned but the DML does not have ON
DUPLICATE KEY present,
+ // perform the mutation and return the result.
+ if (opBytes == null) {
+ mutations.add(atomicPut);
+ updateCurrColumnCellExpr(currentDataRowState != null ?
currentDataRowState : atomicPut,
+ currColumnCellExprMap);
+ if (context.returnResult) {
+ context.currColumnCellExprMap = currColumnCellExprMap;
+ }
+ return mutations;
+ }
+
if (PhoenixIndexBuilderHelper.isDupKeyIgnore(opBytes)) {
if (currentDataRowState == null) {
// new row
mutations.add(atomicPut);
+ updateCurrColumnCellExpr(atomicPut, currColumnCellExprMap);
+ } else {
+ updateCurrColumnCellExpr(currentDataRowState,
currColumnCellExprMap);
+ }
+ if (context.returnResult) {
+ context.currColumnCellExprMap = currColumnCellExprMap;
}
return mutations;
}
@@ -1631,19 +1739,17 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
// read the column values requested in the get from the current data row
List<Cell> cells = IndexUtil.readColumnsFromRow(currentDataRowState,
colsReadInExpr);
- // store current cells into a map where the key is ColumnReference of
the column family and
- // column qualifier, and value is a pair of cell and a boolean. The
value of the boolean
- // will be true if the expression is CaseExpression and Else-clause is
evaluated to be
- // true, will be null if there is no expression on this column,
otherwise false
- Map<ColumnReference, Pair<Cell, Boolean>> currColumnCellExprMap = new
HashMap<>();
-
if (currentDataRowState == null) { // row doesn't exist
+ updateCurrColumnCellExpr(atomicPut, currColumnCellExprMap);
if (skipFirstOp) {
if (operations.size() <= 1 && repeat <= 1) {
// early exit since there is only one ON DUPLICATE KEY UPDATE
// clause which is ignored because the row doesn't exist so
// simply use the values in UPSERT VALUES
mutations.add(atomicPut);
+ if (context.returnResult) {
+ context.currColumnCellExprMap = currColumnCellExprMap;
+ }
return mutations;
}
// If there are multiple ON DUPLICATE KEY UPDATE on a new row,
@@ -1656,17 +1762,13 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
// Base current state off of existing row
flattenedCells = cells;
// store all current cells from currentDataRowState
- for (Map.Entry<byte[], List<Cell>> entry :
- currentDataRowState.getFamilyCellMap().entrySet()) {
- for (Cell cell : new ArrayList<>(entry.getValue())) {
- byte[] family = CellUtil.cloneFamily(cell);
- byte[] qualifier = CellUtil.cloneQualifier(cell);
- ColumnReference colRef = new ColumnReference(family,
qualifier);
- currColumnCellExprMap.put(colRef, new Pair<>(cell, null));
- }
- }
+ updateCurrColumnCellExpr(currentDataRowState, currColumnCellExprMap);
}
+ if (context.returnResult) {
+ context.currColumnCellExprMap = currColumnCellExprMap;
+ }
+
MultiKeyValueTuple tuple = new MultiKeyValueTuple(flattenedCells);
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
@@ -1764,6 +1866,27 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
return mutations;
}
+    /**
+     * Records every cell carried by the given Put in the column-reference map, adding a new
+     * entry or replacing the existing one for the same column family and qualifier.
+     * <p>
+     * Each cell is stored paired with a {@code null} Boolean.
+     *
+     * @param put The Put mutation representing the current or new/updated state of the row.
+     * @param currColumnCellExprMap Map keyed by ColumnReference (column family and qualifier)
+     *                              whose values pair a Cell with a Boolean; updated in place
+     *                              with all the cells of the given mutation.
+     */
+    private static void updateCurrColumnCellExpr(Put put,
+            Map<ColumnReference, Pair<Cell, Boolean>> currColumnCellExprMap) {
+        for (List<Cell> familyCells : put.getFamilyCellMap().values()) {
+            for (Cell cell : familyCells) {
+                ColumnReference columnRef = new ColumnReference(
+                        CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
+                currColumnCellExprMap.put(columnRef, new Pair<>(cell, null));
+            }
+        }
+    }
+
private void addEmptyKVCellToPut(Put put, MultiKeyValueTuple tuple, PTable
table) throws IOException {
byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table);
byte[] emptyCQ =
EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index eec363fb5a..bbe94df7a1 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -132,6 +132,11 @@ public abstract class BaseIndexBuilder implements
IndexBuilder {
return null;
}
+ @Override
+ public boolean returnResult(Mutation m) {
+ // Base implementation: no mutation is reported as needing a returned result.
+ return false;
+ }
+
public RegionCoprocessorEnvironment getEnv() {
return this.env;
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 72fcf37b0f..68076a4d79 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -180,4 +180,9 @@ public class IndexBuildManager implements Stoppable {
public ReplayWrite getReplayWrite(Mutation m) throws IOException {
return this.delegate.getReplayWrite(m);
}
+
+    /**
+     * Asks the delegate index builder whether the given mutation needs to return a result.
+     *
+     * @param m Mutation object.
+     * @return the delegate's answer for this mutation.
+     */
+    public boolean returnResult(Mutation m) {
+        return this.delegate.returnResult(m);
+    }
+
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index 8846650b3f..23a642ed69 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -150,4 +150,12 @@ public interface IndexBuilder extends Stoppable {
public List<Mutation> executeAtomicOp(Increment inc) throws IOException;
public ReplayWrite getReplayWrite(Mutation m);
+
+ /**
+ * Determines whether the given mutation needs to return a result to the caller.
+ *
+ * @param m Mutation object.
+ * @return True if mutation needs to return result, False otherwise.
+ */
+ boolean returnResult(Mutation m);
}
\ No newline at end of file
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 6720c6b5a2..97c8f7bb7d 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -274,5 +274,9 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
public ReplayWrite getReplayWrite(Mutation m) {
return PhoenixIndexMetaData.getReplayWrite(m.getAttributesMap());
}
-
+
+ @Override
+ public boolean returnResult(Mutation m) {
+ // A result is requested when the RETURN_RESULT attribute is set on the mutation;
+ // only the attribute's presence is checked, not its value.
+ return m.getAttribute(PhoenixIndexBuilderHelper.RETURN_RESULT) != null;
+ }
}
\ No newline at end of file
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index 72b7c7e5d5..9ec70a8fd8 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -226,20 +226,12 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
if (serverParsedArrayFuncRefs != null) {
Collections.addAll(resultList, serverParsedArrayFuncRefs);
}
- Expression[] serverParsedJsonValueFuncRefs = null;
- if
(scan.getAttribute(BaseScannerRegionObserverConstants.JSON_VALUE_FUNCTION) !=
null) {
- serverParsedJsonValueFuncRefs =
-
deserializeServerParsedPositionalExpressionInfoFromScan(scan,
-
BaseScannerRegionObserverConstants.JSON_VALUE_FUNCTION, serverParsedKVRefs);
- }
- if
(scan.getAttribute(BaseScannerRegionObserverConstants.BSON_VALUE_FUNCTION) !=
null) {
- serverParsedJsonValueFuncRefs =
- deserializeServerParsedPositionalExpressionInfoFromScan(scan,
- BaseScannerRegionObserverConstants.BSON_VALUE_FUNCTION,
serverParsedKVRefs);
- }
- if (serverParsedJsonValueFuncRefs != null) {
- Collections.addAll(resultList, serverParsedJsonValueFuncRefs);
- }
+ deserializeAndAddComplexDataTypeFunctions(scan,
+ BaseScannerRegionObserverConstants.JSON_VALUE_FUNCTION,
serverParsedKVRefs,
+ resultList);
+ deserializeAndAddComplexDataTypeFunctions(scan,
+ BaseScannerRegionObserverConstants.BSON_VALUE_FUNCTION,
serverParsedKVRefs,
+ resultList);
Expression[] serverParsedJsonQueryFuncRefs = null;
if
(scan.getAttribute(BaseScannerRegionObserverConstants.JSON_QUERY_FUNCTION) !=
null) {
serverParsedJsonQueryFuncRefs =
@@ -252,6 +244,21 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
return resultList;
}
+    /**
+     * Deserializes the server-parsed positional expressions stored on the scan under the given
+     * attribute and appends them to the result list. Nothing is added when the attribute is not
+     * set on the scan or when deserialization yields {@code null}.
+     *
+     * @param scan Scan carrying the serialized expression info as an attribute.
+     * @param functionName Name of the scan attribute to read; callers pass the
+     *                     JSON_VALUE_FUNCTION and BSON_VALUE_FUNCTION constants.
+     * @param serverParsedKVRefs Key-value column expressions handed to the deserializer.
+     * @param resultList List to which the deserialized expressions are appended.
+     */
+    private void deserializeAndAddComplexDataTypeFunctions(Scan scan,
+            String functionName,
+            Set<KeyValueColumnExpression> serverParsedKVRefs,
+            List<Expression> resultList) {
+        if (scan.getAttribute(functionName) == null) {
+            return;
+        }
+        Expression[] parsedFuncRefs = deserializeServerParsedPositionalExpressionInfoFromScan(
+                scan, functionName, serverParsedKVRefs);
+        if (parsedFuncRefs == null) {
+            return;
+        }
+        Collections.addAll(resultList, parsedFuncRefs);
+    }
+
@VisibleForTesting
static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner
s,
boolean spoolingEnabled,
long thresholdBytes) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
index dfa8fa3c4d..853fd0ffa8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java
@@ -31,9 +31,18 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PBson;
+import org.apache.phoenix.schema.types.PDouble;
import org.bson.BsonArray;
import org.bson.BsonBinary;
import org.bson.BsonDocument;
+import org.bson.BsonDouble;
import org.bson.BsonNull;
import org.bson.BsonString;
import org.bson.RawBsonDocument;
@@ -134,22 +143,8 @@ public class Bson4IT extends ParallelStatsDisabledIT {
BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
BsonDocument bsonDocument3 = RawBsonDocument.parse(sample3);
- PreparedStatement stmt =
- conn.prepareStatement("UPSERT INTO " + tableName + " VALUES
(?,?,?)");
- stmt.setString(1, "pk0001");
- stmt.setString(2, "0002");
- stmt.setObject(3, bsonDocument1);
- stmt.executeUpdate();
-
- stmt.setString(1, "pk1010");
- stmt.setString(2, "1010");
- stmt.setObject(3, bsonDocument2);
- stmt.executeUpdate();
-
- stmt.setString(1, "pk1011");
- stmt.setString(2, "1011");
- stmt.setObject(3, bsonDocument3);
- stmt.executeUpdate();
+ upsertRows(conn, tableName, bsonDocument1, bsonDocument2, bsonDocument3);
+ PreparedStatement stmt;
conn.commit();
@@ -254,6 +249,264 @@ public class Bson4IT extends ParallelStatsDisabledIT {
}
}
+ /**
+ * Runs conditional upserts (ON DUPLICATE KEY UPDATE with a CASE on BSON_CONDITION_EXPRESSION)
+ * through executeUpdateReturnRow and checks the update count and the BSON document of the
+ * returned row, both when the condition holds and when it does not. The final table content
+ * is verified at the end.
+ */
+ @Test
+ public void testConditionalUpsertReturnRow() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ String ddl = "CREATE TABLE " + tableName
+ + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
+ + " CONSTRAINT pk PRIMARY KEY(PK1))";
+ conn.createStatement().execute(ddl);
+
+ String sample1 = getJsonString("json/sample_01.json");
+ String sample2 = getJsonString("json/sample_02.json");
+ String sample3 = getJsonString("json/sample_03.json");
+ BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
+ BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
+ BsonDocument bsonDocument3 = RawBsonDocument.parse(sample3);
+
+ upsertRows(conn, tableName, bsonDocument1, bsonDocument2, bsonDocument3);
+ PreparedStatement stmt;
+
+ // The condition on standard[50] is expected to be false for pk0001, so the NOT filter
+ // below still selects the row.
+ String conditionExpression =
+ "press = :press AND track[0].shot[2][0].city.standard[50] =
:softly";
+
+ BsonDocument conditionDoc = new BsonDocument();
+ conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+ conditionDoc.put("$VAL", new BsonDocument()
+ .append(":press", new BsonString("beat"))
+ .append(":softly", new BsonString("softly")));
+
+ String query = "SELECT * FROM " + tableName +
+ " WHERE PK1 = 'pk0001' AND C1 = '0002' AND NOT
BSON_CONDITION_EXPRESSION(COL, '"
+ + conditionDoc.toJson() + "')";
+ ResultSet rs = conn.createStatement().executeQuery(query);
+
+ assertTrue(rs.next());
+ assertEquals("pk0001", rs.getString(1));
+ assertEquals("0002", rs.getString(2));
+ BsonDocument document1 = (BsonDocument) rs.getObject(3);
+ assertEquals(bsonDocument1, document1);
+
+ assertFalse(rs.next());
+
+ // With standard[5] the condition is expected to hold, so the positive filter selects
+ // the row. This condition document is reused by the first conditional upsert below.
+ conditionExpression =
+ "press = :press AND track[0].shot[2][0].city.standard[5] =
:softly";
+
+ conditionDoc = new BsonDocument();
+ conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+ conditionDoc.put("$VAL", new BsonDocument()
+ .append(":press", new BsonString("beat"))
+ .append(":softly", new BsonString("softly")));
+
+ query = "SELECT * FROM " + tableName +
+ " WHERE PK1 = 'pk0001' AND C1 = '0002' AND
BSON_CONDITION_EXPRESSION(COL, '"
+ + conditionDoc.toJson() + "')";
+ rs = conn.createStatement().executeQuery(query);
+
+ assertTrue(rs.next());
+ assertEquals("pk0001", rs.getString(1));
+ assertEquals("0002", rs.getString(2));
+ document1 = (BsonDocument) rs.getObject(3);
+ assertEquals(bsonDocument1, document1);
+
+ assertFalse(rs.next());
+
+ BsonDocument updateExp = new BsonDocument()
+ .append("$SET", new BsonDocument()
+ .append("browserling",
+ new
BsonBinary(PDouble.INSTANCE.toBytes(-505169340.54880095)))
+ .append("track[0].shot[2][0].city.standard[5]", new
BsonString("soft"))
+ .append("track[0].shot[2][0].city.problem[2]",
+ new
BsonString("track[0].shot[2][0].city.problem[2] + 529.435")))
+ .append("$UNSET", new BsonDocument()
+ .append("track[0].shot[2][0].city.flame", new
BsonNull()));
+
+ // First conditional upsert on pk0001; C1 is also updated through the second bind value.
+ stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+ + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() +
"')"
+ + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE
COL END,"
+ + " C1 = ?");
+ stmt.setString(1, "pk0001");
+ stmt.setString(2, "0003");
+
+ // Conditional Upsert successful
+ assertReturnedRowResult(stmt, conn, tableName,
"json/sample_updated_01.json", true);
+
+ updateExp = new BsonDocument()
+ .append("$ADD", new BsonDocument()
+ .append("new_samples",
+ new BsonDocument().append("$set",
+ new BsonArray(Arrays.asList(
+ new
BsonBinary(Bytes.toBytes("Sample10")),
+ new
BsonBinary(Bytes.toBytes("Sample12")),
+ new
BsonBinary(Bytes.toBytes("Sample13")),
+ new
BsonBinary(Bytes.toBytes("Sample14"))
+ )))))
+ .append("$DELETE_FROM_SET", new BsonDocument()
+ .append("new_samples",
+ new BsonDocument().append("$set",
+ new BsonArray(Arrays.asList(
+ new
BsonBinary(Bytes.toBytes("Sample02")),
+ new
BsonBinary(Bytes.toBytes("Sample03"))
+ )))))
+ .append("$SET", new BsonDocument()
+ .append("newrecord", ((BsonArray)
(document1.get("track"))).get(0)))
+ .append("$UNSET", new BsonDocument()
+ .append("rather[3].outline.halfway.so[2][2]", new
BsonNull()));
+
+ conditionExpression =
+ "field_not_exists(newrecord) AND
field_exists(rather[3].outline.halfway.so[2][2])";
+
+ conditionDoc = new BsonDocument();
+ conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+ conditionDoc.put("$VAL", new BsonDocument());
+
+ stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+ + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() +
"')"
+ + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE
COL END");
+
+ stmt.setString(1, "pk1010");
+
+ // Conditional Upsert successful
+ assertReturnedRowResult(stmt, conn, tableName,
"json/sample_updated_02.json", true);
+
+ updateExp = new BsonDocument()
+ .append("$SET", new BsonDocument()
+ .append("result[1].location.state", new
BsonString("AK")))
+ .append("$UNSET", new BsonDocument()
+ .append("result[4].emails[1]", new BsonNull()));
+
+ conditionExpression =
+ "result[2].location.coordinates.latitude > :latitude OR "
+ + "(field_exists(result[1].location) AND
result[1].location.state != :state" +
+ " AND field_exists(result[4].emails[1]))";
+
+ conditionDoc = new BsonDocument();
+ conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+ conditionDoc.put("$VAL", new BsonDocument()
+ .append(":latitude", new BsonDouble(0))
+ .append(":state", new BsonString("AK")));
+
+ stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+ + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() +
"')"
+ + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE
COL END");
+
+ stmt.setString(1, "pk1011");
+
+ // Conditional Upsert successful
+ assertReturnedRowResult(stmt, conn, tableName,
"json/sample_updated_03.json", true);
+
+ // Same expression as the first update, but with values that do not match pk0001.
+ conditionExpression =
+ "press = :press AND track[0].shot[2][0].city.standard[5] =
:softly";
+
+ conditionDoc = new BsonDocument();
+ conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+ conditionDoc.put("$VAL", new BsonDocument()
+ .append(":press", new BsonString("incorrect_value"))
+ .append(":softly", new BsonString("incorrect_value")));
+
+ updateExp = new BsonDocument()
+ .append("$SET", new BsonDocument()
+ .append("new_field1",
+ new
BsonBinary(PDouble.INSTANCE.toBytes(-505169340.54880095)))
+ .append("track[0].shot[2][0].city.standard[5]", new
BsonString(
+ "soft_new_val"))
+ .append("track[0].shot[2][0].city.problem[2]",
+ new
BsonString("track[0].shot[2][0].city.problem[2] + 123")));
+
+ stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+ + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() +
"')"
+ + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE
COL END");
+ stmt.setString(1, "pk0001");
+
+ // Conditional Upsert not successful
+ // (expected update count is 0; the returned document is still sample_updated_01.json)
+ assertReturnedRowResult(stmt, conn, tableName,
"json/sample_updated_01.json", false);
+
+ verifyRows(tableName, conn);
+ }
+ }
+
+    /**
+     * Upserts the three fixture rows (pk0001, pk1010, pk1011) with the given BSON documents.
+     * This method does not commit; callers either enable auto-commit or commit afterwards.
+     *
+     * @param conn open connection to use.
+     * @param tableName table to upsert into.
+     * @param bsonDocument1 document stored for row pk0001.
+     * @param bsonDocument2 document stored for row pk1010.
+     * @param bsonDocument3 document stored for row pk1011.
+     * @throws SQLException if preparing or executing an upsert fails.
+     */
+    private static void upsertRows(Connection conn, String tableName, BsonDocument bsonDocument1,
+            BsonDocument bsonDocument2, BsonDocument bsonDocument3)
+            throws SQLException {
+        // try-with-resources so the statement is closed even if one of the upserts fails
+        try (PreparedStatement stmt =
+                conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?)")) {
+            stmt.setString(1, "pk0001");
+            stmt.setString(2, "0002");
+            stmt.setObject(3, bsonDocument1);
+            stmt.executeUpdate();
+
+            stmt.setString(1, "pk1010");
+            stmt.setString(2, "1010");
+            stmt.setObject(3, bsonDocument2);
+            stmt.executeUpdate();
+
+            stmt.setString(1, "pk1011");
+            stmt.setString(2, "1011");
+            stmt.setObject(3, bsonDocument3);
+            stmt.executeUpdate();
+        }
+    }
+
+    /**
+     * Asserts that the table holds exactly the three fixture rows in their final expected state:
+     * primary key, C1 value and a BSON document equal to the matching
+     * json/sample_updated_0N.json resource.
+     *
+     * @param tableName table to read.
+     * @param conn open connection used to run the query.
+     * @throws SQLException if the query fails.
+     * @throws IOException if an expected JSON resource cannot be read.
+     */
+    private static void verifyRows(String tableName, Connection conn)
+            throws SQLException, IOException {
+        // Expected rows in result order: {PK1, C1, resource holding the expected document}.
+        String[][] expectedRows = {
+            {"pk0001", "0003", "json/sample_updated_01.json"},
+            {"pk1010", "1010", "json/sample_updated_02.json"},
+            {"pk1011", "1011", "json/sample_updated_03.json"},
+        };
+        String query = "SELECT * FROM " + tableName;
+        // try-with-resources so statement and result set are closed even if an assertion fails
+        try (PreparedStatement stmt = conn.prepareStatement(query);
+                ResultSet rs = stmt.executeQuery()) {
+            for (String[] expected : expectedRows) {
+                assertTrue(rs.next());
+                assertEquals(expected[0], rs.getString(1));
+                assertEquals(expected[1], rs.getString(2));
+                BsonDocument actualDoc = (BsonDocument) rs.getObject(3);
+                assertEquals(RawBsonDocument.parse(getJsonString(expected[2])), actualDoc);
+            }
+            assertFalse(rs.next());
+        }
+    }
+
+    /**
+     * Executes the prepared upsert through {@code executeUpdateReturnRow} and checks both the
+     * update count and the BSON document carried by the returned row.
+     *
+     * @param stmt prepared upsert with all parameters already bound.
+     * @param conn connection used to look up the table metadata.
+     * @param tableName table the upsert targets.
+     * @param jsonPath resource holding the document the returned row is expected to contain.
+     * @param success true if the update count is expected to be 1, false if 0.
+     * @throws SQLException if executing the upsert or resolving the table fails.
+     * @throws IOException if the expected JSON resource cannot be read.
+     */
+    private static void assertReturnedRowResult(PreparedStatement stmt,
+            Connection conn,
+            String tableName,
+            String jsonPath,
+            boolean success)
+            throws SQLException, IOException {
+        PhoenixPreparedStatement phoenixStmt = stmt.unwrap(PhoenixPreparedStatement.class);
+        Pair<Integer, Tuple> updateCountAndRow = phoenixStmt.executeUpdateReturnRow();
+        int expectedUpdateCount = success ? 1 : 0;
+        assertEquals(expectedUpdateCount, updateCountAndRow.getFirst().intValue());
+
+        Tuple returnedRow = updateCountAndRow.getSecond();
+        PTable table = conn.unwrap(PhoenixConnection.class).getTable(tableName);
+        // The BSON column is the third column of the table (index 2).
+        byte[] bsonQualifier = table.getColumns().get(2).getColumnQualifierBytes();
+        Cell bsonCell =
+                returnedRow.getValue(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, bsonQualifier);
+
+        BsonDocument expectedDoc = RawBsonDocument.parse(getJsonString(jsonPath));
+        Object actualDoc = PBson.INSTANCE.toObject(bsonCell.getValueArray(),
+                bsonCell.getValueOffset(), bsonCell.getValueLength());
+        assertEquals(expectedDoc, actualDoc);
+    }
+
private static void validateIndexUsed(PreparedStatement ps, String indexName)
throws SQLException {
ExplainPlan plan =
ps.unwrap(PhoenixPreparedStatement.class).optimizeQuery().getExplainPlan();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
index f7d72b868e..e5e2c116e4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
@@ -21,8 +21,13 @@ import static
org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
@@ -33,21 +38,37 @@ import java.util.Collection;
import java.util.List;
import java.util.Properties;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PBson;
+import org.apache.phoenix.schema.types.PDouble;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.bson.BsonDocument;
+import org.bson.RawBsonDocument;
+import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -118,6 +139,177 @@ public class OnDuplicateKey2IT extends
ParallelStatsDisabledIT {
conn.close();
}
+ /**
+ * Exercises the return-row upsert API with auto-commit enabled: a sequence of upserts is run
+ * against a single row and, after each one, the update count and the returned row are
+ * validated. Skipped unless isSetCorrectResultEnabledOnHBase() is true.
+ */
+ @Test
+ public void testReturnRowResult() throws Exception {
+ Assume.assumeTrue("Set correct result to RegionActionResult on hbase
versions " +
+ "2.4.18+, 2.5.9+, and 2.6.0+",
isSetCorrectResultEnabledOnHBase());
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String sample1 = getJsonString("json/sample_01.json");
+ String sample2 = getJsonString("json/sample_02.json");
+ BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
+ BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ String tableName = generateUniqueName();
+ String ddl = "CREATE TABLE " + tableName
+ + "(PK1 VARCHAR, PK2 DOUBLE NOT NULL, PK3 VARCHAR,
COUNTER1 DOUBLE,"
+ + " COUNTER2 VARCHAR,"
+ + " COL3 BSON, COL4 INTEGER, CONSTRAINT pk PRIMARY
KEY(PK1, PK2, PK3))";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+
+ // New row with ON DUPLICATE KEY IGNORE: update count 1, the inserted values are returned.
+ String upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3,
COUNTER1, COL3, COL4)"
+ + " VALUES('pk000', -123.98, 'pk003', 1011.202, ?, 123) ON
DUPLICATE KEY " +
+ "IGNORE";
+ validateReturnedRowAfterUpsert(conn, upsertSql, tableName,
1011.202, null, true,
+ bsonDocument1, bsonDocument1, 123);
+
+ // Row already exists, so IGNORE leaves it untouched: update count 0 and the existing
+ // values (not COUNTER1 = 0) are returned.
+ upsertSql =
+ "UPSERT INTO " + tableName + " (PK1, PK2, PK3, COUNTER1) "
+ + "VALUES('pk000', -123.98, 'pk003', 0) ON
DUPLICATE"
+ + " KEY IGNORE";
+ validateReturnedRowAfterUpsert(conn, upsertSql, tableName,
1011.202, null, false,
+ null, bsonDocument1, 123);
+
+ // Plain upsert without ON DUPLICATE KEY: COUNTER1 and COUNTER2 are overwritten.
+ upsertSql =
+ "UPSERT INTO " + tableName
+ + " (PK1, PK2, PK3, COUNTER1, COUNTER2)
VALUES('pk000', -123.98, "
+ + "'pk003', 234, 'col2_000')";
+ validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 234d,
"col2_000", true,
+ null, bsonDocument1, 123);
+
+ // Both CASE conditions hold (COUNTER1 = 234, COUNTER2 = 'col2_000'), so all four
+ // columns are expected to change.
+ upsertSql = "UPSERT INTO " + tableName
+ + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON
DUPLICATE KEY UPDATE "
+ + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 +
1999.99 ELSE COUNTER1"
+ + " END, "
+ + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN
'col2_001' ELSE COUNTER2 "
+ + "END, "
+ + "COL3 = ?, "
+ + "COL4 = 234";
+ validateReturnedRowAfterUpsert(conn, upsertSql, tableName,
2233.99, "col2_001", true,
+ bsonDocument2, bsonDocument2, 234);
+
+ // Neither CASE condition holds any more: update count 0 and the row is returned unchanged.
+ upsertSql = "UPSERT INTO " + tableName
+ + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON
DUPLICATE KEY UPDATE "
+ + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 +
1999.99 ELSE COUNTER1"
+ + " END,"
+ + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN
'col2_001' ELSE COUNTER2 "
+ + "END";
+ validateReturnedRowAfterUpsert(conn, upsertSql, tableName,
2233.99, "col2_001", false
+ , null, bsonDocument2, 234);
+ }
+ }
+
+    /**
+     * Runs the given upsert through {@code executeUpdateReturnRow} and validates the update
+     * count and every column of the returned row.
+     * <p>
+     * The row key is always expected to be ('pk000', -123.98, 'pk003'). When {@code inputDoc} is
+     * non-null the SQL is executed as a prepared statement with the document bound to its first
+     * parameter; otherwise it is executed as a plain statement.
+     *
+     * @param conn open connection to use.
+     * @param upsertSql upsert statement to execute.
+     * @param tableName table the upsert targets.
+     * @param col1 expected COUNTER1 value; the COUNTER1 cell must be present.
+     * @param col2 expected COUNTER2 value, or null if no COUNTER2 cell is expected.
+     * @param success true if the update count is expected to be 1, false if 0.
+     * @param inputDoc BSON document to bind as the first parameter, or null if the SQL has none.
+     * @param expectedDoc expected COL3 document, or null if no COL3 cell is expected.
+     * @param col4 expected COL4 value, or null if no COL4 cell is expected.
+     * @throws SQLException if executing the upsert or resolving the table fails.
+     */
+    private static void validateReturnedRowAfterUpsert(Connection conn,
+                                                       String upsertSql,
+                                                       String tableName,
+                                                       Double col1,
+                                                       String col2,
+                                                       boolean success,
+                                                       BsonDocument inputDoc,
+                                                       BsonDocument expectedDoc,
+                                                       Integer col4)
+            throws SQLException {
+        final Pair<Integer, Tuple> resultPair;
+        if (inputDoc != null) {
+            PhoenixPreparedStatement ps =
+                    conn.prepareStatement(upsertSql).unwrap(PhoenixPreparedStatement.class);
+            ps.setObject(1, inputDoc);
+            resultPair = ps.executeUpdateReturnRow();
+        } else {
+            resultPair = conn.createStatement().unwrap(PhoenixStatement.class)
+                    .executeUpdateReturnRow(upsertSql);
+        }
+        assertEquals(success ? 1 : 0, resultPair.getFirst().intValue());
+        ResultTuple result = (ResultTuple) resultPair.getSecond();
+        PTable table = conn.unwrap(PhoenixConnection.class).getTable(tableName);
+
+        // Column indexes: 0-2 are the PK columns, 3 = COUNTER1, 4 = COUNTER2, 5 = COL3, 6 = COL4.
+        // The COUNTER1 cell is also used to read the row key.
+        Cell cell = latestCellOfColumn(result, table, 3);
+        Assert.assertNotNull("COUNTER1 cell missing from returned row", cell);
+
+        ImmutableBytesPtr ptr = rowKeyFieldAt(table, cell, 1);
+        assertEquals("pk000",
+                PVarchar.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()));
+
+        ptr = rowKeyFieldAt(table, cell, 2);
+        assertEquals(-123.98,
+                PDouble.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()));
+
+        ptr = rowKeyFieldAt(table, cell, 3);
+        assertEquals("pk003",
+                PVarchar.INSTANCE.toObject(ptr.get(), ptr.getOffset(), ptr.getLength()));
+
+        assertEquals(col1,
+                PDouble.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(),
+                        cell.getValueLength()));
+
+        cell = latestCellOfColumn(result, table, 4);
+        if (col2 != null) {
+            Assert.assertNotNull("COUNTER2 cell missing from returned row", cell);
+            assertEquals(col2,
+                    PVarchar.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(),
+                            cell.getValueLength()));
+        } else {
+            assertNull(cell);
+        }
+
+        cell = latestCellOfColumn(result, table, 5);
+        if (expectedDoc != null) {
+            Assert.assertNotNull("COL3 cell missing from returned row", cell);
+            assertEquals(expectedDoc,
+                    PBson.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(),
+                            cell.getValueLength()));
+        } else {
+            assertNull(cell);
+        }
+
+        cell = latestCellOfColumn(result, table, 6);
+        if (col4 != null) {
+            Assert.assertNotNull("COL4 cell missing from returned row", cell);
+            assertEquals(col4,
+                    PInteger.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(),
+                            cell.getValueLength()));
+        } else {
+            assertNull(cell);
+        }
+    }
+
+    /** Returns the latest cell of the table column at the given index, or null if absent. */
+    private static Cell latestCellOfColumn(ResultTuple result, PTable table, int columnIndex) {
+        PColumn column = table.getColumns().get(columnIndex);
+        return result.getResult().getColumnLatestCell(column.getFamilyName().getBytes(),
+                column.getColumnQualifierBytes());
+    }
+
+    /**
+     * Returns a pointer positioned on the row key field at the given position (as understood by
+     * the row key schema iterator). The row's offset and length are passed explicitly so this
+     * also works for cells whose backing array holds more than the row key.
+     */
+    private static ImmutableBytesPtr rowKeyFieldAt(PTable table, Cell cell, int position) {
+        ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+        table.getRowKeySchema().iterator(cell.getRowArray(), cell.getRowOffset(),
+                cell.getRowLength(), ptr, position);
+        return ptr;
+    }
+
+    /**
+     * Verifies that the return-row upsert API is rejected with the AUTO_COMMIT_NOT_ENABLED error
+     * code when auto-commit has not been enabled on the connection. Skipped unless
+     * isSetCorrectResultEnabledOnHBase() is true.
+     */
+    @Test
+    public void testReturnRowResultWithoutAutoCommit() throws Exception {
+        Assume.assumeTrue("Set correct result to RegionActionResult on hbase versions " +
+                "2.4.18+, 2.5.9+, and 2.6.0+", isSetCorrectResultEnabledOnHBase());
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String sample1 = getJsonString("json/sample_01.json");
+        BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            // setAutoCommit(true) is deliberately not called on this connection
+            String tableName = generateUniqueName();
+            String ddl = "CREATE TABLE " + tableName
+                    + "(PK1 VARCHAR, PK2 DOUBLE NOT NULL, PK3 VARCHAR, COUNTER1 DOUBLE,"
+                    + " COUNTER2 VARCHAR,"
+                    + " COL3 BSON, COL4 INTEGER, CONSTRAINT pk PRIMARY KEY(PK1, PK2, PK3))";
+            conn.createStatement().execute(ddl);
+            createIndex(conn, tableName);
+
+            String upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3, COUNTER1, COL3, COL4)"
+                    + " VALUES('pk000', -123.98, 'pk003', 1011.202, ?, 123) ON DUPLICATE KEY " +
+                    "IGNORE";
+            validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 1011.202, null, true,
+                    bsonDocument1, bsonDocument1, 123);
+            // Reported as a test failure (AssertionError is not caught by the block below).
+            Assert.fail("Expected SQLException with AUTO_COMMIT_NOT_ENABLED error code");
+        } catch (SQLException e) {
+            Assert.assertEquals(SQLExceptionCode.AUTO_COMMIT_NOT_ENABLED.getErrorCode(),
+                    e.getErrorCode());
+        }
+    }
+
@Test
public void testColumnsTimestampUpdateWithAllCombinations() throws
Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -643,4 +835,9 @@ public class OnDuplicateKey2IT extends
ParallelStatsDisabledIT {
}
return patchVersion >= 9;
}
+
+    /**
+     * Reads a classpath resource and returns its content as a string.
+     *
+     * @param jsonFilePath path of the resource, relative to the classpath root.
+     * @return the content of the resource.
+     * @throws IOException if the resource is not on the classpath or cannot be read.
+     */
+    private static String getJsonString(String jsonFilePath) throws
+            IOException {
+        URL fileUrl = OnDuplicateKey2IT.class.getClassLoader().getResource(jsonFilePath);
+        if (fileUrl == null) {
+            // fail with a descriptive error instead of a NullPointerException
+            throw new IOException("Test resource not found on classpath: " + jsonFilePath);
+        }
+        // Decode explicitly as UTF-8 (assumes the JSON fixtures are UTF-8 encoded) instead of
+        // the platform default charset, so the test behaves the same on every machine.
+        return FileUtils.readFileToString(new File(fileUrl.getFile()),
+                Charset.forName("UTF-8"));
+    }
}