This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new f1949b6664 PHOENIX-6714 Return update status from Conditional Upserts (#1884)
f1949b6664 is described below
commit f1949b666462b208c5dad2d2ff2bb8722e19c125
Author: Jing Yu <[email protected]>
AuthorDate: Tue Jul 2 13:51:07 2024 -0700
PHOENIX-6714 Return update status from Conditional Upserts (#1884)
---
.../BaseScannerRegionObserverConstants.java | 2 +
.../org/apache/phoenix/execute/MutationState.java | 33 +-
.../org/apache/phoenix/jdbc/PhoenixStatement.java | 11 +-
.../phoenix/hbase/index/IndexRegionObserver.java | 188 +++++-
.../apache/phoenix/end2end/OnDuplicateKey2IT.java | 646 +++++++++++++++++++++
.../apache/phoenix/end2end/OnDuplicateKeyIT.java | 27 +-
.../index/ReplicationWithWALAnnotationIT.java | 2 +-
7 files changed, 863 insertions(+), 46 deletions(-)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
index 23c039c630..e36b073676 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
@@ -62,6 +62,8 @@ public class BaseScannerRegionObserverConstants {
public static final String UPSERT_SELECT_EXPRS = "_UpsertSelectExprs";
public static final String DELETE_CQ = "_DeleteCQ";
public static final String DELETE_CF = "_DeleteCF";
+ public static final String UPSERT_STATUS_CQ = "_UpsertStatusCQ";
+ public static final String UPSERT_CF = "_UpsertCF";
public static final String EMPTY_CF = "_EmptyCF";
public static final String EMPTY_COLUMN_QUALIFIER = "_EmptyColumnQualifier";
public static final String SPECIFIC_ARRAY_INDEX = "_SpecificArrayIndex";
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
index 1f2493aab2..529b6cd60e 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.execute;
+import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.UPSERT_CF;
+import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.UPSERT_STATUS_CQ;
import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_FAILURE_SQL_COUNTER;
@@ -59,6 +61,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
@@ -107,9 +110,11 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableRef;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
@@ -164,6 +169,7 @@ public class MutationState implements SQLCloseable {
private long sizeOffset;
private int numRows = 0;
+ private int numUpdatedRowsForAutoCommit = 0;
private long estimatedSize = 0;
private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
private boolean isExternalTxContext = false;
@@ -468,6 +474,10 @@ public class MutationState implements SQLCloseable {
return sizeOffset + numRows;
}
+ public int getNumUpdatedRowsForAutoCommit() {
+ return numUpdatedRowsForAutoCommit;
+ }
+
public int getNumRows() {
return numRows;
}
@@ -1451,6 +1461,7 @@ public class MutationState implements SQLCloseable {
Table hTable = connection.getQueryServices().getTable(htableName);
List<Mutation> currentMutationBatch = null;
boolean areAllBatchesSuccessful = false;
+ Object[] resultObjects = null;
try {
if (table.isTransactional()) {
@@ -1475,17 +1486,21 @@ public class MutationState implements SQLCloseable {
while (itrListMutation.hasNext()) {
final List<Mutation> mutationBatch = itrListMutation.next();
currentMutationBatch = mutationBatch;
+ if (connection.getAutoCommit() && mutationBatch.size() == 1) {
+ resultObjects = new Object[mutationBatch.size()];
+ }
if (shouldRetryIndexedMutation) {
// if there was an index write failure, retry the mutation in a loop
final Table finalHTable = hTable;
final ImmutableBytesWritable finalindexMetaDataPtr = indexMetaDataPtr;
final PTable finalPTable = table;
+ final Object[] finalResultObjects = resultObjects;
PhoenixIndexFailurePolicyHelper.doBatchWithRetries(new MutateCommand() {
@Override
public void doMutation() throws IOException {
try {
- finalHTable.batch(mutationBatch, null);
+ finalHTable.batch(mutationBatch, finalResultObjects);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
@@ -1524,8 +1539,22 @@ public class MutationState implements SQLCloseable {
}, iwe, connection, connection.getQueryServices().getProps());
shouldRetryIndexedMutation = false;
} else {
- hTable.batch(mutationBatch, null);
+ hTable.batch(mutationBatch, resultObjects);
+ }
+
+ if (resultObjects != null) {
+ Result result = (Result) resultObjects[0];
+ if (result != null && !result.isEmpty()) {
+ Cell cell = result.getColumnLatestCell(
+ Bytes.toBytes(UPSERT_CF), Bytes.toBytes(UPSERT_STATUS_CQ));
+ numUpdatedRowsForAutoCommit = PInteger.INSTANCE.getCodec()
+ .decodeInt(cell.getValueArray(), cell.getValueOffset(),
+ SortOrder.getDefault());
+ } else {
+ numUpdatedRowsForAutoCommit = 1;
+ }
}
+
// remove each batch from the list once it gets applied
// so that when a failure happens for any batch we only start
// from that batch instead of doing a duplicate replay of already
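For readers following along, the client-side half of this change reduces to roughly the standalone sketch below: when auto-commit is on and the batch holds a single atomic mutation, the Result that HBase hands back for that row may carry the status cell written by the coprocessor. The helper class and method names are illustrative, not part of the patch; the column names are the constants added in this commit.

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.schema.SortOrder;
    import org.apache.phoenix.schema.types.PInteger;

    public final class UpsertStatusDecoder {
        // Decodes the per-row update status from a batch Result. A null or
        // empty Result means HBase used the default status (plain Put), so
        // the row is assumed to have been written.
        static int decodeUpdatedRows(Object resultObject) {
            Result result = (Result) resultObject;
            if (result == null || result.isEmpty()) {
                return 1;
            }
            Cell cell = result.getColumnLatestCell(
                Bytes.toBytes("_UpsertCF"), Bytes.toBytes("_UpsertStatusCQ"));
            return PInteger.INSTANCE.getCodec().decodeInt(
                cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
        }
    }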
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index fc7dcfb162..92e2aa07ee 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -627,13 +627,18 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
checkIfDDLStatementandMutationState(stmt, state);
MutationState lastState = plan.execute();
state.join(lastState);
+ // Unfortunately, JDBC uses an int for update count, so we
+ // just max out at Integer.MAX_VALUE
+ int lastUpdateCount = (int) Math.min(Integer.MAX_VALUE,
+ lastState.getUpdateCount());
if (connection.getAutoCommit()) {
connection.commit();
+ if (isAtomicUpsert) {
+ lastUpdateCount = connection.getMutationState()
+ .getNumUpdatedRowsForAutoCommit();
+ }
}
setLastQueryPlan(null);
- // Unfortunately, JDBC uses an int for update count, so we
- // just max out at Integer.MAX_VALUE
- int lastUpdateCount = (int) Math.min(Integer.MAX_VALUE, lastState.getUpdateCount());
setLastUpdateCount(lastUpdateCount);
setLastUpdateOperation(stmt.getOperation());
setLastUpdateTable(tableName == null ? TABLE_UNKNOWN : tableName);
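At the JDBC surface this means executeUpdate() on an atomic upsert now reflects whether the row was actually written when auto-commit is on. A hedged usage sketch (the connection URL and table name are placeholders, not part of the patch):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public final class AtomicUpsertExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn =
                     DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                conn.setAutoCommit(true);
                stmt.execute("CREATE TABLE IF NOT EXISTS T "
                    + "(PK VARCHAR PRIMARY KEY, C1 BIGINT)");
                stmt.executeUpdate("UPSERT INTO T VALUES ('a', 10)");
                // The row exists, so the conditional upsert is ignored and,
                // with this patch, executeUpdate returns 0 instead of 1.
                int updated = stmt.executeUpdate(
                    "UPSERT INTO T VALUES ('a', 0) ON DUPLICATE KEY IGNORE");
                System.out.println("updated = " + updated); // expected: 0
            }
        }
    }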
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index a4547b60c9..317841fb13 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -18,6 +18,9 @@
package org.apache.phoenix.hbase.index;
+import static org.apache.hadoop.hbase.HConstants.OperationStatusCode.SUCCESS;
+import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.UPSERT_CF;
+import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.UPSERT_STATUS_CQ;
import static org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
import java.io.ByteArrayInputStream;
@@ -40,7 +43,9 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.expression.CaseExpression;
import org.apache.phoenix.index.PhoenixIndexBuilderHelper;
+import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap;
@@ -120,8 +125,10 @@ import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ClientUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerIndexUtil;
import org.apache.phoenix.util.ServerUtil.ConnectionType;
@@ -135,6 +142,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew;
import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.removeColumn;
import static org.apache.phoenix.index.PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB;
+import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
/**
* Do all the work of managing index updates from a single coprocessor. All Puts/Deletes are passed
@@ -147,8 +155,8 @@ import static org.apache.phoenix.index.PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRI
public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
private static final Logger LOG = LoggerFactory.getLogger(IndexRegionObserver.class);
- private static final OperationStatus IGNORE = new OperationStatus(OperationStatusCode.SUCCESS);
- private static final OperationStatus NOWRITE = new OperationStatus(OperationStatusCode.SUCCESS);
+ private static final OperationStatus IGNORE = new OperationStatus(SUCCESS);
+ private static final OperationStatus NOWRITE = new OperationStatus(SUCCESS);
public static final String PHOENIX_APPEND_METADATA_TO_WAL = "phoenix.append.metadata.to.wal";
public static final boolean DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL = false;
/**
@@ -513,7 +521,6 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
Mutation m = miniBatchOp.getOperation(i);
if (this.builder.isAtomicOp(m)) {
miniBatchOp.setOperationStatus(i, IGNORE);
- continue;
}
}
}
@@ -521,9 +528,6 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
private void populateRowsToLock(MiniBatchOperationInProgress<Mutation> miniBatchOp,
BatchMutateContext context) {
for (int i = 0; i < miniBatchOp.size(); i++) {
- if (miniBatchOp.getOperationStatus(i) == IGNORE) {
- continue;
- }
Mutation m = miniBatchOp.getOperation(i);
if (this.builder.isAtomicOp(m) || this.builder.isEnabled(m)) {
ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
@@ -573,8 +577,19 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
if (!mutations.isEmpty()) {
addOnDupMutationsToBatch(miniBatchOp, i, mutations);
} else {
- // empty list of generated mutations implies ON DUPLICATE KEY IGNORE
- miniBatchOp.setOperationStatus(i, IGNORE);
+ // an empty list of generated mutations implies either
+ // 1) ON DUPLICATE KEY IGNORE and the row already exists, OR
+ // 2) ON DUPLICATE KEY UPDATE where each CASE expression evaluated to its
+ // ELSE-clause and the new value equals the old value (the empty cell
+ // timestamp will NOT be updated)
+ byte[] retVal = PInteger.INSTANCE.toBytes(0);
+ Cell cell = PhoenixKeyValueUtil.newKeyValue(m.getRow(), Bytes.toBytes(UPSERT_CF),
+ Bytes.toBytes(UPSERT_STATUS_CQ), 0, retVal, 0, retVal.length);
+ // put a Result in the OperationStatus for returning the update status from
+ // conditional upserts, where 0 indicates the row was not updated
+ Result result = Result.create(new ArrayList<>(Arrays.asList(cell)));
+ miniBatchOp.setOperationStatus(i,
+ new OperationStatus(SUCCESS, result));
}
}
}
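The SQL-level behavior this branch encodes: a conditional upsert that leaves the row untouched now reports 0 updated rows. A minimal sketch under assumed schema T (pk VARCHAR PRIMARY KEY, c1 BIGINT), with row 'a' already present:

    import java.sql.Connection;
    import java.sql.Statement;

    public final class NoOpUpsertExample {
        // Sketch only: both statements hit the "empty mutation list" branch
        // above when row 'a' already exists with c1 = 5, so executeUpdate
        // returns 0 and no cell timestamp moves. Table T is hypothetical.
        static void run(Connection conn) throws Exception {
            conn.setAutoCommit(true);
            try (Statement stmt = conn.createStatement()) {
                stmt.executeUpdate(
                    "UPSERT INTO T VALUES ('a', 0) ON DUPLICATE KEY IGNORE");
                stmt.executeUpdate(
                    "UPSERT INTO T VALUES ('a', 0) ON DUPLICATE KEY UPDATE "
                        + "c1 = CASE WHEN c1 < 0 THEN 0 ELSE c1 END");
            }
        }
    }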
@@ -602,7 +617,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
// unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
// should be indexed, which means we need to expose another method on the builder. Such is the
// way optimizations go, though.
- if (miniBatchOp.getOperationStatus(i) != IGNORE && this.builder.isEnabled(m)) {
+ if (!isAtomicOperationComplete(miniBatchOp.getOperationStatus(i)) && this.builder.isEnabled(m)) {
ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
MultiMutation stored = context.multiMutationMap.get(row);
if (stored == null) {
@@ -625,7 +640,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
public static void setTimestamps(MiniBatchOperationInProgress<Mutation> miniBatchOp,
IndexBuildManager builder, long ts) throws IOException {
for (Integer i = 0; i < miniBatchOp.size(); i++) {
- if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+ if (isAtomicOperationComplete(miniBatchOp.getOperationStatus(i))) {
continue;
}
Mutation m = miniBatchOp.getOperation(i);
@@ -725,7 +740,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
private void applyPendingPutMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
BatchMutateContext context, long now) throws IOException {
for (Integer i = 0; i < miniBatchOp.size(); i++) {
- if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+ if (isAtomicOperationComplete(miniBatchOp.getOperationStatus(i))) {
continue;
}
Mutation m = miniBatchOp.getOperation(i);
@@ -824,7 +839,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
}
}
for (int i = 0; i < miniBatchOp.size(); i++) {
- if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+ if (isAtomicOperationComplete(miniBatchOp.getOperationStatus(i))) {
continue;
}
Mutation m = miniBatchOp.getOperation(i);
@@ -1129,9 +1144,6 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
private void identifyMutationTypes(MiniBatchOperationInProgress<Mutation> miniBatchOp,
BatchMutateContext context) {
for (int i = 0; i < miniBatchOp.size(); i++) {
- if (miniBatchOp.getOperationStatus(i) == IGNORE) {
- continue;
- }
Mutation m = miniBatchOp.getOperation(i);
if (this.builder.isAtomicOp(m)) {
context.hasAtomic = true;
@@ -1302,8 +1314,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
private void releaseLocksForOnDupIgnoreMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
BatchMutateContext context) {
for (int i = 0; i < miniBatchOp.size(); i++) {
- // status of all ON DUPLICATE KEY IGNORE mutations is updated to IGNORE
- if (miniBatchOp.getOperationStatus(i) != IGNORE) {
+ if (!isAtomicOperationComplete(miniBatchOp.getOperationStatus(i))) {
continue;
}
Mutation m = miniBatchOp.getOperation(i);
@@ -1388,6 +1399,17 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
try {
if (success) {
context.currentPhase = BatchMutatePhase.POST;
+ if (context.hasAtomic && miniBatchOp.size() == 1) {
+ if (!isAtomicOperationComplete(miniBatchOp.getOperationStatus(0))) {
+ byte[] retVal = PInteger.INSTANCE.toBytes(1);
+ Cell cell = PhoenixKeyValueUtil.newKeyValue(
+ miniBatchOp.getOperation(0).getRow(), Bytes.toBytes(UPSERT_CF),
+ Bytes.toBytes(UPSERT_STATUS_CQ), 0, retVal, 0, retVal.length);
+ Result result = Result.create(new ArrayList<>(Arrays.asList(cell)));
+ miniBatchOp.setOperationStatus(0,
+ new OperationStatus(SUCCESS, result));
+ }
+ }
} else {
context.currentPhase = BatchMutatePhase.FAILED;
}
@@ -1516,9 +1538,12 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
* to correctly support concurrent index mutations we need to always read the latest
* data table row from memory.
* It takes in an atomic Put mutation and generates a list of Put and Delete mutations.
- * The list will be empty in the case of ON DUPLICATE KEY IGNORE and the row already exists.
- * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put mutation and optionally
- * one Delete mutation (with DeleteColumn type cells for all columns set to null).
+ * The mutation list will be empty in two cases:
+ * 1) ON DUPLICATE KEY IGNORE and the row already exists;
+ * 2) ON DUPLICATE KEY UPDATE where each CASE expression evaluates to its ELSE-clause and
+ * the new value is the same as the old value.
+ * Otherwise, we will generate one Put mutation and optionally one Delete mutation (with
+ * DeleteColumn type cells for all columns set to null).
*/
private List<Mutation> generateOnDupMutations(BatchMutateContext context, Put atomicPut) throws IOException {
List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2);
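As a concrete illustration of the "one Put plus one Delete" case described in the javadoc above, an ON DUPLICATE KEY UPDATE that sets one column to NULL while bumping another would produce a Put for the bumped column and a Delete carrying a DeleteColumn cell for the nulled one. A hedged sketch (table T and its columns are hypothetical):

    import java.sql.Connection;
    import java.sql.Statement;

    public final class PutPlusDeleteExample {
        // Sketch only; table T(pk VARCHAR PRIMARY KEY, c1 BIGINT, c2 BIGINT)
        // is assumed. Server-side this should generate one Put (for c1) and
        // one Delete with a DeleteColumn cell (for c2 set to null).
        static void run(Connection conn) throws Exception {
            conn.setAutoCommit(true);
            try (Statement stmt = conn.createStatement()) {
                stmt.executeUpdate("UPSERT INTO T VALUES ('a', 1, 2) "
                    + "ON DUPLICATE KEY UPDATE c1 = c1 + 1, c2 = null");
            }
        }
    }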
@@ -1565,6 +1590,12 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
// read the column values requested in the get from the current data row
List<Cell> cells = IndexUtil.readColumnsFromRow(currentDataRowState, colsReadInExpr);
+ // store the current cells in a map keyed by a ColumnReference (column family and
+ // column qualifier); the value is a pair of the cell and a Boolean flag. The flag
+ // is true if the expression is a CaseExpression whose ELSE-clause evaluated to
+ // true, null if there is no expression on this column, and false otherwise
+ Map<ColumnReference, Pair<Cell, Boolean>> currColumnCellExprMap = new HashMap<>();
+
if (currentDataRowState == null) { // row doesn't exist
if (skipFirstOp) {
if (operations.size() <= 1 && repeat <= 1) {
@@ -1583,6 +1614,16 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
} else {
// Base current state off of existing row
flattenedCells = cells;
+ // store all current cells from currentDataRowState
+ for (Map.Entry<byte[], List<Cell>> entry :
+ currentDataRowState.getFamilyCellMap().entrySet()) {
+ for (Cell cell : new ArrayList<>(entry.getValue())) {
+ byte[] family = CellUtil.cloneFamily(cell);
+ byte[] qualifier = CellUtil.cloneQualifier(cell);
+ ColumnReference colRef = new ColumnReference(family, qualifier);
+ currColumnCellExprMap.put(colRef, new Pair<>(cell, null));
+ }
+ }
}
MultiKeyValueTuple tuple = new MultiKeyValueTuple(flattenedCells);
@@ -1604,7 +1645,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
int adjust = table.getBucketNum() == null ? 1 : 2;
for (int i = 0; i < expressions.size(); i++) {
Expression expression = expressions.get(i);
- ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+ ptr.set(EMPTY_BYTE_ARRAY);
expression.evaluate(tuple, ptr);
PColumn column = table.getColumns().get(i + adjust);
Object value = expression.getDataType().toObject(ptr, column.getSortOrder());
@@ -1620,6 +1661,22 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
column.getSortOrder(), table.rowKeyOrderOptimizable());
byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
row.setValue(column, bytes);
+
+ // if the column exists in currColumnCellExprMap, set the Boolean flag in the
+ // map to true if the expression is a CaseExpression and the ELSE-clause is
+ // evaluated to be true
+ ColumnReference colRef = new ColumnReference(column.getFamilyName().getBytes(),
+ column.getColumnQualifierBytes());
+ if (currColumnCellExprMap.containsKey(colRef)) {
+ Pair<Cell, Boolean> valuePair = currColumnCellExprMap.get(colRef);
+ if (expression instanceof CaseExpression
+ && ((CaseExpression) expression).evaluateIndexOf(tuple, ptr)
+ == expression.getChildren().size() - 1) {
+ valuePair.setSecond(true);
+ } else {
+ valuePair.setSecond(false);
+ }
+ }
}
List<Cell> updatedCells = Lists.newArrayListWithExpectedSize(estimatedSize);
List<Mutation> newMutations = row.toRowMutations();
@@ -1628,34 +1685,52 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
}
// update the cells to the latest values calculated above
flattenedCells = mergeCells(flattenedCells, updatedCells);
+ // we need to retrieve the empty cell later on, which relies on binary search
+ flattenedCells.sort(CellComparator.getInstance());
tuple.setKeyValues(flattenedCells);
}
// Repeat only applies to first statement
repeat = 1;
}
+ put = new Put(rowKey);
+ delete = new Delete(rowKey);
+ transferAttributes(atomicPut, put);
+ transferAttributes(atomicPut, delete);
for (int i = 0; i < tuple.size(); i++) {
Cell cell = tuple.getValue(i);
if (cell.getType() == Cell.Type.Put) {
- if (put == null) {
- put = new Put(rowKey);
- transferAttributes(atomicPut, put);
- mutations.add(put);
+ if (checkCellNeedUpdate(cell, currColumnCellExprMap)) {
+ put.add(cell);
}
- put.add(cell);
} else {
- if (delete == null) {
- delete = new Delete(rowKey);
- transferAttributes(atomicPut, delete);
- mutations.add(delete);
- }
delete.add(cell);
}
}
+
+ if (!put.isEmpty() || !delete.isEmpty()) {
+ PTable table = operations.get(0).getFirst();
+ addEmptyKVCellToPut(put, tuple, table);
+ }
+
+ if (!put.isEmpty()) {
+ mutations.add(put);
+ }
+ if (!delete.isEmpty()) {
+ mutations.add(delete);
+ }
+
return mutations;
}
-
+ private void addEmptyKVCellToPut(Put put, MultiKeyValueTuple tuple, PTable table)
+ throws IOException {
+ byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table);
+ byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+ Cell emptyKVCell = tuple.getValue(emptyCF, emptyCQ);
+ if (emptyKVCell != null) {
+ put.add(emptyKVCell);
+ }
+ }
private static List<Cell> flattenCells(Mutation m) {
List<Cell> flattenedCells = new ArrayList<>();
@@ -1669,6 +1744,44 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
}
}
+ /**
+ * Checks whether a cell needs to be updated, based on the current cells' values.
+ * The cell is not updated only if the column appears in a CASE expression and the
+ * new value is the same as the old value in the ELSE-clause; otherwise it should
+ * be updated.
+ *
+ * @param cell the cell with the new value to be checked
+ * @param colCellExprMap the column reference map with the cells' current values
+ * @return true if the cell needs an update, false otherwise
+ */
+ private boolean checkCellNeedUpdate(Cell cell,
+ Map<ColumnReference, Pair<Cell, Boolean>> colCellExprMap) {
+ byte[] family = CellUtil.cloneFamily(cell);
+ byte[] qualifier = CellUtil.cloneQualifier(cell);
+ ColumnReference colRef = new ColumnReference(family, qualifier);
+
+ // if the cell does not exist in the map, it is new and needs an update
+ if (colCellExprMap.isEmpty() || !colCellExprMap.containsKey(colRef)) {
+ return true;
+ }
+
+ Pair<Cell, Boolean> valuePair = colCellExprMap.get(colRef);
+ Boolean isInCaseExpressionElseClause = valuePair.getSecond();
+ if (isInCaseExpressionElseClause == null) {
+ return false;
+ }
+ if (!isInCaseExpressionElseClause) {
+ return true;
+ }
+ Cell oldCell = valuePair.getFirst();
+ ImmutableBytesPtr newValuePtr = new ImmutableBytesPtr(cell.getValueArray(),
+ cell.getValueOffset(), cell.getValueLength());
+ ImmutableBytesPtr oldValuePtr = new ImmutableBytesPtr(oldCell.getValueArray(),
+ oldCell.getValueOffset(), oldCell.getValueLength());
+ return !Bytes.equals(oldValuePtr.get(), oldValuePtr.getOffset(), oldValuePtr.getLength(),
+ newValuePtr.get(), newValuePtr.getOffset(), newValuePtr.getLength());
+ }
+
/**
* ensure that the generated mutations have all the attributes like schema
*/
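Restating the decision rule of checkCellNeedUpdate as a standalone function may help; this is a paraphrase of the logic above rather than patch code, where 'elseFlag' mirrors the Boolean stored in currColumnCellExprMap:

    import org.apache.hadoop.hbase.util.Bytes;

    public final class CellUpdateRule {
        // elseFlag semantics, as in the map above: null = no expression on
        // this column, TRUE = the CASE took the ELSE branch, FALSE = otherwise.
        static boolean needsUpdate(boolean columnSeen, Boolean elseFlag,
                                   byte[] oldValue, byte[] newValue) {
            if (!columnSeen) {
                return true;           // brand-new cell, always written
            }
            if (elseFlag == null) {
                return false;          // no expression touched this column
            }
            if (!elseFlag) {
                return true;           // a WHEN/THEN branch produced the value
            }
            // ELSE branch taken: write only if the value actually changed
            return !Bytes.equals(oldValue, newValue);
        }
    }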
@@ -1716,4 +1829,15 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
public static Map<String, byte[]> getAttributeValuesFromWALKey(WALKey key) {
return new HashMap<String, byte[]>(key.getExtendedAttributes());
}
+
+ /**
+ * Determines whether the atomic operation is complete based on the operation status.
+ * HBase returns a null Result by default for successful Put and Delete mutations; a
+ * non-null Result is returned by default only for Increment and Append mutations.
+ * @param status the operation status.
+ * @return true if the atomic operation is completed, false otherwise.
+ */
+ public static boolean isAtomicOperationComplete(OperationStatus status) {
+ return status.getOperationStatusCode() == SUCCESS && status.getResult() != null;
+ }
}
\ No newline at end of file
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
new file mode 100644
index 0000000000..f7d72b868e
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java
@@ -0,0 +1,646 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+
+@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
+public class OnDuplicateKey2IT extends ParallelStatsDisabledIT {
+ private final String indexDDL;
+ private final String tableDDLOptions;
+
+ private static final String[] INDEX_DDLS =
+ new String[] {
+ "",
+ "create local index %s_IDX on %s(counter1) include
(counter2)",
+ "create local index %s_IDX on %s(counter1, counter2)",
+ "create index %s_IDX on %s(counter1) include (counter2)",
+ "create index %s_IDX on %s(counter1, counter2)",
+ "create uncovered index %s_IDX on %s(counter1)",
+ "create uncovered index %s_IDX on %s(counter1, counter2)"};
+
+ public OnDuplicateKey2IT(String indexDDL, boolean columnEncoded) {
+ this.indexDDL = indexDDL;
+ this.tableDDLOptions = columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0";
+ }
+
+ @Parameters(name="OnDuplicateKey2IT_{index},columnEncoded={1}")
+ public static synchronized Collection<Object> data() {
+ List<Object> testCases = Lists.newArrayList();
+ for (String indexDDL : INDEX_DDLS) {
+ for (boolean columnEncoded : new boolean[]{ false, true }) {
+ testCases.add(new Object[] { indexDDL, columnEncoded });
+ }
+ }
+ return testCases;
+ }
+
+ private void createIndex(Connection conn, String tableName) throws SQLException {
+ if (indexDDL == null || indexDDL.length() == 0) {
+ return;
+ }
+ String ddl = String.format(indexDDL, tableName, tableName);
+ conn.createStatement().execute(ddl);
+ }
+
+ @Test
+ public void testIgnoreReturnValue() throws Exception {
+ Assume.assumeTrue("Set correct result to RegionActionResult on hbase versions " +
+ "2.4.18+, 2.5.9+, and 2.6.0+", isSetCorrectResultEnabledOnHBase());
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ String tableName = generateUniqueName();
+ String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 bigint)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10)");
+
+ int actualReturnValue = conn.createStatement().executeUpdate(
+ "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY IGNORE");
+ assertEquals(0, actualReturnValue);
+
+ conn.close();
+ }
+
+ @Test
+ public void testColumnsTimestampUpdateWithAllCombinations() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+
+ String ddl = "create table " + tableName + "(pk varchar primary key, " +
+ "counter1 integer, counter2 integer, counter3 smallint, counter4 bigint, " +
+ "counter5 varchar)" + tableDDLOptions;
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+ String dml = String.format("UPSERT INTO %s VALUES('abc', 0, 10, 100, 1000, 'NONE')",
+ tableName);
+ int actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ String dql = "SELECT * from " + tableName;
+ ResultSet rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+
+ List<Long> oldTimestamps = getAllColumnsLatestCellTimestamp(conn, tableName);
+
+ dml = "UPSERT INTO " + tableName + " VALUES ('abc', 0, 10) ON DUPLICATE KEY UPDATE " +
+ // conditional update with different value
+ "counter1 = CASE WHEN counter1 < 1 THEN counter1 + 1 ELSE counter1 END, " +
+ // conditional update with same value in ELSE clause (will not update timestamp)
+ "counter2 = CASE WHEN counter2 < 10 THEN counter2 + 1 ELSE counter2 END, " +
+ // intentional update with different value
+ "counter3 = counter3 + 100, " +
+ // intentional update with same value
+ "counter4 = counter4";
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(10, rs.getInt("counter2"));
+ assertEquals(200, rs.getInt("counter3"));
+ assertEquals(1000, rs.getInt("counter4"));
+ assertEquals("NONE", rs.getString("counter5"));
+ assertFalse(rs.next());
+
+ List<Long> newTimestamps = getAllColumnsLatestCellTimestamp(conn, tableName);
+
+ assertEquals(6, oldTimestamps.size());
+ assertEquals(6, newTimestamps.size());
+ assertEquals(oldTimestamps.get(2), newTimestamps.get(2)); // counter2 NOT updated
+ assertEquals(oldTimestamps.get(5), newTimestamps.get(5)); // counter5 NOT updated
+ assertTrue(oldTimestamps.get(0) < newTimestamps.get(0)
+ && oldTimestamps.get(1) < newTimestamps.get(1)
+ && oldTimestamps.get(3) < newTimestamps.get(3)
+ && oldTimestamps.get(4) < newTimestamps.get(4)); // other columns updated
+ }
+ }
+
+ @Test
+ public void testColumnsTimestampUpdateWithOneConditionalUpdate() throws Exception {
+ Assume.assumeTrue("Set correct result to RegionActionResult on hbase versions " +
+ "2.4.18+, 2.5.9+, and 2.6.0+", isSetCorrectResultEnabledOnHBase());
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+
+ String ddl = "create table " + tableName +
+ "(pk varchar primary key, counter1 bigint, counter2 bigint)" + tableDDLOptions;
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+
+ String dml;
+ dml = String.format("UPSERT INTO %s VALUES('abc', 0, 100)", tableName);
+ int actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ String dql = "SELECT * from " + tableName;
+ ResultSet rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+
+ List<Long> timestampList0 = getAllColumnsLatestCellTimestamp(conn, tableName);
+
+ // Case 1: timestamps update with different value in WHEN-THEN-clause
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES ('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE " +
+ "counter1 = CASE WHEN counter1 < 1 THEN counter1 + 1 ELSE counter1 END",
+ tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList1 = getAllColumnsLatestCellTimestamp(conn, tableName);
+ assertTrue(timestampList1.get(0) > timestampList0.get(0)
+ && timestampList1.get(1) > timestampList0.get(1));
+
+ // Case 2: timestamps NOT updated with same value in ELSE-clause
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(0, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList2 = getAllColumnsLatestCellTimestamp(conn, tableName);
+ assertEquals(timestampList1.get(0), timestampList2.get(0)); // empty column NOT updated
+ assertEquals(timestampList1.get(1), timestampList2.get(1)); // counter1 NOT updated
+
+ // Case 3: timestamps update with different value in ELSE-clause
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES ('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE " +
+ "counter1 = CASE WHEN counter1 < 1 THEN counter1 ELSE counter1 + 1 END",
+ tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(2, rs.getInt("counter1"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList3 = getAllColumnsLatestCellTimestamp(conn, tableName);
+ assertTrue(timestampList3.get(0) > timestampList2.get(0)
+ && timestampList3.get(1) > timestampList2.get(1));
+
+ // Case 4: timestamps update with same value in WHEN-THEN-clause
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList4 = getAllColumnsLatestCellTimestamp(conn, tableName);
+ assertTrue(timestampList4.get(0) > timestampList3.get(0)
+ && timestampList4.get(1) > timestampList3.get(1));
+ }
+ }
+
+ @Test
+ public void testColumnsTimestampUpdateWithOneConditionalValuesUpdate() throws Exception {
+ Assume.assumeTrue("Set correct result to RegionActionResult on hbase versions " +
+ "2.4.18+, 2.5.9+, and 2.6.0+", isSetCorrectResultEnabledOnHBase());
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+
+ String ddl = "create table " + tableName +
+ "(pk varchar primary key, counter1 integer, counter2 integer)" + tableDDLOptions;
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+
+ String dml = String.format("UPSERT INTO %s VALUES('abc', 1, 100)", tableName);
+ int actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ List<Long> timestampList0 = getAllColumnsLatestCellTimestamp(conn, tableName);
+
+ // Case 1: timestamps update with same value in WHEN-THEN-clause
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES ('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE " +
+ "counter1 = CASE WHEN counter2 <= 100 THEN 1 ELSE 0 END", tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ String dql = "SELECT * from " + tableName;
+ ResultSet rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList1 = getAllColumnsLatestCellTimestamp(conn, tableName);
+
+ assertTrue(timestampList0.get(0) < timestampList1.get(0)
+ && timestampList0.get(1) < timestampList1.get(1)); // counter1 updated
+ assertEquals(timestampList0.get(2), timestampList1.get(2)); // counter2 NOT updated
+
+ // Case 2: timestamps NOT updated with same value in ELSE-clause
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES ('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE " +
+ "counter1 = CASE WHEN counter2 > 100 THEN 0 ELSE 1 END", tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(0, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList2 = getAllColumnsLatestCellTimestamp(conn, tableName);
+
+ assertEquals(timestampList1.get(0), timestampList2.get(0));
+ assertEquals(timestampList1.get(1), timestampList2.get(1));
+ assertEquals(timestampList1.get(2), timestampList2.get(2));
+ }
+ }
+
+ @Test
+ public void testColumnsTimestampUpdateWithMultipleConditionalUpdate() throws Exception {
+ Assume.assumeTrue("Set correct result to RegionActionResult on hbase versions " +
+ "2.4.18+, 2.5.9+, and 2.6.0+", isSetCorrectResultEnabledOnHBase());
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ String ddl = "create table " + tableName +
+ "(pk varchar primary key, counter1 integer, counter2 integer, approval " +
+ "varchar)" + tableDDLOptions;
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+
+ String dml;
+ dml = String.format("UPSERT INTO %s VALUES('abc', 0, 9, 'NONE')", tableName);
+ int actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ List<Long> timestampList0 = getAllColumnsLatestCellTimestamp(conn, tableName);
+
+ // Case 1: all columns timestamps updated
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES ('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE " +
+ "counter1 = CASE WHEN counter1 < 1 THEN 1 ELSE counter1 END," +
+ "counter2 = CASE WHEN counter2 < 11 THEN counter2 + 1 ELSE counter2 END," +
+ "approval = CASE WHEN counter2 < 10 THEN 'NONE' " +
+ "WHEN counter2 < 11 THEN 'MANAGER_APPROVAL' " +
+ "ELSE approval END", tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ String dql = "SELECT * from " + tableName;
+ ResultSet rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(10, rs.getInt("counter2"));
+ assertEquals("NONE", rs.getString("approval"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList1 = getAllColumnsLatestCellTimestamp(conn, tableName);
+ assertTrue(timestampList1.get(0) > timestampList0.get(0)
+ && timestampList1.get(1) > timestampList0.get(1)
+ && timestampList1.get(2) > timestampList0.get(2)
+ && timestampList1.get(3) > timestampList0.get(3));
+
+ // Case 2: timestamps of counter2 and approval updated
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(11, rs.getInt("counter2"));
+ assertEquals("MANAGER_APPROVAL", rs.getString("approval"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList2 = getAllColumnsLatestCellTimestamp(conn, tableName);
+ assertEquals(timestampList1.get(1), timestampList2.get(1)); // counter1 NOT updated
+ assertTrue(timestampList2.get(0) > timestampList1.get(0)
+ && timestampList2.get(2) > timestampList1.get(2)
+ && timestampList2.get(3) > timestampList1.get(3));
+
+ // Case 3: all timestamps NOT updated, including empty column
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(0, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(11, rs.getInt("counter2"));
+ assertEquals("MANAGER_APPROVAL", rs.getString("approval"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList3 = getAllColumnsLatestCellTimestamp(conn, tableName);
+ assertEquals(timestampList2.get(0), timestampList3.get(0));
+ assertEquals(timestampList2.get(1), timestampList3.get(1));
+ assertEquals(timestampList2.get(2), timestampList3.get(2));
+ assertEquals(timestampList2.get(3), timestampList3.get(3));
+ }
+ }
+
+ @Test
+ public void testColumnsTimestampUpdateWithIntentionalUpdate() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+
+ String ddl = "create table " + tableName +
+ "(pk varchar primary key, counter1 bigint, counter2 bigint)" + tableDDLOptions;
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+
+ String dml;
+ dml = String.format("UPSERT INTO %s VALUES('abc', 0, 100)", tableName);
+ int actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ List<Long> timestampList0 = getAllColumnsLatestCellTimestamp(conn, tableName);
+
+ // Case 1: different value of one column
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES ('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE counter1 = counter1 + 1", tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ String dql = "SELECT * from " + tableName;
+ ResultSet rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList1 = getAllColumnsLatestCellTimestamp(conn, tableName);
+ assertEquals(timestampList0.get(2), timestampList1.get(2)); // counter2 NOT updated
+ assertTrue(timestampList1.get(0) > timestampList0.get(0)
+ && timestampList1.get(1) > timestampList0.get(1)); // updated columns
+
+ // Case 2: same value of one column will also be updated
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES ('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE counter1 = counter1", tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList2 = getAllColumnsLatestCellTimestamp(conn, tableName);
+ assertEquals(timestampList2.get(2), timestampList1.get(2)); // counter2 NOT updated
+ assertTrue(timestampList2.get(0) > timestampList1.get(0)
+ && timestampList2.get(1) > timestampList1.get(1));
+
+ // Case 3: same value of one column, different of the other
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES ('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE counter1 = counter1, counter2 = counter2 + 1", tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(101, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList3 = getAllColumnsLatestCellTimestamp(conn, tableName);
+ assertTrue(timestampList3.get(0) > timestampList2.get(0)
+ && timestampList3.get(1) > timestampList2.get(1)
+ && timestampList3.get(2) > timestampList2.get(2)); // counter2
+
+ // Case 4: same values of all columns will also be updated
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES ('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE counter1 = counter1, counter2 = counter2", tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(101, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList4 = getAllColumnsLatestCellTimestamp(conn, tableName);
+ assertTrue(timestampList4.get(0) > timestampList3.get(0)
+ && timestampList4.get(1) > timestampList3.get(1)
+ && timestampList4.get(2) > timestampList3.get(2));
+ }
+ }
+
+ @Test
+ public void testBatchedUpsertOnDupKeyAutoCommit() throws Exception {
+ testBatchedUpsertOnDupKey(true);
+ }
+
+ @Test
+ public void testBatchedUpsertOnDupKeyNoAutoCommit() throws Exception {
+ testBatchedUpsertOnDupKey(false);
+ }
+
+ private void testBatchedUpsertOnDupKey(boolean autocommit) throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(autocommit);
+
+ Statement stmt = conn.createStatement();
+
+ stmt.execute("create table " + tableName + "(pk varchar primary key, " +
+ "counter1 integer, counter2 integer, approval varchar)");
+ createIndex(conn, tableName);
+
+ stmt.execute("UPSERT INTO " + tableName + " VALUES('a', 0, 10, 'NONE')");
+ conn.commit();
+
+ stmt.addBatch("UPSERT INTO " + tableName +
+ " (pk, counter1, counter2) VALUES ('a', 0, 10) ON DUPLICATE KEY IGNORE");
+ stmt.addBatch("UPSERT INTO " + tableName +
+ " (pk, counter1, counter2) VALUES ('a', 0, 10) ON DUPLICATE KEY UPDATE" +
+ " counter1 = CASE WHEN counter1 < 1 THEN 1 ELSE counter1 END");
+
+ stmt.addBatch("UPSERT INTO " + tableName +
+ " (pk, counter1, counter2) VALUES ('b', 0, 9) ON DUPLICATE KEY IGNORE");
+ String dml = "UPSERT INTO " + tableName +
+ " (pk, counter1, counter2) VALUES ('b', 0, 10) ON DUPLICATE KEY UPDATE" +
+ " counter2 = CASE WHEN counter2 < 11 THEN counter2 + 1 ELSE counter2 END," +
+ " approval = CASE WHEN counter2 < 10 THEN 'NONE'" +
+ " WHEN counter2 < 11 THEN 'MANAGER_APPROVAL'" +
+ " ELSE approval END";
+ stmt.addBatch(dml);
+ stmt.addBatch(dml);
+ stmt.addBatch(dml);
+
+ int[] actualReturnValues = stmt.executeBatch();
+ int[] expectedReturnValues = new int[]{1, 1, 1, 1, 1, 1};
+ if (!autocommit) {
+ conn.commit();
+ }
+ assertArrayEquals(expectedReturnValues, actualReturnValues);
+
+ ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(10, rs.getInt("counter2"));
+ assertEquals("NONE", rs.getString("approval"));
+ assertTrue(rs.next());
+ assertEquals("b", rs.getString("pk"));
+ assertEquals(0, rs.getInt("counter1"));
+ assertEquals(11, rs.getInt("counter2"));
+ assertEquals("MANAGER_APPROVAL", rs.getString("approval"));
+ assertFalse(rs.next());
+ }
+ }
+
+ private long getEmptyKVLatestCellTimestamp(String tableName) throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ PTable pTable = PhoenixRuntime.getTable(conn, tableName);
+ byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(pTable).getFirst();
+ return getColumnLatestCellTimestamp(tableName, emptyCQ);
+ }
+
+ private long getColumnLatestCellTimestamp(String tableName, byte[] cq) throws Exception {
+ Scan scan = new Scan();
+ try (org.apache.hadoop.hbase.client.Connection hconn =
+ ConnectionFactory.createConnection(config)) {
+ Table table = hconn.getTable(TableName.valueOf(tableName));
+ ResultScanner resultScanner = table.getScanner(scan);
+ Result result = resultScanner.next();
+ return result.getColumnLatestCell(
+ QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, cq).getTimestamp();
+ }
+ }
+
+ private List<Long> getAllColumnsLatestCellTimestamp(Connection conn, String tableName)
+ throws Exception {
+ List<Long> timestampList = new ArrayList<>();
+ PTable pTable = conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName));
+ List<PColumn> columns = pTable.getColumns();
+
+ // timestamp of the empty cell
+ timestampList.add(getEmptyKVLatestCellTimestamp(tableName));
+ // timestamps of all other columns
+ for (int i = 1; i < columns.size(); i++) {
+ byte[] cq = columns.get(i).getColumnQualifierBytes();
+ timestampList.add(getColumnLatestCellTimestamp(tableName, cq));
+ }
+ return timestampList;
+ }
+
+ private boolean isSetCorrectResultEnabledOnHBase() {
+ // true for HBase 2.4.18+, 2.5.9+, and 2.6.0+ versions, false otherwise
+ String hbaseVersion = VersionInfo.getVersion();
+ String[] versionArr = hbaseVersion.split("\\.");
+ int majorVersion = Integer.parseInt(versionArr[0]);
+ int minorVersion = Integer.parseInt(versionArr[1]);
+ int patchVersion = Integer.parseInt(versionArr[2].split("-")[0]);
+ if (majorVersion > 2) {
+ return true;
+ }
+ if (majorVersion < 2) {
+ return false;
+ }
+ if (minorVersion >= 6) {
+ return true;
+ }
+ if (minorVersion < 4) {
+ return false;
+ }
+ if (minorVersion == 4) {
+ return patchVersion >= 18;
+ }
+ return patchVersion >= 9;
+ }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
index badc2c5e07..59c664edc6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
@@ -44,8 +44,10 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.junit.Test;
@@ -60,11 +62,11 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
@RunWith(Parameterized.class)
public class OnDuplicateKeyIT extends ParallelStatsDisabledIT {
private final String indexDDL;
-
+
public OnDuplicateKeyIT(String indexDDL) {
this.indexDDL = indexDDL;
}
-
+
@Parameters
public static synchronized Collection<Object> data() {
List<Object> testCases = Lists.newArrayList();
@@ -861,17 +863,26 @@ public class OnDuplicateKeyIT extends ParallelStatsDisabledIT {
}
private void assertHBaseRowTimestamp(String tableName, long expectedTimestamp) throws Exception {
+ long actualTimestamp = getEmptyKVLatestCellTimestamp(tableName);
+ assertEquals(expectedTimestamp, actualTimestamp);
+ }
+
+ private long getEmptyKVLatestCellTimestamp(String tableName) throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ PTable pTable = PhoenixRuntime.getTable(conn, tableName);
+ byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(pTable).getFirst();
+ return getColumnLatestCellTimestamp(tableName, emptyCQ);
+ }
+
+ private long getColumnLatestCellTimestamp(String tableName, byte[] cq) throws Exception {
Scan scan = new Scan();
- byte[] emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(true).getFirst();
try (org.apache.hadoop.hbase.client.Connection hconn =
- ConnectionFactory.createConnection(config)) {
+ ConnectionFactory.createConnection(config)) {
Table table = hconn.getTable(TableName.valueOf(tableName));
ResultScanner resultScanner = table.getScanner(scan);
Result result = resultScanner.next();
- long actualTimestamp = result.getColumnLatestCell(
- QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, emptyKVQualifier).getTimestamp();
- assertEquals(expectedTimestamp, actualTimestamp);
+ return result.getColumnLatestCell(
+ QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, cq).getTimestamp();
}
}
}
-
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT.java
index de24e4c385..45e8d7dba7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReplicationWithWALAnnotationIT.java
@@ -181,7 +181,7 @@ public class ReplicationWithWALAnnotationIT extends BaseTest {
String[] versionArr = hbaseVersion.split("\\.");
int majorVersion = Integer.parseInt(versionArr[0]);
int minorVersion = Integer.parseInt(versionArr[1]);
- int patchVersion = Integer.parseInt(versionArr[2].split("-hadoop")[0]);
+ int patchVersion = Integer.parseInt(versionArr[2].split("-")[0]);
if (majorVersion > 2) {
return true;
}