This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch 5.1 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push: new f05aadb PHOENIX-6420 Wrong result when conditional and regular upserts are passed in the same commit batch f05aadb is described below commit f05aadbb269dedb8e4dc184cdd41ca4517a9d8e5 Author: Tanuj Khurana <khurana.ta...@gmail.com> AuthorDate: Wed Mar 24 22:02:15 2021 -0700 PHOENIX-6420 Wrong result when conditional and regular upserts are passed in the same commit batch --- .../apache/phoenix/end2end/OnDuplicateKeyIT.java | 43 +++ .../apache/phoenix/execute/PartialCommitIT.java | 2 +- .../org/apache/phoenix/execute/MutationState.java | 334 +++++++++++++++------ .../apache/phoenix/util/PhoenixKeyValueUtil.java | 23 +- .../apache/phoenix/execute/MutationStateTest.java | 106 +++++-- 5 files changed, 392 insertions(+), 116 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java index 3a9e87e..dce5659 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java @@ -614,6 +614,49 @@ public class OnDuplicateKeyIT extends ParallelStatsDisabledIT { conn.close(); } + @Test + public void testOnDupAndUpsertInSameCommitBatch() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String tableName = generateUniqueName(); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String ddl = "create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 varchar)"; + conn.createStatement().execute(ddl); + createIndex(conn, tableName); + + // row doesn't exist + conn.createStatement().execute(String.format("UPSERT INTO %s VALUES('a',0,'abc')", tableName)); + conn.createStatement().execute(String.format( + "UPSERT INTO %s VALUES('a',1,'zzz') ON DUPLICATE KEY UPDATE counter1 = counter1 + 2", tableName)); + conn.commit(); + 
assertRow(conn, tableName, "a", 2, "abc"); + + // row exists + conn.createStatement().execute(String.format("UPSERT INTO %s VALUES('a', 7, 'fff')", tableName)); + conn.createStatement().execute(String.format( + "UPSERT INTO %s VALUES('a',1, 'bazz') ON DUPLICATE KEY UPDATE counter1 = counter1 + 2," + + "counter2 = counter2 || 'ddd'", tableName)); + conn.commit(); + assertRow(conn, tableName, "a", 9, "fffddd"); + + // partial update + conn.createStatement().execute(String.format( + "UPSERT INTO %s (pk, counter2) VALUES('a', 'gggg') ON DUPLICATE KEY UPDATE counter1 = counter1 + 2", tableName)); + conn.createStatement().execute(String.format( + "UPSERT INTO %s (pk, counter2) VALUES ('a', 'bar')", tableName)); + conn.commit(); + assertRow(conn, tableName, "a", 11, "bar"); + } + } + + private void assertRow(Connection conn, String tableName, String expectedPK, int expectedCol1, String expectedCol2) throws SQLException { + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName); + assertTrue(rs.next()); + assertEquals(expectedPK,rs.getString(1)); + assertEquals(expectedCol1,rs.getInt(2)); + assertEquals(expectedCol2,rs.getString(3)); + assertFalse(rs.next()); + } + } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java index e452da4..9a1a764 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java @@ -266,7 +266,7 @@ public class PartialCommitIT extends BaseUniqueNamesOwnClusterIT { private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException { try (PhoenixConnection con = DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class)) { - final Map<TableRef, MultiRowMutationState> mutations = Maps.newTreeMap(new TableRefComparator()); + final Map<TableRef, List<MultiRowMutationState>> 
mutations = Maps.newTreeMap(new TableRefComparator()); // passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState return new PhoenixConnection(con, (MutationState)null) { @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 9a5896c..91c0afc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -138,7 +138,11 @@ public class MutationState implements SQLCloseable { private final long batchSize; private final long batchSizeBytes; private long batchCount = 0L; - private final Map<TableRef, MultiRowMutationState> mutations; + // For each table, maintain a list of mutation batches. Each element in the + // list is a set of row mutations which can be sent in a single commit batch. + // A regular upsert and a conditional upsert on the same row conflict with + // each other so they are split and sent separately in different commit batches. 
+ private final Map<TableRef, List<MultiRowMutationState>> mutationsMap; private final Set<String> uncommittedPhysicalNames = Sets.newHashSetWithExpectedSize(10); private long sizeOffset; @@ -146,7 +150,7 @@ public class MutationState implements SQLCloseable { private long estimatedSize = 0; private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY; private boolean isExternalTxContext = false; - private Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap(); + private Map<TableRef, List<MultiRowMutationState>> txMutations = Collections.emptyMap(); private PhoenixTransactionContext phoenixTransactionContext = PhoenixTransactionContext.NULL_CONTEXT; @@ -183,19 +187,19 @@ public class MutationState implements SQLCloseable { private MutationState(int maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext, long sizeOffset) { - this(maxSize, maxSizeBytes, connection, Maps.<TableRef, MultiRowMutationState> newHashMapWithExpectedSize(5), + this(maxSize, maxSizeBytes, connection, Maps.<TableRef, List<MultiRowMutationState>> newHashMapWithExpectedSize(5), subTask, txContext); this.sizeOffset = sizeOffset; } MutationState(int maxSize, long maxSizeBytes, PhoenixConnection connection, - Map<TableRef, MultiRowMutationState> mutations, boolean subTask, PhoenixTransactionContext txContext) { + Map<TableRef, List<MultiRowMutationState>> mutationsMap, boolean subTask, PhoenixTransactionContext txContext) { this.maxSize = maxSize; this.maxSizeBytes = maxSizeBytes; this.connection = connection; this.batchSize = connection.getMutateBatchSize(); this.batchSizeBytes = connection.getMutateBatchSizeBytes(); - this.mutations = mutations; + this.mutationsMap = mutationsMap; boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled(); this.mutationMetricQueue = isMetricsEnabled ? 
new MutationMetricQueue() : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE; @@ -213,13 +217,40 @@ public class MutationState implements SQLCloseable { int maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException { this(maxSize, maxSizeBytes, connection, false, null, sizeOffset); if (!mutations.isEmpty()) { - this.mutations.put(table, mutations); + addMutations(this.mutationsMap, table, mutations); } this.numRows = mutations.size(); - this.estimatedSize = PhoenixKeyValueUtil.getEstimatedRowMutationSize(this.mutations); + this.estimatedSize = PhoenixKeyValueUtil.getEstimatedRowMutationSizeWithBatch(this.mutationsMap); + throwIfTooBig(); } + // add a new batch of row mutations + private void addMutations(Map<TableRef, List<MultiRowMutationState>> mutationMap, TableRef table, + MultiRowMutationState mutations) { + List<MultiRowMutationState> batches = mutationMap.get(table); + if (batches == null) { + batches = Lists.newArrayListWithExpectedSize(1); + } + batches.add(mutations); + mutationMap.put(table, batches); + } + + // remove a batch of mutations which have been committed + private void removeMutations(Map<TableRef, List<MultiRowMutationState>> mutationMap, TableRef table){ + List<MultiRowMutationState> batches = mutationMap.get(table); + if (batches == null || batches.isEmpty()) { + mutationMap.remove(table); + return; + } + + // mutation batches are committed in FIFO order so always remove from the head + batches.remove(0); + if (batches.isEmpty()) { + mutationMap.remove(table); + } + } + public long getEstimatedSize() { return estimatedSize; } @@ -379,7 +410,7 @@ public class MutationState implements SQLCloseable { public static MutationState emptyMutationState(int maxSize, long maxSizeBytes, PhoenixConnection connection) { MutationState state = new MutationState(maxSize, maxSizeBytes, connection, - Collections.<TableRef, MultiRowMutationState> emptyMap(), false, null); + Collections.<TableRef, List<MultiRowMutationState>> 
emptyMap(), false, null); state.sizeOffset = 0; return state; } @@ -405,67 +436,89 @@ public class MutationState implements SQLCloseable { return numRows; } + private MultiRowMutationState getLastMutationBatch(Map<TableRef, List<MultiRowMutationState>> mutations, TableRef tableRef) { + List<MultiRowMutationState> mutationBatches = mutations.get(tableRef); + if (mutationBatches == null || mutationBatches.isEmpty()) { + return null; + } + return mutationBatches.get(mutationBatches.size() - 1); + } + private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows, - Map<TableRef, MultiRowMutationState> dstMutations) { + Map<TableRef, List<MultiRowMutationState>> dstMutations) { PTable table = tableRef.getTable(); boolean isIndex = table.getType() == PTableType.INDEX; - boolean incrementRowCount = dstMutations == this.mutations; - MultiRowMutationState existingRows = dstMutations.put(tableRef, srcRows); - if (existingRows != null) { // Rows for that table already exist - // Loop through new rows and replace existing with new - for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : srcRows.entrySet()) { - // Replace existing row with new row - RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue()); - if (existingRowMutationState != null) { - Map<PColumn, byte[]> existingValues = existingRowMutationState.getColumnValues(); - if (existingValues != PRow.DELETE_MARKER) { - Map<PColumn, byte[]> newRow = rowEntry.getValue().getColumnValues(); - // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with - // existing row. 
- if (newRow != PRow.DELETE_MARKER) { - // decrement estimated size by the size of the old row - estimatedSize -= existingRowMutationState.calculateEstimatedSize(); - // Merge existing column values with new column values - existingRowMutationState.join(rowEntry.getValue()); - // increment estimated size by the size of the new row - estimatedSize += existingRowMutationState.calculateEstimatedSize(); - // Now that the existing row has been merged with the new row, replace it back - // again (since it was merged with the new one above). - existingRows.put(rowEntry.getKey(), existingRowMutationState); - } - } - } else { - if (incrementRowCount && !isIndex) { // Don't count index rows in row count - numRows++; - // increment estimated size by the size of the new row - estimatedSize += rowEntry.getValue().calculateEstimatedSize(); - } - } - } - // Put the existing one back now that it's merged - dstMutations.put(tableRef, existingRows); - } else { + boolean incrementRowCount = dstMutations == this.mutationsMap; + // we only need to check if the new mutation batch (srcRows) conflicts with the + // last mutation batch since we try to merge it with that only + MultiRowMutationState existingRows = getLastMutationBatch(dstMutations, tableRef); + + if (existingRows == null) { // no rows found for this table // Size new map at batch size as that's what it'll likely grow to. 
MultiRowMutationState newRows = new MultiRowMutationState(connection.getMutateBatchSize()); newRows.putAll(srcRows); - dstMutations.put(tableRef, newRows); + addMutations(dstMutations, tableRef, newRows); if (incrementRowCount && !isIndex) { numRows += srcRows.size(); // if we added all the rows from newMutationState we can just increment the // estimatedSize by newMutationState.estimatedSize estimatedSize += srcRows.estimatedSize; } + return; + } + + // for conflicting rows + MultiRowMutationState conflictingRows = new MultiRowMutationState(connection.getMutateBatchSize()); + + // Rows for this table already exist, check for conflicts + for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : srcRows.entrySet()) { + ImmutableBytesPtr key = rowEntry.getKey(); + RowMutationState newRowMutationState = rowEntry.getValue(); + RowMutationState existingRowMutationState = existingRows.get(key); + if (existingRowMutationState == null) { + existingRows.put(key, newRowMutationState); + if (incrementRowCount && !isIndex) { // Don't count index rows in row count + numRows++; + // increment estimated size by the size of the new row + estimatedSize += newRowMutationState.calculateEstimatedSize(); + } + continue; + } + Map<PColumn, byte[]> existingValues = existingRowMutationState.getColumnValues(); + Map<PColumn, byte[]> newValues = newRowMutationState.getColumnValues(); + if (existingValues != PRow.DELETE_MARKER && newValues != PRow.DELETE_MARKER) { + // Check if we can merge existing column values with new column values + long beforeMergeSize = existingRowMutationState.calculateEstimatedSize(); + boolean isMerged = existingRowMutationState.join(rowEntry.getValue()); + if (isMerged) { + // decrement estimated size by the size of the old row + estimatedSize -= beforeMergeSize; + // increment estimated size by the size of the new row + estimatedSize += existingRowMutationState.calculateEstimatedSize(); + } else { + // cannot merge regular upsert and conditional upsert + // 
conflicting row is not a new row so no need to increment numRows + conflictingRows.put(key, newRowMutationState); + } + } else { + existingRows.put(key, newRowMutationState); + } + } + + if (!conflictingRows.isEmpty()) { + addMutations(dstMutations, tableRef, conflictingRows); } } - private void joinMutationState(Map<TableRef, MultiRowMutationState> srcMutations, - Map<TableRef, MultiRowMutationState> dstMutations) { + private void joinMutationState(Map<TableRef, List<MultiRowMutationState>> srcMutations, + Map<TableRef, List<MultiRowMutationState>> dstMutations) { // Merge newMutation with this one, keeping state from newMutation for any overlaps - for (Map.Entry<TableRef, MultiRowMutationState> entry : srcMutations.entrySet()) { - // Replace existing entries for the table with new entries + for (Map.Entry<TableRef, List<MultiRowMutationState>> entry : srcMutations.entrySet()) { TableRef tableRef = entry.getKey(); - MultiRowMutationState srcRows = entry.getValue(); - joinMutationState(tableRef, srcRows, dstMutations); + for (MultiRowMutationState srcRows : entry.getValue()) { + // Replace existing entries for the table with new entries + joinMutationState(tableRef, srcRows, dstMutations); + } } } @@ -484,10 +537,10 @@ public class MutationState implements SQLCloseable { phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext()); this.sizeOffset += newMutationState.sizeOffset; - joinMutationState(newMutationState.mutations, this.mutations); + joinMutationState(newMutationState.mutationsMap, this.mutationsMap); if (!newMutationState.txMutations.isEmpty()) { if (txMutations.isEmpty()) { - txMutations = Maps.newHashMapWithExpectedSize(mutations.size()); + txMutations = Maps.newHashMapWithExpectedSize(this.mutationsMap.size()); } joinMutationState(newMutationState.txMutations, this.txMutations); } @@ -587,10 +640,11 @@ public class MutationState implements SQLCloseable { // the tables in the mutations map if (!sendAll) { TableRef key = new 
TableRef(index); - MultiRowMutationState multiRowMutationState = mutations.remove(key); + List<MultiRowMutationState> multiRowMutationState = mutationsMap.remove(key); if (multiRowMutationState != null) { final List<Mutation> deleteMutations = Lists.newArrayList(); - generateMutations(key, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null); + // for index table there will only be 1 mutation batch in the list + generateMutations(key, mutationTimestamp, serverTimestamp, multiRowMutationState.get(0), deleteMutations, null); if (indexMutations == null) { indexMutations = deleteMutations; } else { @@ -736,19 +790,21 @@ public class MutationState implements SQLCloseable { public Iterator<Pair<byte[], List<Mutation>>> toMutations(final boolean includeMutableIndexes, final Long tableTimestamp) { - final Iterator<Map.Entry<TableRef, MultiRowMutationState>> iterator = this.mutations.entrySet().iterator(); + final Iterator<Map.Entry<TableRef, List<MultiRowMutationState>>> iterator = this.mutationsMap.entrySet().iterator(); if (!iterator.hasNext()) { return Collections.emptyIterator(); } Long scn = connection.getSCN(); final long serverTimestamp = getTableTimestamp(tableTimestamp, scn); final long mutationTimestamp = getMutationTimestamp(scn); return new Iterator<Pair<byte[], List<Mutation>>>() { - private Map.Entry<TableRef, MultiRowMutationState> current = iterator.next(); + private Map.Entry<TableRef, List<MultiRowMutationState>> current = iterator.next(); + private int batchOffset = 0; private Iterator<Pair<byte[], List<Mutation>>> innerIterator = init(); private Iterator<Pair<byte[], List<Mutation>>> init() { final Iterator<Pair<PTable, List<Mutation>>> mutationIterator = - addRowMutations(current.getKey(), current.getValue(), - mutationTimestamp, serverTimestamp, includeMutableIndexes, true); + addRowMutations(current.getKey(), current.getValue().get(batchOffset), + mutationTimestamp, serverTimestamp, includeMutableIndexes, true); + return 
new Iterator<Pair<byte[], List<Mutation>>>() { @Override public boolean hasNext() { @@ -771,13 +827,19 @@ public class MutationState implements SQLCloseable { @Override public boolean hasNext() { - return innerIterator.hasNext() || iterator.hasNext(); + return innerIterator.hasNext() || + batchOffset + 1 < current.getValue().size() || + iterator.hasNext(); } @Override public Pair<byte[], List<Mutation>> next() { if (!innerIterator.hasNext()) { - current = iterator.next(); + ++batchOffset; + if (batchOffset == current.getValue().size()) { + current = iterator.next(); + batchOffset = 0; + } innerIterator = init(); } return innerIterator.next(); @@ -808,10 +870,10 @@ public class MutationState implements SQLCloseable { * @throws SQLException * if the table or any columns no longer exist */ - private long[] validateAll() throws SQLException { + private long[] validateAll(Map<TableRef, MultiRowMutationState> commitBatch) throws SQLException { int i = 0; - long[] timeStamps = new long[this.mutations.size()]; - for (Map.Entry<TableRef, MultiRowMutationState> entry : mutations.entrySet()) { + long[] timeStamps = new long[commitBatch.size()]; + for (Map.Entry<TableRef, MultiRowMutationState> entry : commitBatch.entrySet()) { TableRef tableRef = entry.getKey(); timeStamps[i++] = validateAndGetServerTimestamp(tableRef, entry.getValue()); } @@ -960,44 +1022,108 @@ public class MutationState implements SQLCloseable { } } + + /** + * Split the mutation batches for each table into separate commit batches. + * Each commit batch contains only one mutation batch (MultiRowMutationState) for a table. 
+ * @param tableRefIterator + * @return List of commit batches + */ + private List<Map<TableRef, MultiRowMutationState>> createCommitBatches(Iterator<TableRef> tableRefIterator) { + List<Map<TableRef, MultiRowMutationState>> commitBatches = Lists.newArrayList(); + while (tableRefIterator.hasNext()) { + final TableRef tableRef = tableRefIterator.next(); + List<MultiRowMutationState> batches = this.mutationsMap.get(tableRef); + if (batches == null) { + continue; + } + for (MultiRowMutationState batch : batches) { + // get the first commit batch which doesn't have any mutations for the table + Map<TableRef, MultiRowMutationState> nextCommitBatch = getNextCommitBatchForTable(commitBatches, tableRef); + // add the next mutation batch of the table to the commit batch + nextCommitBatch.put(tableRef, batch); + } + } + return commitBatches; + } + + // visible for testing + List<Map<TableRef, MultiRowMutationState>> createCommitBatches() { + return createCommitBatches(this.mutationsMap.keySet().iterator()); + } + + /** + * Return the first commit batch which doesn't have any mutations for the passed table. + * If no such commit batch exists, creates a new commit batch, adds it to the list of + * commit batches and returns it. 
+ * @param commitBatchesList current list of commit batches + * @param tableRef + * @return commit batch + */ + private Map<TableRef, MultiRowMutationState> getNextCommitBatchForTable(List<Map<TableRef, MultiRowMutationState>> commitBatchesList, + TableRef tableRef) { + Map<TableRef, MultiRowMutationState> nextCommitBatch = null; + for (Map<TableRef, MultiRowMutationState> commitBatch : commitBatchesList) { + if (commitBatch.get(tableRef) == null) { + nextCommitBatch = commitBatch; + break; + } + } + if (nextCommitBatch == null) { + // create a new commit batch and add it to the list of commit batches + nextCommitBatch = Maps.newHashMapWithExpectedSize(this.mutationsMap.size()); + commitBatchesList.add(nextCommitBatch); + } + return nextCommitBatch; + } + private void send(Iterator<TableRef> tableRefIterator) throws SQLException { - int i = 0; - long[] serverTimeStamps = null; boolean sendAll = false; + boolean validateServerTimestamps = false; + List<Map<TableRef, MultiRowMutationState>> commitBatches; if (tableRefIterator == null) { - serverTimeStamps = validateAll(); - tableRefIterator = mutations.keySet().iterator(); + commitBatches = createCommitBatches(this.mutationsMap.keySet().iterator()); sendAll = true; + validateServerTimestamps = true; + } else { + commitBatches = createCommitBatches(tableRefIterator); } - MultiRowMutationState multiRowMutationState; + for (Map<TableRef, MultiRowMutationState> commitBatch : commitBatches) { + long [] serverTimestamps = validateServerTimestamps ? 
validateAll(commitBatch) : null; + sendBatch(commitBatch, serverTimestamps, sendAll); + } + } + + private void sendBatch(Map<TableRef, MultiRowMutationState> commitBatch, long[] serverTimeStamps, boolean sendAll) throws SQLException { + int i = 0; Map<TableInfo, List<Mutation>> physicalTableMutationMap = Maps.newLinkedHashMap(); // add tracing for this operation try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) { Span span = trace.getSpan(); ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(); - while (tableRefIterator.hasNext()) { + for (Map.Entry<TableRef, MultiRowMutationState> entry : commitBatch.entrySet()) { // at this point we are going through mutations for each table - final TableRef tableRef = tableRefIterator.next(); - multiRowMutationState = mutations.get(tableRef); + final TableRef tableRef = entry.getKey(); + MultiRowMutationState multiRowMutationState = entry.getValue(); if (multiRowMutationState == null || multiRowMutationState.isEmpty()) { continue; } // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely) long - serverTimestamp = - serverTimeStamps == null ? - validateAndGetServerTimestamp(tableRef, multiRowMutationState) : - serverTimeStamps[i++]; + serverTimestamp = + serverTimeStamps == null ? + validateAndGetServerTimestamp(tableRef, multiRowMutationState) : + serverTimeStamps[i++]; final PTable table = tableRef.getTable(); Long scn = connection.getSCN(); long mutationTimestamp = scn == null ? (table.isTransactional() == true ? 
HConstants.LATEST_TIMESTAMP : EnvironmentEdgeManager.currentTimeMillis()) - : scn; + : scn; Iterator<Pair<PTable, List<Mutation>>> - mutationsIterator = - addRowMutations(tableRef, multiRowMutationState, mutationTimestamp, - serverTimestamp, false, sendAll); + mutationsIterator = + addRowMutations(tableRef, multiRowMutationState, mutationTimestamp, + serverTimestamp, false, sendAll); // build map from physical table to mutation list boolean isDataTable = true; while (mutationsIterator.hasNext()) { @@ -1009,8 +1135,8 @@ public class MutationState implements SQLCloseable { tableRef, logicalTable); List<Mutation> - oldMutationList = - physicalTableMutationMap.put(tableInfo, mutationList); + oldMutationList = + physicalTableMutationMap.put(tableInfo, mutationList); if (oldMutationList != null) mutationList.addAll(0, oldMutationList); isDataTable = false; } @@ -1021,7 +1147,7 @@ public class MutationState implements SQLCloseable { if (table.isTransactional()) { addUncommittedStatementIndexes(multiRowMutationState.values()); if (txMutations.isEmpty()) { - txMutations = Maps.newHashMapWithExpectedSize(mutations.size()); + txMutations = Maps.newHashMapWithExpectedSize(this.mutationsMap.size()); } // Keep all mutations we've encountered until a commit or rollback. 
// This is not ideal, but there's not good way to get the values back @@ -1187,11 +1313,11 @@ public class MutationState implements SQLCloseable { numFailedMutations = 0; // Remove batches as we process them - mutations.remove(origTableRef); + removeMutations(this.mutationsMap, origTableRef); if (tableInfo.isDataTable()) { numRows -= numMutations; // recalculate the estimated size - estimatedSize = PhoenixKeyValueUtil.getEstimatedRowMutationSize(mutations); + estimatedSize = PhoenixKeyValueUtil.getEstimatedRowMutationSizeWithBatch(this.mutationsMap); } } catch (Exception e) { mutationCommitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime; @@ -1405,8 +1531,10 @@ public class MutationState implements SQLCloseable { } private int[] getUncommittedStatementIndexes() { - for (MultiRowMutationState rowMutationMap : mutations.values()) { - addUncommittedStatementIndexes(rowMutationMap.values()); + for (List<MultiRowMutationState> batches : mutationsMap.values()) { + for (MultiRowMutationState rowMutationMap : batches) { + addUncommittedStatementIndexes(rowMutationMap.values()); + } } return uncommittedStatementIndexes; } @@ -1417,7 +1545,7 @@ public class MutationState implements SQLCloseable { private void resetState() { numRows = 0; estimatedSize = 0; - this.mutations.clear(); + this.mutationsMap.clear(); phoenixTransactionContext = PhoenixTransactionContext.NULL_CONTEXT; } @@ -1437,7 +1565,7 @@ public class MutationState implements SQLCloseable { } public void commit() throws SQLException { - Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap(); + Map<TableRef, List<MultiRowMutationState>> txMutations = Collections.emptyMap(); int retryCount = 0; do { boolean sendSuccessful = false; @@ -1518,7 +1646,7 @@ public class MutationState implements SQLCloseable { break; } retryCount++; - mutations.putAll(txMutations); + mutationsMap.putAll(txMutations); } while (true); } @@ -1576,7 +1704,7 @@ public class MutationState implements SQLCloseable 
{ * @throws SQLException */ public boolean sendUncommitted() throws SQLException { - return sendUncommitted(mutations.keySet().iterator()); + return sendUncommitted(mutationsMap.keySet().iterator()); } /** @@ -1604,7 +1732,7 @@ public class MutationState implements SQLCloseable { if (filteredTableRefs.hasNext()) { // FIXME: strip table alias to prevent equality check from failing due to alias mismatch on null alias. // We really should be keying the tables based on the physical table name. - List<TableRef> strippedAliases = Lists.newArrayListWithExpectedSize(mutations.keySet().size()); + List<TableRef> strippedAliases = Lists.newArrayListWithExpectedSize(mutationsMap.keySet().size()); while (filteredTableRefs.hasNext()) { TableRef tableRef = filteredTableRefs.next(); // REVIEW: unclear if we need this given we start transactions when resolving a table @@ -1683,6 +1811,10 @@ public class MutationState implements SQLCloseable { return rowKeyToRowMutationState.put(ptr, rowMutationState); } + public RowMutationState get(ImmutableBytesPtr ptr) { + return rowKeyToRowMutationState.get(ptr); + } + public void putAll(MultiRowMutationState other) { estimatedSize += other.estimatedSize; rowKeyToRowMutationState.putAll(other.rowKeyToRowMutationState); @@ -1747,7 +1879,16 @@ public class MutationState implements SQLCloseable { return statementIndexes; } - void join(RowMutationState newRow) { + /** + * Join the newRow with the current row if it doesn't conflict with it. + * A regular upsert conflicts with a conditional upsert + * @param newRow + * @return True if the rows were successfully joined else False + */ + boolean join(RowMutationState newRow) { + if (isConflicting(newRow)) { + return false; + } // If we already have a row and the new row has an ON DUPLICATE KEY clause // ignore the new values (as that's what the server will do). 
if (newRow.onDupKeyBytes == null) { @@ -1766,6 +1907,7 @@ public class MutationState implements SQLCloseable { // increments of the same row in the same commit batch. this.onDupKeyBytes = PhoenixIndexBuilder.combineOnDupKey(this.onDupKeyBytes, newRow.onDupKeyBytes); statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes()); + return true; } @Nonnull @@ -1773,6 +1915,10 @@ public class MutationState implements SQLCloseable { return rowTsColInfo; } + public boolean isConflicting(RowMutationState newRowMutationState) { + return (this.onDupKeyBytes != null && newRowMutationState.onDupKeyBytes == null || + this.onDupKeyBytes == null && newRowMutationState.onDupKeyBytes != null); + } } public ReadMetricQueue getReadMetricQueue() { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java index c157873..0af2148 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java @@ -196,9 +196,17 @@ public class PhoenixKeyValueUtil { long size = 0; // iterate over table for (Entry<TableRef, MultiRowMutationState> tableEntry : tableMutationMap.entrySet()) { - // iterate over rows - for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : tableEntry.getValue().entrySet()) { - size += calculateRowMutationSize(rowEntry); + size += calculateMultiRowMutationSize(tableEntry.getValue()); + } + return size; + } + + public static long getEstimatedRowMutationSizeWithBatch(Map<TableRef, List<MultiRowMutationState>> tableMutationMap) { + long size = 0; + // iterate over table + for (Entry<TableRef, List<MultiRowMutationState>> tableEntry : tableMutationMap.entrySet()) { + for (MultiRowMutationState batch : tableEntry.getValue()) { + size += calculateMultiRowMutationSize(batch); } } return size; @@ -214,6 +222,15 @@ public class PhoenixKeyValueUtil 
{ return KeyValueUtil.copyToNewKeyValue(c); } + private static long calculateMultiRowMutationSize(MultiRowMutationState mutations) { + long size = 0; + // iterate over rows + for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : mutations.entrySet()) { + size += calculateRowMutationSize(rowEntry); + } + return size; + } + private static long calculateRowMutationSize(Entry<ImmutableBytesPtr, RowMutationState> rowEntry) { int rowLength = rowEntry.getKey().getLength(); long colValuesLength = rowEntry.getValue().calculateEstimatedSize(); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java index f39312f..0a910e2 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java @@ -17,23 +17,6 @@ */ package org.apache.phoenix.execute; -import static org.apache.phoenix.execute.MutationState.joinSortedIntArrays; -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; - import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Delete; @@ -42,17 +25,42 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.execute.MutationState.MultiRowMutationState; 
+import org.apache.phoenix.execute.MutationState.RowMutationState; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PUnsignedInt; import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.phoenix.execute.MutationState.joinSortedIntArrays; +import static org.apache.phoenix.query.BaseTest.generateUniqueName; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + public class MutationStateTest { @@ -211,4 +219,66 @@ public class MutationStateTest { } } + + @Test + public void testOnDupAndUpsertInSameCommitBatch() throws Exception { + String dataTable1 = generateUniqueName(); + String dataTable2 = generateUniqueName(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute(String.format( + "create table %s (id1 UNSIGNED_INT not null primary key, appId1 VARCHAR)", dataTable1)); + 
conn.createStatement().execute(String.format( + "create table %s (id2 UNSIGNED_INT not null primary key, appId2 VARCHAR)", dataTable2)); + + conn.createStatement().execute(String.format( + "upsert into %s(id1,appId1) values(111,'app1')", dataTable1)); + conn.createStatement().execute(String.format( + "upsert into %s(id1,appId1) values(111, 'app1') ON DUPLICATE KEY UPDATE appId1 = null", dataTable1)); + conn.createStatement().execute(String.format( + "upsert into %s(id2,appId2) values(222,'app2')", dataTable2)); + conn.createStatement().execute(String.format( + "upsert into %s(id2,appId2) values(222,'app2') ON DUPLICATE KEY UPDATE appId2 = null", dataTable2)); + + final PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); + MutationState state = pconn.getMutationState(); + assertEquals(2, state.getNumRows()); + + int actualPairs = 0; + Iterator<Pair<byte[], List<Mutation>>> mutations = state.toMutations(); + while (mutations.hasNext()) { + Pair<byte[], List<Mutation>> nextTable = mutations.next(); + ++actualPairs; + assertEquals(1, nextTable.getSecond().size()); + } + // we have 2 tables and each table has 2 mutation batches + // so we should get 4 <table name, [mutations]> pairs + assertEquals(4, actualPairs); + + List<Map<TableRef, MultiRowMutationState>> commitBatches = state.createCommitBatches(); + assertEquals(2, commitBatches.size()); + // first commit batch should only contain regular upserts + verifyCommitBatch(commitBatches.get(0), false, 2, 1); + verifyCommitBatch(commitBatches.get(1), true, 2, 1); + } + } + + private void verifyCommitBatch(Map<TableRef, MultiRowMutationState> commitBatch, boolean conditional, + int numberOfBatches, int rowsPerBatch) { + // one for each table + assertEquals(numberOfBatches, commitBatch.size()); + for (Map.Entry<TableRef, MultiRowMutationState> entry : commitBatch.entrySet()) { + TableRef tableRef = entry.getKey(); + MultiRowMutationState batch = entry.getValue(); + assertEquals(rowsPerBatch, batch.size()); + 
for (Map.Entry<ImmutableBytesPtr, RowMutationState> row : batch.entrySet()) { + ImmutableBytesPtr key = row.getKey(); + RowMutationState rowMutationState = row.getValue(); + if (conditional == true) { + assertNotNull(rowMutationState.getOnDupKeyBytes()); + } else { + assertNull(rowMutationState.getOnDupKeyBytes()); + } + } + } + } }