This is an automated email from the ASF dual-hosted git repository.
skadam pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new d962363 PHOENIX-6420 Wrong result when conditional and regular
upserts are passed in the same commit batch (#1183)
d962363 is described below
commit d9623634165916a774410188b643c25871fb1a0e
Author: tkhurana <[email protected]>
AuthorDate: Tue Apr 6 14:13:09 2021 -0700
PHOENIX-6420 Wrong result when conditional and regular upserts are passed
in the same commit batch (#1183)
* PHOENIX-6420 Wrong result when conditional and regular upserts are passed
in the same commit batch
* Addressed feedback
* Added comments, renamed variable
---
.../apache/phoenix/end2end/OnDuplicateKeyIT.java | 43 +++
.../apache/phoenix/execute/PartialCommitIT.java | 2 +-
.../org/apache/phoenix/execute/MutationState.java | 331 +++++++++++++++------
.../java/org/apache/phoenix/util/KeyValueUtil.java | 26 +-
.../apache/phoenix/execute/MutationStateTest.java | 105 +++++--
5 files changed, 389 insertions(+), 118 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
index f8630a1..fde31e4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
@@ -614,6 +614,49 @@ public class OnDuplicateKeyIT extends
ParallelStatsDisabledIT {
conn.close();
}
+ @Test
+ public void testOnDupAndUpsertInSameCommitBatch() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String ddl = "create table " + tableName + "(pk varchar primary
key, counter1 bigint, counter2 varchar)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+
+ // row doesn't exist
+ conn.createStatement().execute(String.format("UPSERT INTO %s
VALUES('a',0,'abc')", tableName));
+ conn.createStatement().execute(String.format(
+ "UPSERT INTO %s VALUES('a',1,'zzz') ON DUPLICATE KEY UPDATE
counter1 = counter1 + 2", tableName));
+ conn.commit();
+ assertRow(conn, tableName, "a", 2, "abc");
+
+ // row exists
+ conn.createStatement().execute(String.format("UPSERT INTO %s
VALUES('a', 7, 'fff')", tableName));
+ conn.createStatement().execute(String.format(
+ "UPSERT INTO %s VALUES('a',1, 'bazz') ON DUPLICATE KEY UPDATE
counter1 = counter1 + 2," +
+ "counter2 = counter2 || 'ddd'", tableName));
+ conn.commit();
+ assertRow(conn, tableName, "a", 9, "fffddd");
+
+ // partial update
+ conn.createStatement().execute(String.format(
+ "UPSERT INTO %s (pk, counter2) VALUES('a', 'gggg') ON
DUPLICATE KEY UPDATE counter1 = counter1 + 2", tableName));
+ conn.createStatement().execute(String.format(
+ "UPSERT INTO %s (pk, counter2) VALUES ('a', 'bar')",
tableName));
+ conn.commit();
+ assertRow(conn, tableName, "a", 11, "bar");
+ }
+ }
+
+ private void assertRow(Connection conn, String tableName, String
expectedPK, int expectedCol1, String expectedCol2) throws SQLException {
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " +
tableName);
+ assertTrue(rs.next());
+ assertEquals(expectedPK,rs.getString(1));
+ assertEquals(expectedCol1,rs.getInt(2));
+ assertEquals(expectedCol2,rs.getString(3));
+ assertFalse(rs.next());
+ }
+
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index 6e13564..5fabff5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -265,7 +265,7 @@ public class PartialCommitIT extends
BaseUniqueNamesOwnClusterIT {
private PhoenixConnection
getConnectionWithTableOrderPreservingMutationState() throws SQLException {
try (PhoenixConnection con =
DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class)) {
- final Map<TableRef, MultiRowMutationState> mutations =
Maps.newTreeMap(new TableRefComparator());
+ final Map<TableRef, List<MultiRowMutationState>> mutations =
Maps.newTreeMap(new TableRefComparator());
// passing a null mutation state forces the
connection.newMutationState() to be used to create the MutationState
return new PhoenixConnection(con, (MutationState)null) {
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 7641db4..b3b2fee 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -137,7 +137,11 @@ public class MutationState implements SQLCloseable {
private final long batchSize;
private final long batchSizeBytes;
private long batchCount = 0L;
- private final Map<TableRef, MultiRowMutationState> mutations;
+ // For each table, maintain a list of mutation batches. Each element in the
+ // list is a set of row mutations which can be sent in a single commit
batch.
+ // A regular upsert and a conditional upsert on the same row conflict with
+ // each other so they are split and send separately in different commit
batches.
+ private final Map<TableRef, List<MultiRowMutationState>> mutationsMap;
private final Set<String> uncommittedPhysicalNames =
Sets.newHashSetWithExpectedSize(10);
private long sizeOffset;
@@ -145,7 +149,7 @@ public class MutationState implements SQLCloseable {
private long estimatedSize = 0;
private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
private boolean isExternalTxContext = false;
- private Map<TableRef, MultiRowMutationState> txMutations =
Collections.emptyMap();
+ private Map<TableRef, List<MultiRowMutationState>> txMutations =
Collections.emptyMap();
private PhoenixTransactionContext phoenixTransactionContext =
PhoenixTransactionContext.NULL_CONTEXT;
@@ -182,19 +186,19 @@ public class MutationState implements SQLCloseable {
private MutationState(int maxSize, long maxSizeBytes, PhoenixConnection
connection,
boolean subTask, PhoenixTransactionContext txContext, long
sizeOffset) {
- this(maxSize, maxSizeBytes, connection, Maps.<TableRef,
MultiRowMutationState> newHashMapWithExpectedSize(5),
+ this(maxSize, maxSizeBytes, connection, Maps.<TableRef,
List<MultiRowMutationState>> newHashMapWithExpectedSize(5),
subTask, txContext);
this.sizeOffset = sizeOffset;
}
MutationState(int maxSize, long maxSizeBytes, PhoenixConnection connection,
- Map<TableRef, MultiRowMutationState> mutations, boolean subTask,
PhoenixTransactionContext txContext) {
+ Map<TableRef, List<MultiRowMutationState>> mutationsMap, boolean
subTask, PhoenixTransactionContext txContext) {
this.maxSize = maxSize;
this.maxSizeBytes = maxSizeBytes;
this.connection = connection;
this.batchSize = connection.getMutateBatchSize();
this.batchSizeBytes = connection.getMutateBatchSizeBytes();
- this.mutations = mutations;
+ this.mutationsMap = mutationsMap;
boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
: NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
@@ -212,13 +216,39 @@ public class MutationState implements SQLCloseable {
int maxSize, long maxSizeBytes, PhoenixConnection connection)
throws SQLException {
this(maxSize, maxSizeBytes, connection, false, null, sizeOffset);
if (!mutations.isEmpty()) {
- this.mutations.put(table, mutations);
+ addMutations(this.mutationsMap, table, mutations);
}
this.numRows = mutations.size();
- this.estimatedSize =
KeyValueUtil.getEstimatedRowMutationSize(this.mutations);
+ this.estimatedSize =
KeyValueUtil.getEstimatedRowMutationSizeWithBatch(this.mutationsMap);
throwIfTooBig();
}
+ // add a new batch of row mutations
+ private void addMutations(Map<TableRef, List<MultiRowMutationState>>
mutationMap, TableRef table,
+ MultiRowMutationState mutations) {
+ List<MultiRowMutationState> batches = mutationMap.get(table);
+ if (batches == null) {
+ batches = Lists.newArrayListWithExpectedSize(1);
+ }
+ batches.add(mutations);
+ mutationMap.put(table, batches);
+ }
+
+ // remove a batch of mutations which have been committed
+ private void removeMutations(Map<TableRef, List<MultiRowMutationState>>
mutationMap, TableRef table){
+ List<MultiRowMutationState> batches = mutationMap.get(table);
+ if (batches == null || batches.isEmpty()) {
+ mutationMap.remove(table);
+ return;
+ }
+
+ // mutation batches are committed in FIFO order so always remove from
the head
+ batches.remove(0);
+ if (batches.isEmpty()) {
+ mutationMap.remove(table);
+ }
+ }
+
public long getEstimatedSize() {
return estimatedSize;
}
@@ -378,7 +408,7 @@ public class MutationState implements SQLCloseable {
public static MutationState emptyMutationState(int maxSize, long
maxSizeBytes,
PhoenixConnection connection) {
MutationState state = new MutationState(maxSize, maxSizeBytes,
connection,
- Collections.<TableRef, MultiRowMutationState> emptyMap(),
false, null);
+ Collections.<TableRef, List<MultiRowMutationState>>
emptyMap(), false, null);
state.sizeOffset = 0;
return state;
}
@@ -404,67 +434,89 @@ public class MutationState implements SQLCloseable {
return numRows;
}
+ private MultiRowMutationState getLastMutationBatch(Map<TableRef,
List<MultiRowMutationState>> mutations, TableRef tableRef) {
+ List<MultiRowMutationState> mutationBatches = mutations.get(tableRef);
+ if (mutationBatches == null || mutationBatches.isEmpty()) {
+ return null;
+ }
+ return mutationBatches.get(mutationBatches.size() - 1);
+ }
+
private void joinMutationState(TableRef tableRef, MultiRowMutationState
srcRows,
- Map<TableRef, MultiRowMutationState> dstMutations) {
+ Map<TableRef, List<MultiRowMutationState>> dstMutations) {
PTable table = tableRef.getTable();
boolean isIndex = table.getType() == PTableType.INDEX;
- boolean incrementRowCount = dstMutations == this.mutations;
- MultiRowMutationState existingRows = dstMutations.put(tableRef,
srcRows);
- if (existingRows != null) { // Rows for that table already exist
- // Loop through new rows and replace existing with new
- for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry :
srcRows.entrySet()) {
- // Replace existing row with new row
- RowMutationState existingRowMutationState =
existingRows.put(rowEntry.getKey(), rowEntry.getValue());
- if (existingRowMutationState != null) {
- Map<PColumn, byte[]> existingValues =
existingRowMutationState.getColumnValues();
- if (existingValues != PRow.DELETE_MARKER) {
- Map<PColumn, byte[]> newRow =
rowEntry.getValue().getColumnValues();
- // if new row is PRow.DELETE_MARKER, it means delete,
and we don't need to merge it with
- // existing row.
- if (newRow != PRow.DELETE_MARKER) {
- // decrement estimated size by the size of the old
row
- estimatedSize -=
existingRowMutationState.calculateEstimatedSize();
- // Merge existing column values with new column
values
- existingRowMutationState.join(rowEntry.getValue());
- // increment estimated size by the size of the new
row
- estimatedSize +=
existingRowMutationState.calculateEstimatedSize();
- // Now that the existing row has been merged with
the new row, replace it back
- // again (since it was merged with the new one
above).
- existingRows.put(rowEntry.getKey(),
existingRowMutationState);
- }
- }
- } else {
- if (incrementRowCount && !isIndex) { // Don't count index
rows in row count
- numRows++;
- // increment estimated size by the size of the new row
- estimatedSize +=
rowEntry.getValue().calculateEstimatedSize();
- }
- }
- }
- // Put the existing one back now that it's merged
- dstMutations.put(tableRef, existingRows);
- } else {
+ boolean incrementRowCount = dstMutations == this.mutationsMap;
+ // we only need to check if the new mutation batch (srcRows) conflicts
with the
+ // last mutation batch since we try to merge it with that only
+ MultiRowMutationState existingRows =
getLastMutationBatch(dstMutations, tableRef);
+
+ if (existingRows == null) { // no rows found for this table
// Size new map at batch size as that's what it'll likely grow to.
MultiRowMutationState newRows = new
MultiRowMutationState(connection.getMutateBatchSize());
newRows.putAll(srcRows);
- dstMutations.put(tableRef, newRows);
+ addMutations(dstMutations, tableRef, newRows);
if (incrementRowCount && !isIndex) {
numRows += srcRows.size();
// if we added all the rows from newMutationState we can just
increment the
// estimatedSize by newMutationState.estimatedSize
estimatedSize += srcRows.estimatedSize;
}
+ return;
+ }
+
+ // for conflicting rows
+ MultiRowMutationState conflictingRows = new
MultiRowMutationState(connection.getMutateBatchSize());
+
+ // Rows for this table already exist, check for conflicts
+ for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry :
srcRows.entrySet()) {
+ ImmutableBytesPtr key = rowEntry.getKey();
+ RowMutationState newRowMutationState = rowEntry.getValue();
+ RowMutationState existingRowMutationState = existingRows.get(key);
+ if (existingRowMutationState == null) {
+ existingRows.put(key, newRowMutationState);
+ if (incrementRowCount && !isIndex) { // Don't count index rows
in row count
+ numRows++;
+ // increment estimated size by the size of the new row
+ estimatedSize +=
newRowMutationState.calculateEstimatedSize();
+ }
+ continue;
+ }
+ Map<PColumn, byte[]> existingValues =
existingRowMutationState.getColumnValues();
+ Map<PColumn, byte[]> newValues =
newRowMutationState.getColumnValues();
+ if (existingValues != PRow.DELETE_MARKER && newValues !=
PRow.DELETE_MARKER) {
+ // Check if we can merge existing column values with new
column values
+ long beforeMergeSize =
existingRowMutationState.calculateEstimatedSize();
+ boolean isMerged =
existingRowMutationState.join(rowEntry.getValue());
+ if (isMerged) {
+ // decrement estimated size by the size of the old row
+ estimatedSize -= beforeMergeSize;
+ // increment estimated size by the size of the new row
+ estimatedSize +=
existingRowMutationState.calculateEstimatedSize();
+ } else {
+ // cannot merge regular upsert and conditional upsert
+ // conflicting row is not a new row so no need to
increment numRows
+ conflictingRows.put(key, newRowMutationState);
+ }
+ } else {
+ existingRows.put(key, newRowMutationState);
+ }
+ }
+
+ if (!conflictingRows.isEmpty()) {
+ addMutations(dstMutations, tableRef, conflictingRows);
}
}
- private void joinMutationState(Map<TableRef, MultiRowMutationState>
srcMutations,
- Map<TableRef, MultiRowMutationState> dstMutations) {
+ private void joinMutationState(Map<TableRef, List<MultiRowMutationState>>
srcMutations,
+ Map<TableRef, List<MultiRowMutationState>> dstMutations) {
// Merge newMutation with this one, keeping state from newMutation for
any overlaps
- for (Map.Entry<TableRef, MultiRowMutationState> entry :
srcMutations.entrySet()) {
- // Replace existing entries for the table with new entries
+ for (Map.Entry<TableRef, List<MultiRowMutationState>> entry :
srcMutations.entrySet()) {
TableRef tableRef = entry.getKey();
- MultiRowMutationState srcRows = entry.getValue();
- joinMutationState(tableRef, srcRows, dstMutations);
+ for (MultiRowMutationState srcRows : entry.getValue()) {
+ // Replace existing entries for the table with new entries
+ joinMutationState(tableRef, srcRows, dstMutations);
+ }
}
}
@@ -483,10 +535,10 @@ public class MutationState implements SQLCloseable {
phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext());
this.sizeOffset += newMutationState.sizeOffset;
- joinMutationState(newMutationState.mutations, this.mutations);
+ joinMutationState(newMutationState.mutationsMap, this.mutationsMap);
if (!newMutationState.txMutations.isEmpty()) {
if (txMutations.isEmpty()) {
- txMutations =
Maps.newHashMapWithExpectedSize(mutations.size());
+ txMutations =
Maps.newHashMapWithExpectedSize(this.mutationsMap.size());
}
joinMutationState(newMutationState.txMutations, this.txMutations);
}
@@ -586,10 +638,11 @@ public class MutationState implements SQLCloseable {
// the tables in the mutations map
if (!sendAll) {
TableRef key = new TableRef(index);
- MultiRowMutationState multiRowMutationState =
mutations.remove(key);
+ List<MultiRowMutationState> multiRowMutationState =
mutationsMap.remove(key);
if (multiRowMutationState != null) {
final List<Mutation> deleteMutations =
Lists.newArrayList();
- generateMutations(key, mutationTimestamp,
serverTimestamp, multiRowMutationState, deleteMutations, null);
+ // for index table there will only be 1 mutation
batch in the list
+ generateMutations(key, mutationTimestamp,
serverTimestamp, multiRowMutationState.get(0), deleteMutations, null);
if (indexMutations == null) {
indexMutations = deleteMutations;
} else {
@@ -734,18 +787,19 @@ public class MutationState implements SQLCloseable {
public Iterator<Pair<byte[], List<Mutation>>> toMutations(final boolean
includeMutableIndexes,
final Long tableTimestamp) {
- final Iterator<Map.Entry<TableRef, MultiRowMutationState>> iterator =
this.mutations.entrySet().iterator();
+ final Iterator<Map.Entry<TableRef, List<MultiRowMutationState>>>
iterator = this.mutationsMap.entrySet().iterator();
if (!iterator.hasNext()) { return Collections.emptyIterator(); }
Long scn = connection.getSCN();
final long serverTimestamp = getTableTimestamp(tableTimestamp, scn);
final long mutationTimestamp = getMutationTimestamp(scn);
return new Iterator<Pair<byte[], List<Mutation>>>() {
- private Map.Entry<TableRef, MultiRowMutationState> current =
iterator.next();
+ private Map.Entry<TableRef, List<MultiRowMutationState>> current =
iterator.next();
+ private int batchOffset = 0;
private Iterator<Pair<byte[], List<Mutation>>> innerIterator =
init();
private Iterator<Pair<byte[], List<Mutation>>> init() {
final Iterator<Pair<PTable, List<Mutation>>> mutationIterator =
- addRowMutations(current.getKey(), current.getValue(),
mutationTimestamp,
+ addRowMutations(current.getKey(),
current.getValue().get(batchOffset), mutationTimestamp,
serverTimestamp, includeMutableIndexes, true);
return new Iterator<Pair<byte[], List<Mutation>>>() {
@Override
@@ -769,13 +823,19 @@ public class MutationState implements SQLCloseable {
@Override
public boolean hasNext() {
- return innerIterator.hasNext() || iterator.hasNext();
+ return innerIterator.hasNext() ||
+ batchOffset + 1 < current.getValue().size() ||
+ iterator.hasNext();
}
@Override
public Pair<byte[], List<Mutation>> next() {
if (!innerIterator.hasNext()) {
- current = iterator.next();
+ ++batchOffset;
+ if (batchOffset == current.getValue().size()) {
+ current = iterator.next();
+ batchOffset = 0;
+ }
innerIterator = init();
}
return innerIterator.next();
@@ -806,10 +866,10 @@ public class MutationState implements SQLCloseable {
* @throws SQLException
* if the table or any columns no longer exist
*/
- private long[] validateAll() throws SQLException {
+ private long[] validateAll(Map<TableRef, MultiRowMutationState>
commitBatch) throws SQLException {
int i = 0;
- long[] timeStamps = new long[this.mutations.size()];
- for (Map.Entry<TableRef, MultiRowMutationState> entry :
mutations.entrySet()) {
+ long[] timeStamps = new long[commitBatch.size()];
+ for (Map.Entry<TableRef, MultiRowMutationState> entry :
commitBatch.entrySet()) {
TableRef tableRef = entry.getKey();
timeStamps[i++] = validateAndGetServerTimestamp(tableRef,
entry.getValue());
}
@@ -960,45 +1020,108 @@ public class MutationState implements SQLCloseable {
}
+ /**
+ * Split the mutation batches for each table into separate commit batches.
+ * Each commit batch contains only one mutation batch
(MultiRowMutationState) for a table.
+ * @param tableRefIterator
+ * @return List of commit batches
+ */
+ private List<Map<TableRef, MultiRowMutationState>>
createCommitBatches(Iterator<TableRef> tableRefIterator) {
+ List<Map<TableRef, MultiRowMutationState>> commitBatches =
Lists.newArrayList();
+ while (tableRefIterator.hasNext()) {
+ final TableRef tableRef = tableRefIterator.next();
+ List<MultiRowMutationState> batches =
this.mutationsMap.get(tableRef);
+ if (batches == null) {
+ continue;
+ }
+ for (MultiRowMutationState batch : batches) {
+ // get the first commit batch which doesn't have any mutations
for the table
+ Map<TableRef, MultiRowMutationState> nextCommitBatch =
getNextCommitBatchForTable(commitBatches, tableRef);
+ // add the next mutation batch of the table to the commit batch
+ nextCommitBatch.put(tableRef, batch);
+ }
+ }
+ return commitBatches;
+ }
+
+ // visible for testing
+ List<Map<TableRef, MultiRowMutationState>> createCommitBatches() {
+ return createCommitBatches(this.mutationsMap.keySet().iterator());
+ }
+
+ /**
+ * Return the first commit batch which doesn't have any mutations for the
passed table.
+ * If no such commit batch exists, creates a new commit batch, adds it to
the list of
+ * commit batches and returns it.
+ * @param commitBatchesList current list of commit batches
+ * @param tableRef
+ * @return commit batch
+ */
+ private Map<TableRef, MultiRowMutationState>
getNextCommitBatchForTable(List<Map<TableRef, MultiRowMutationState>>
commitBatchesList,
+ TableRef tableRef) {
+ Map<TableRef, MultiRowMutationState> nextCommitBatch = null;
+ for (Map<TableRef, MultiRowMutationState> commitBatch :
commitBatchesList) {
+ if (commitBatch.get(tableRef) == null) {
+ nextCommitBatch = commitBatch;
+ break;
+ }
+ }
+ if (nextCommitBatch == null) {
+ // create a new commit batch and add it to the list of commit
batches
+ nextCommitBatch =
Maps.newHashMapWithExpectedSize(this.mutationsMap.size());
+ commitBatchesList.add(nextCommitBatch);
+ }
+ return nextCommitBatch;
+ }
+
@SuppressWarnings("deprecation")
private void send(Iterator<TableRef> tableRefIterator) throws SQLException
{
- int i = 0;
- long[] serverTimeStamps = null;
boolean sendAll = false;
+ boolean validateServerTimestamps = false;
+ List<Map<TableRef, MultiRowMutationState>> commitBatches;
if (tableRefIterator == null) {
- serverTimeStamps = validateAll();
- tableRefIterator = mutations.keySet().iterator();
+ commitBatches =
createCommitBatches(this.mutationsMap.keySet().iterator());
sendAll = true;
+ validateServerTimestamps = true;
+ } else {
+ commitBatches = createCommitBatches(tableRefIterator);
+ }
+
+ for (Map<TableRef, MultiRowMutationState> commitBatch : commitBatches)
{
+ long [] serverTimestamps = validateServerTimestamps ?
validateAll(commitBatch) : null;
+ sendBatch(commitBatch, serverTimestamps, sendAll);
}
+ }
- MultiRowMutationState multiRowMutationState;
+ private void sendBatch(Map<TableRef, MultiRowMutationState> commitBatch,
long[] serverTimeStamps, boolean sendAll) throws SQLException {
+ int i = 0;
Map<TableInfo, List<Mutation>> physicalTableMutationMap =
Maps.newLinkedHashMap();
// add tracing for this operation
try (TraceScope trace = Tracing.startNewSpan(connection, "Committing
mutations to tables")) {
Span span = trace.getSpan();
ImmutableBytesWritable indexMetaDataPtr = new
ImmutableBytesWritable();
- while (tableRefIterator.hasNext()) {
+ for (Map.Entry<TableRef, MultiRowMutationState> entry :
commitBatch.entrySet()) {
// at this point we are going through mutations for each table
- final TableRef tableRef = tableRefIterator.next();
- multiRowMutationState = mutations.get(tableRef);
+ final TableRef tableRef = entry.getKey();
+ MultiRowMutationState multiRowMutationState = entry.getValue();
if (multiRowMutationState == null ||
multiRowMutationState.isEmpty()) {
continue;
}
// Validate as we go if transactional since we can undo if a
problem occurs (which is unlikely)
long
- serverTimestamp =
- serverTimeStamps == null ?
- validateAndGetServerTimestamp(tableRef,
multiRowMutationState) :
- serverTimeStamps[i++];
+ serverTimestamp =
+ serverTimeStamps == null ?
+ validateAndGetServerTimestamp(tableRef,
multiRowMutationState) :
+ serverTimeStamps[i++];
final PTable table = tableRef.getTable();
Long scn = connection.getSCN();
long mutationTimestamp = scn == null ?
(table.isTransactional() == true ?
HConstants.LATEST_TIMESTAMP : EnvironmentEdgeManager.currentTimeMillis())
- : scn;
+ : scn;
Iterator<Pair<PTable, List<Mutation>>>
- mutationsIterator =
- addRowMutations(tableRef, multiRowMutationState,
mutationTimestamp,
- serverTimestamp, false, sendAll);
+ mutationsIterator =
+ addRowMutations(tableRef, multiRowMutationState,
mutationTimestamp,
+ serverTimestamp, false, sendAll);
// build map from physical table to mutation list
boolean isDataTable = true;
while (mutationsIterator.hasNext()) {
@@ -1006,10 +1129,10 @@ public class MutationState implements SQLCloseable {
PTable logicalTable = pair.getFirst();
List<Mutation> mutationList = pair.getSecond();
TableInfo tableInfo = new TableInfo(isDataTable,
logicalTable.getPhysicalName()
- , tableRef, logicalTable);
+ , tableRef, logicalTable);
List<Mutation>
- oldMutationList =
- physicalTableMutationMap.put(tableInfo,
mutationList);
+ oldMutationList =
+ physicalTableMutationMap.put(tableInfo, mutationList);
if (oldMutationList != null) mutationList.addAll(0,
oldMutationList);
isDataTable = false;
}
@@ -1020,7 +1143,7 @@ public class MutationState implements SQLCloseable {
if (table.isTransactional()) {
addUncommittedStatementIndexes(multiRowMutationState.values());
if (txMutations.isEmpty()) {
- txMutations =
Maps.newHashMapWithExpectedSize(mutations.size());
+ txMutations =
Maps.newHashMapWithExpectedSize(this.mutationsMap.size());
}
// Keep all mutations we've encountered until a commit or
rollback.
// This is not ideal, but there's not good way to get the
values back
@@ -1187,11 +1310,11 @@ public class MutationState implements SQLCloseable {
numFailedMutations = 0;
// Remove batches as we process them
- mutations.remove(origTableRef);
+ removeMutations(this.mutationsMap, origTableRef);
if (tableInfo.isDataTable()) {
numRows -= numMutations;
// recalculate the estimated size
- estimatedSize =
KeyValueUtil.getEstimatedRowMutationSize(mutations);
+ estimatedSize =
KeyValueUtil.getEstimatedRowMutationSizeWithBatch(this.mutationsMap);
}
} catch (Exception e) {
mutationCommitTime =
EnvironmentEdgeManager.currentTimeMillis() - startTime;
@@ -1405,8 +1528,10 @@ public class MutationState implements SQLCloseable {
}
private int[] getUncommittedStatementIndexes() {
- for (MultiRowMutationState rowMutationMap : mutations.values()) {
- addUncommittedStatementIndexes(rowMutationMap.values());
+ for (List<MultiRowMutationState> batches : mutationsMap.values()) {
+ for (MultiRowMutationState rowMutationMap : batches) {
+ addUncommittedStatementIndexes(rowMutationMap.values());
+ }
}
return uncommittedStatementIndexes;
}
@@ -1417,7 +1542,7 @@ public class MutationState implements SQLCloseable {
private void resetState() {
numRows = 0;
estimatedSize = 0;
- this.mutations.clear();
+ this.mutationsMap.clear();
phoenixTransactionContext = PhoenixTransactionContext.NULL_CONTEXT;
}
@@ -1437,7 +1562,7 @@ public class MutationState implements SQLCloseable {
}
public void commit() throws SQLException {
- Map<TableRef, MultiRowMutationState> txMutations =
Collections.emptyMap();
+ Map<TableRef, List<MultiRowMutationState>> txMutations =
Collections.emptyMap();
int retryCount = 0;
do {
boolean sendSuccessful = false;
@@ -1518,7 +1643,7 @@ public class MutationState implements SQLCloseable {
break;
}
retryCount++;
- mutations.putAll(txMutations);
+ mutationsMap.putAll(txMutations);
} while (true);
}
@@ -1576,7 +1701,7 @@ public class MutationState implements SQLCloseable {
* @throws SQLException
*/
public boolean sendUncommitted() throws SQLException {
- return sendUncommitted(mutations.keySet().iterator());
+ return sendUncommitted(mutationsMap.keySet().iterator());
}
/**
@@ -1604,7 +1729,7 @@ public class MutationState implements SQLCloseable {
if (filteredTableRefs.hasNext()) {
// FIXME: strip table alias to prevent equality check from failing
due to alias mismatch on null alias.
// We really should be keying the tables based on the physical
table name.
- List<TableRef> strippedAliases =
Lists.newArrayListWithExpectedSize(mutations.keySet().size());
+ List<TableRef> strippedAliases =
Lists.newArrayListWithExpectedSize(mutationsMap.keySet().size());
while (filteredTableRefs.hasNext()) {
TableRef tableRef = filteredTableRefs.next();
// REVIEW: unclear if we need this given we start transactions
when resolving a table
@@ -1683,6 +1808,10 @@ public class MutationState implements SQLCloseable {
return rowKeyToRowMutationState.put(ptr, rowMutationState);
}
+ public RowMutationState get(ImmutableBytesPtr ptr) {
+ return rowKeyToRowMutationState.get(ptr);
+ }
+
public void putAll(MultiRowMutationState other) {
estimatedSize += other.estimatedSize;
rowKeyToRowMutationState.putAll(other.rowKeyToRowMutationState);
@@ -1747,7 +1876,16 @@ public class MutationState implements SQLCloseable {
return statementIndexes;
}
- void join(RowMutationState newRow) {
+ /**
+ * Join the newRow with the current row if it doesn't conflict with it.
+ * A regular upsert conflicts with a conditional upsert
+ * @param newRow
+ * @return True if the rows were successfully joined else False
+ */
+ boolean join(RowMutationState newRow) {
+ if (isConflicting(newRow)) {
+ return false;
+ }
// If we already have a row and the new row has an ON DUPLICATE
KEY clause
// ignore the new values (as that's what the server will do).
if (newRow.onDupKeyBytes == null) {
@@ -1766,6 +1904,7 @@ public class MutationState implements SQLCloseable {
// increments of the same row in the same commit batch.
this.onDupKeyBytes =
PhoenixIndexBuilder.combineOnDupKey(this.onDupKeyBytes, newRow.onDupKeyBytes);
statementIndexes = joinSortedIntArrays(statementIndexes,
newRow.getStatementIndexes());
+ return true;
}
@Nonnull
@@ -1773,6 +1912,10 @@ public class MutationState implements SQLCloseable {
return rowTsColInfo;
}
+ public boolean isConflicting(RowMutationState newRowMutationState) {
+ return (this.onDupKeyBytes != null &&
newRowMutationState.onDupKeyBytes == null ||
+ this.onDupKeyBytes == null &&
newRowMutationState.onDupKeyBytes != null);
+ }
}
public ReadMetricQueue getReadMetricQueue() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
index f133ad1..d7a408d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
@@ -200,19 +200,35 @@ public class KeyValueUtil {
* @param tableMutationMap map from table to row to RowMutationState
* @return estimated row size
*/
- public static long
- getEstimatedRowMutationSize(Map<TableRef, MultiRowMutationState>
tableMutationMap) {
+ public static long getEstimatedRowMutationSize(Map<TableRef,
MultiRowMutationState> tableMutationMap) {
long size = 0;
// iterate over table
for (Entry<TableRef, MultiRowMutationState> tableEntry :
tableMutationMap.entrySet()) {
- // iterate over rows
- for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry :
tableEntry.getValue().entrySet()) {
- size += calculateRowMutationSize(rowEntry);
+ size += calculateMultiRowMutationSize(tableEntry.getValue());
+ }
+ return size;
+ }
+
+ public static long getEstimatedRowMutationSizeWithBatch(Map<TableRef,
List<MultiRowMutationState>> tableMutationMap) {
+ long size = 0;
+ // iterate over table
+ for (Entry<TableRef, List<MultiRowMutationState>> tableEntry :
tableMutationMap.entrySet()) {
+ for (MultiRowMutationState batch : tableEntry.getValue()) {
+ size += calculateMultiRowMutationSize(batch);
}
}
return size;
}
+ private static long calculateMultiRowMutationSize(MultiRowMutationState
mutations) {
+ long size = 0;
+ // iterate over rows
+ for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry :
mutations.entrySet()) {
+ size += calculateRowMutationSize(rowEntry);
+ }
+ return size;
+ }
+
private static long calculateRowMutationSize(Entry<ImmutableBytesPtr,
RowMutationState> rowEntry) {
int rowLength = rowEntry.getKey().getLength();
long colValuesLength = rowEntry.getValue().calculateEstimatedSize();
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
index 100dd43..d95faee 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
@@ -17,23 +17,7 @@
*/
package org.apache.phoenix.execute;
-import static org.apache.phoenix.execute.MutationState.joinSortedIntArrays;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-
+import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
@@ -42,8 +26,12 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
+import org.apache.phoenix.execute.MutationState.RowMutationState;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PUnsignedInt;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -52,7 +40,26 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import com.google.common.collect.ImmutableList;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.execute.MutationState.joinSortedIntArrays;
+import static org.apache.phoenix.query.BaseTest.generateUniqueName;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
public class MutationStateTest {
@@ -210,4 +217,66 @@ public class MutationStateTest {
+ "( id1 UNSIGNED_INT not null primary key," + "appId1
VARCHAR)");
}
}
+
+    /**
+     * Issues, for each of two tables, a regular upsert followed by an ON DUPLICATE KEY upsert
+     * of the same row in one uncommitted connection, then checks the resulting mutation pairs
+     * and that createCommitBatches yields two commit batches: regular upserts first,
+     * conditional upserts second.
+     */
+    @Test
+    public void testOnDupAndUpsertInSameCommitBatch() throws Exception {
+        String firstTable = generateUniqueName();
+        String secondTable = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(String.format(
+                    "create table %s (id1 UNSIGNED_INT not null primary key, appId1 VARCHAR)", firstTable));
+            conn.createStatement().execute(String.format(
+                    "create table %s (id2 UNSIGNED_INT not null primary key, appId2 VARCHAR)", secondTable));
+
+            // per table: a regular upsert, then a conditional upsert hitting the same row key
+            conn.createStatement().execute(String.format(
+                    "upsert into %s(id1,appId1) values(111,'app1')", firstTable));
+            conn.createStatement().execute(String.format(
+                    "upsert into %s(id1,appId1) values(111, 'app1') ON DUPLICATE KEY UPDATE appId1 = null", firstTable));
+            conn.createStatement().execute(String.format(
+                    "upsert into %s(id2,appId2) values(222,'app2')", secondTable));
+            conn.createStatement().execute(String.format(
+                    "upsert into %s(id2,appId2) values(222,'app2') ON DUPLICATE KEY UPDATE appId2 = null", secondTable));
+
+            MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
+            assertEquals(2, state.getNumRows());
+
+            // 2 tables x 2 mutation batches each => 4 <table name, [mutations]> pairs,
+            // each pair holding exactly one mutation
+            int pairCount = 0;
+            Iterator<Pair<byte[], List<Mutation>>> tablePairs = state.toMutations();
+            while (tablePairs.hasNext()) {
+                List<Mutation> tableMutations = tablePairs.next().getSecond();
+                pairCount++;
+                assertEquals(1, tableMutations.size());
+            }
+            assertEquals(4, pairCount);
+
+            List<Map<TableRef, MultiRowMutationState>> commitBatches = state.createCommitBatches();
+            assertEquals(2, commitBatches.size());
+            // regular upserts must land in the first commit batch, conditional ones in the second
+            verifyCommitBatch(commitBatches.get(0), false, 2, 1);
+            verifyCommitBatch(commitBatches.get(1), true, 2, 1);
+        }
+    }
+
+    /**
+     * Asserts the shape of a single commit batch.
+     *
+     * @param commitBatch     map from table to the rows of that table in this commit batch
+     * @param conditional     true if every row must carry ON DUPLICATE KEY bytes, false if none may
+     * @param numberOfBatches expected number of entries in {@code commitBatch}, i.e. one per table
+     * @param rowsPerBatch    expected number of rows in each table's {@link MultiRowMutationState}
+     */
+    private void verifyCommitBatch(Map<TableRef, MultiRowMutationState> commitBatch, boolean conditional,
+            int numberOfBatches, int rowsPerBatch) {
+        // one entry for each table
+        assertEquals(numberOfBatches, commitBatch.size());
+        // only the per-table row states are inspected; table refs and row keys are not needed
+        for (MultiRowMutationState batch : commitBatch.values()) {
+            assertEquals(rowsPerBatch, batch.size());
+            for (Map.Entry<ImmutableBytesPtr, RowMutationState> row : batch.entrySet()) {
+                RowMutationState rowMutationState = row.getValue();
+                if (conditional) {
+                    assertNotNull(rowMutationState.getOnDupKeyBytes());
+                } else {
+                    assertNull(rowMutationState.getOnDupKeyBytes());
+                }
+            }
+        }
+    }
}