This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new f05aadb  PHOENIX-6420 Wrong result when conditional and regular upserts are passed in the same commit batch
f05aadb is described below

commit f05aadbb269dedb8e4dc184cdd41ca4517a9d8e5
Author: Tanuj Khurana <khurana.ta...@gmail.com>
AuthorDate: Wed Mar 24 22:02:15 2021 -0700

    PHOENIX-6420 Wrong result when conditional and regular upserts are passed in the same commit batch
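
    Before this change, MutationState kept a single MultiRowMutationState per
    table, so a regular UPSERT and an UPSERT ... ON DUPLICATE KEY against the
    same row in one commit batch were merged into a single mutation, which
    could produce a wrong result. A minimal sketch of the scenario the fix
    targets (the table, columns, and the stmt/conn JDBC handles are
    illustrative, not taken from this patch):

        // regular upsert initializes the row
        stmt.execute("UPSERT INTO T VALUES('a', 0)");
        // conditional upsert on the same row, in the same commit batch
        stmt.execute("UPSERT INTO T VALUES('a', 1) ON DUPLICATE KEY UPDATE c = c + 2");
        conn.commit();
        // with the fix, the two mutations are sent in separate commit batches:
        // the regular upsert lands first (c = 0), then the conditional upsert
        // sees the existing row and applies c = 0 + 2 = 2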
---
 .../apache/phoenix/end2end/OnDuplicateKeyIT.java   |  43 +++
 .../apache/phoenix/execute/PartialCommitIT.java    |   2 +-
 .../org/apache/phoenix/execute/MutationState.java  | 334 +++++++++++++++------
 .../apache/phoenix/util/PhoenixKeyValueUtil.java   |  23 +-
 .../apache/phoenix/execute/MutationStateTest.java  | 106 +++++--
 5 files changed, 392 insertions(+), 116 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
index 3a9e87e..dce5659 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
@@ -614,6 +614,49 @@ public class OnDuplicateKeyIT extends ParallelStatsDisabledIT {
         conn.close();
     }
 
+    @Test
+    public void testOnDupAndUpsertInSameCommitBatch() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String ddl = "create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 varchar)";
+            conn.createStatement().execute(ddl);
+            createIndex(conn, tableName);
+
+            // row doesn't exist
+            conn.createStatement().execute(String.format("UPSERT INTO %s VALUES('a',0,'abc')", tableName));
+            conn.createStatement().execute(String.format(
+                "UPSERT INTO %s VALUES('a',1,'zzz') ON DUPLICATE KEY UPDATE counter1 = counter1 + 2", tableName));
+            conn.commit();
+            assertRow(conn, tableName, "a", 2, "abc");
+
+            // row exists
+            conn.createStatement().execute(String.format("UPSERT INTO %s VALUES('a', 7, 'fff')", tableName));
+            conn.createStatement().execute(String.format(
+                "UPSERT INTO %s VALUES('a',1, 'bazz') ON DUPLICATE KEY UPDATE counter1 = counter1 + 2," +
+                    "counter2 = counter2 || 'ddd'", tableName));
+            conn.commit();
+            assertRow(conn, tableName, "a", 9, "fffddd");
+
+            // partial update
+            conn.createStatement().execute(String.format(
+                "UPSERT INTO %s (pk, counter2) VALUES('a', 'gggg') ON DUPLICATE KEY UPDATE counter1 = counter1 + 2", tableName));
+            conn.createStatement().execute(String.format(
+                "UPSERT INTO %s (pk, counter2) VALUES ('a', 'bar')", tableName));
+            conn.commit();
+            assertRow(conn, tableName, "a", 11, "bar");
+        }
+    }
+
+    private void assertRow(Connection conn, String tableName, String expectedPK, int expectedCol1, String expectedCol2) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+        assertTrue(rs.next());
+        assertEquals(expectedPK,rs.getString(1));
+        assertEquals(expectedCol1,rs.getInt(2));
+        assertEquals(expectedCol2,rs.getString(3));
+        assertFalse(rs.next());
+    }
+
 
 }
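
The assertions in the new test follow from committing conflicting mutations in
separate batches rather than merging them. A rough trace of the first block,
using the literals from the test above:

    // commit batch 1: the regular upsert writes ('a', 0, 'abc')
    // commit batch 2: the conditional upsert finds the row, so its
    // ON DUPLICATE KEY UPDATE runs: counter1 = 0 + 2 = 2 and counter2
    // keeps 'abc', hence assertRow(conn, tableName, "a", 2, "abc")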
     
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index e452da4..9a1a764 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -266,7 +266,7 @@ public class PartialCommitIT extends BaseUniqueNamesOwnClusterIT {
     
     private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException {
         try (PhoenixConnection con = DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class)) {
-            final Map<TableRef, MultiRowMutationState> mutations = Maps.newTreeMap(new TableRefComparator());
+            final Map<TableRef, List<MultiRowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator());
             // passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState
             return new PhoenixConnection(con, (MutationState)null) {
                 @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 9a5896c..91c0afc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -138,7 +138,11 @@ public class MutationState implements SQLCloseable {
     private final long batchSize;
     private final long batchSizeBytes;
     private long batchCount = 0L;
-    private final Map<TableRef, MultiRowMutationState> mutations;
+    // For each table, maintain a list of mutation batches. Each element in the
+    // list is a set of row mutations which can be sent in a single commit batch.
+    // A regular upsert and a conditional upsert on the same row conflict with
+    // each other, so they are split and sent separately in different commit batches.
+    private final Map<TableRef, List<MultiRowMutationState>> mutationsMap;
     private final Set<String> uncommittedPhysicalNames = Sets.newHashSetWithExpectedSize(10);
 
     private long sizeOffset;
@@ -146,7 +150,7 @@ public class MutationState implements SQLCloseable {
     private long estimatedSize = 0;
     private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     private boolean isExternalTxContext = false;
-    private Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap();
+    private Map<TableRef, List<MultiRowMutationState>> txMutations = Collections.emptyMap();
 
     private PhoenixTransactionContext phoenixTransactionContext = PhoenixTransactionContext.NULL_CONTEXT;
 
@@ -183,19 +187,19 @@ public class MutationState implements SQLCloseable {
 
     private MutationState(int maxSize, long maxSizeBytes, PhoenixConnection connection,
             boolean subTask, PhoenixTransactionContext txContext, long sizeOffset) {
-        this(maxSize, maxSizeBytes, connection, Maps.<TableRef, MultiRowMutationState> newHashMapWithExpectedSize(5),
+        this(maxSize, maxSizeBytes, connection, Maps.<TableRef, List<MultiRowMutationState>> newHashMapWithExpectedSize(5),
                 subTask, txContext);
         this.sizeOffset = sizeOffset;
     }
 
     MutationState(int maxSize, long maxSizeBytes, PhoenixConnection connection,
-            Map<TableRef, MultiRowMutationState> mutations, boolean subTask, PhoenixTransactionContext txContext) {
+            Map<TableRef, List<MultiRowMutationState>> mutationsMap, boolean subTask, PhoenixTransactionContext txContext) {
         this.maxSize = maxSize;
         this.maxSizeBytes = maxSizeBytes;
         this.connection = connection;
         this.batchSize = connection.getMutateBatchSize();
         this.batchSizeBytes = connection.getMutateBatchSizeBytes();
-        this.mutations = mutations;
+        this.mutationsMap = mutationsMap;
         boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
         this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
                 : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
@@ -213,13 +217,40 @@ public class MutationState implements SQLCloseable {
           int maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException {
         this(maxSize, maxSizeBytes, connection, false, null, sizeOffset);
         if (!mutations.isEmpty()) {
-            this.mutations.put(table, mutations);
+            addMutations(this.mutationsMap, table, mutations);
         }
         this.numRows = mutations.size();
-        this.estimatedSize = PhoenixKeyValueUtil.getEstimatedRowMutationSize(this.mutations);
+        this.estimatedSize = PhoenixKeyValueUtil.getEstimatedRowMutationSizeWithBatch(this.mutationsMap);
+
         throwIfTooBig();
     }
 
+    // add a new batch of row mutations
+    private void addMutations(Map<TableRef, List<MultiRowMutationState>> mutationMap, TableRef table,
+            MultiRowMutationState mutations) {
+        List<MultiRowMutationState> batches = mutationMap.get(table);
+        if (batches == null) {
+            batches = Lists.newArrayListWithExpectedSize(1);
+        }
+        batches.add(mutations);
+        mutationMap.put(table, batches);
+    }
+
+    // remove a batch of mutations which have been committed
+    private void removeMutations(Map<TableRef, List<MultiRowMutationState>> mutationMap, TableRef table) {
+        List<MultiRowMutationState> batches = mutationMap.get(table);
+        if (batches == null || batches.isEmpty()) {
+            mutationMap.remove(table);
+            return;
+        }
+
+        // mutation batches are committed in FIFO order so always remove from the head
+        batches.remove(0);
+        if (batches.isEmpty()) {
+            mutationMap.remove(table);
+        }
+    }
+
     public long getEstimatedSize() {
         return estimatedSize;
     }
@@ -379,7 +410,7 @@ public class MutationState implements SQLCloseable {
     public static MutationState emptyMutationState(int maxSize, long maxSizeBytes,
                   PhoenixConnection connection) {
         MutationState state = new MutationState(maxSize, maxSizeBytes, connection,
-                Collections.<TableRef, MultiRowMutationState> emptyMap(), false, null);
+                Collections.<TableRef, List<MultiRowMutationState>> emptyMap(), false, null);
         state.sizeOffset = 0;
         return state;
     }
@@ -405,67 +436,89 @@ public class MutationState implements SQLCloseable {
         return numRows;
     }
 
+    private MultiRowMutationState getLastMutationBatch(Map<TableRef, List<MultiRowMutationState>> mutations, TableRef tableRef) {
+        List<MultiRowMutationState> mutationBatches = mutations.get(tableRef);
+        if (mutationBatches == null || mutationBatches.isEmpty()) {
+            return null;
+        }
+        return mutationBatches.get(mutationBatches.size() - 1);
+    }
+
     private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows,
-            Map<TableRef, MultiRowMutationState> dstMutations) {
+        Map<TableRef, List<MultiRowMutationState>> dstMutations) {
         PTable table = tableRef.getTable();
         boolean isIndex = table.getType() == PTableType.INDEX;
-        boolean incrementRowCount = dstMutations == this.mutations;
-        MultiRowMutationState existingRows = dstMutations.put(tableRef, srcRows);
-        if (existingRows != null) { // Rows for that table already exist
-            // Loop through new rows and replace existing with new
-            for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : srcRows.entrySet()) {
-                // Replace existing row with new row
-                RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
-                if (existingRowMutationState != null) {
-                    Map<PColumn, byte[]> existingValues = existingRowMutationState.getColumnValues();
-                    if (existingValues != PRow.DELETE_MARKER) {
-                        Map<PColumn, byte[]> newRow = rowEntry.getValue().getColumnValues();
-                        // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with
-                        // existing row.
-                        if (newRow != PRow.DELETE_MARKER) {
-                            // decrement estimated size by the size of the old row
-                            estimatedSize -= existingRowMutationState.calculateEstimatedSize();
-                            // Merge existing column values with new column values
-                            existingRowMutationState.join(rowEntry.getValue());
-                            // increment estimated size by the size of the new row
-                            estimatedSize += existingRowMutationState.calculateEstimatedSize();
-                            // Now that the existing row has been merged with the new row, replace it back
-                            // again (since it was merged with the new one above).
-                            existingRows.put(rowEntry.getKey(), existingRowMutationState);
-                        }
-                    }
-                } else {
-                    if (incrementRowCount && !isIndex) { // Don't count index rows in row count
-                        numRows++;
-                        // increment estimated size by the size of the new row
-                        estimatedSize += rowEntry.getValue().calculateEstimatedSize();
-                    }
-                }
-            }
-            // Put the existing one back now that it's merged
-            dstMutations.put(tableRef, existingRows);
-        } else {
+        boolean incrementRowCount = dstMutations == this.mutationsMap;
+        // we only need to check if the new mutation batch (srcRows) conflicts with the
+        // last mutation batch since we try to merge it with that only
+        MultiRowMutationState existingRows = getLastMutationBatch(dstMutations, tableRef);
+
+        if (existingRows == null) { // no rows found for this table
             // Size new map at batch size as that's what it'll likely grow to.
             MultiRowMutationState newRows = new MultiRowMutationState(connection.getMutateBatchSize());
             newRows.putAll(srcRows);
-            dstMutations.put(tableRef, newRows);
+            addMutations(dstMutations, tableRef, newRows);
             if (incrementRowCount && !isIndex) {
                 numRows += srcRows.size();
                 // if we added all the rows from newMutationState we can just increment the
                 // estimatedSize by newMutationState.estimatedSize
                 estimatedSize += srcRows.estimatedSize;
             }
+            return;
+        }
+
+        // for conflicting rows
+        MultiRowMutationState conflictingRows = new MultiRowMutationState(connection.getMutateBatchSize());
+
+        // Rows for this table already exist, check for conflicts
+        for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : srcRows.entrySet()) {
+            ImmutableBytesPtr key = rowEntry.getKey();
+            RowMutationState newRowMutationState = rowEntry.getValue();
+            RowMutationState existingRowMutationState = existingRows.get(key);
+            if (existingRowMutationState == null) {
+                existingRows.put(key, newRowMutationState);
+                if (incrementRowCount && !isIndex) { // Don't count index rows in row count
+                    numRows++;
+                    // increment estimated size by the size of the new row
+                    estimatedSize += newRowMutationState.calculateEstimatedSize();
+                }
+                continue;
+            }
+            Map<PColumn, byte[]> existingValues = existingRowMutationState.getColumnValues();
+            Map<PColumn, byte[]> newValues = newRowMutationState.getColumnValues();
+            if (existingValues != PRow.DELETE_MARKER && newValues != PRow.DELETE_MARKER) {
+                // Check if we can merge existing column values with new column values
+                long beforeMergeSize = existingRowMutationState.calculateEstimatedSize();
+                boolean isMerged = existingRowMutationState.join(rowEntry.getValue());
+                if (isMerged) {
+                    // decrement estimated size by the size of the old row
+                    estimatedSize -= beforeMergeSize;
+                    // increment estimated size by the size of the new row
+                    estimatedSize += existingRowMutationState.calculateEstimatedSize();
+                } else {
+                    // cannot merge regular upsert and conditional upsert
+                    // conflicting row is not a new row so no need to increment numRows
+                    conflictingRows.put(key, newRowMutationState);
+                }
+            } else {
+                existingRows.put(key, newRowMutationState);
+            }
+        }
+
+        if (!conflictingRows.isEmpty()) {
+            addMutations(dstMutations, tableRef, conflictingRows);
         }
     }
 
-    private void joinMutationState(Map<TableRef, MultiRowMutationState> srcMutations,
-            Map<TableRef, MultiRowMutationState> dstMutations) {
+    private void joinMutationState(Map<TableRef, List<MultiRowMutationState>> srcMutations,
+            Map<TableRef, List<MultiRowMutationState>> dstMutations) {
         // Merge newMutation with this one, keeping state from newMutation for any overlaps
-        for (Map.Entry<TableRef, MultiRowMutationState> entry : srcMutations.entrySet()) {
-            // Replace existing entries for the table with new entries
+        for (Map.Entry<TableRef, List<MultiRowMutationState>> entry : srcMutations.entrySet()) {
             TableRef tableRef = entry.getKey();
-            MultiRowMutationState srcRows = entry.getValue();
-            joinMutationState(tableRef, srcRows, dstMutations);
+            for (MultiRowMutationState srcRows : entry.getValue()) {
+                // Replace existing entries for the table with new entries
+                joinMutationState(tableRef, srcRows, dstMutations);
+            }
         }
     }
 
@@ -484,10 +537,10 @@ public class MutationState implements SQLCloseable {
         phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext());
 
         this.sizeOffset += newMutationState.sizeOffset;
-        joinMutationState(newMutationState.mutations, this.mutations);
+        joinMutationState(newMutationState.mutationsMap, this.mutationsMap);
         if (!newMutationState.txMutations.isEmpty()) {
             if (txMutations.isEmpty()) {
-                txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
+                txMutations = Maps.newHashMapWithExpectedSize(this.mutationsMap.size());
             }
             joinMutationState(newMutationState.txMutations, this.txMutations);
         }
@@ -587,10 +640,11 @@ public class MutationState implements SQLCloseable {
                     // the tables in the mutations map
                     if (!sendAll) {
                         TableRef key = new TableRef(index);
-                        MultiRowMutationState multiRowMutationState = mutations.remove(key);
+                        List<MultiRowMutationState> multiRowMutationState = mutationsMap.remove(key);
                         if (multiRowMutationState != null) {
                             final List<Mutation> deleteMutations = Lists.newArrayList();
-                            generateMutations(key, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null);
+                            // for an index table there will only be 1 mutation batch in the list
+                            generateMutations(key, mutationTimestamp, serverTimestamp, multiRowMutationState.get(0), deleteMutations, null);
                             if (indexMutations == null) {
                                 indexMutations = deleteMutations;
                             } else {
@@ -736,19 +790,21 @@ public class MutationState implements SQLCloseable {
 
     public Iterator<Pair<byte[], List<Mutation>>> toMutations(final boolean includeMutableIndexes,
             final Long tableTimestamp) {
-        final Iterator<Map.Entry<TableRef, MultiRowMutationState>> iterator = this.mutations.entrySet().iterator();
+        final Iterator<Map.Entry<TableRef, List<MultiRowMutationState>>> iterator = this.mutationsMap.entrySet().iterator();
         if (!iterator.hasNext()) { return Collections.emptyIterator(); }
         Long scn = connection.getSCN();
         final long serverTimestamp = getTableTimestamp(tableTimestamp, scn);
         final long mutationTimestamp = getMutationTimestamp(scn);
         return new Iterator<Pair<byte[], List<Mutation>>>() {
-            private Map.Entry<TableRef, MultiRowMutationState> current = iterator.next();
+            private Map.Entry<TableRef, List<MultiRowMutationState>> current = iterator.next();
+            private int batchOffset = 0;
             private Iterator<Pair<byte[], List<Mutation>>> innerIterator = init();
 
             private Iterator<Pair<byte[], List<Mutation>>> init() {
                 final Iterator<Pair<PTable, List<Mutation>>> mutationIterator =
-                        addRowMutations(current.getKey(), current.getValue(),
-                                mutationTimestamp, serverTimestamp, includeMutableIndexes, true);
+                    addRowMutations(current.getKey(), current.getValue().get(batchOffset),
+                        mutationTimestamp, serverTimestamp, includeMutableIndexes, true);
+
                 return new Iterator<Pair<byte[], List<Mutation>>>() {
                     @Override
                     public boolean hasNext() {
@@ -771,13 +827,19 @@ public class MutationState implements SQLCloseable {
 
             @Override
             public boolean hasNext() {
-                return innerIterator.hasNext() || iterator.hasNext();
+                return innerIterator.hasNext() ||
+                    batchOffset + 1 < current.getValue().size() ||
+                    iterator.hasNext();
             }
 
             @Override
             public Pair<byte[], List<Mutation>> next() {
                 if (!innerIterator.hasNext()) {
-                    current = iterator.next();
+                    ++batchOffset;
+                    if (batchOffset == current.getValue().size()) {
+                        current = iterator.next();
+                        batchOffset = 0;
+                    }
                     innerIterator = init();
                 }
                 return innerIterator.next();
@@ -808,10 +870,10 @@ public class MutationState implements SQLCloseable {
      * @throws SQLException
      *             if the table or any columns no longer exist
      */
-    private long[] validateAll() throws SQLException {
+    private long[] validateAll(Map<TableRef, MultiRowMutationState> commitBatch) throws SQLException {
         int i = 0;
-        long[] timeStamps = new long[this.mutations.size()];
-        for (Map.Entry<TableRef, MultiRowMutationState> entry : mutations.entrySet()) {
+        long[] timeStamps = new long[commitBatch.size()];
+        for (Map.Entry<TableRef, MultiRowMutationState> entry : commitBatch.entrySet()) {
             TableRef tableRef = entry.getKey();
             timeStamps[i++] = validateAndGetServerTimestamp(tableRef, entry.getValue());
         }
@@ -960,44 +1022,108 @@ public class MutationState implements SQLCloseable {
         }
 
     }
+
+    /**
+     * Split the mutation batches for each table into separate commit batches.
+     * Each commit batch contains only one mutation batch (MultiRowMutationState) for a table.
+     * @param tableRefIterator
+     * @return List of commit batches
+     */
+    private List<Map<TableRef, MultiRowMutationState>> createCommitBatches(Iterator<TableRef> tableRefIterator) {
+        List<Map<TableRef, MultiRowMutationState>> commitBatches = Lists.newArrayList();
+        while (tableRefIterator.hasNext()) {
+            final TableRef tableRef = tableRefIterator.next();
+            List<MultiRowMutationState> batches = this.mutationsMap.get(tableRef);
+            if (batches == null) {
+                continue;
+            }
+            for (MultiRowMutationState batch : batches) {
+                // get the first commit batch which doesn't have any mutations for the table
+                Map<TableRef, MultiRowMutationState> nextCommitBatch = getNextCommitBatchForTable(commitBatches, tableRef);
+                // add the next mutation batch of the table to the commit batch
+                nextCommitBatch.put(tableRef, batch);
+            }
+        }
+        return commitBatches;
+    }
+
+    // visible for testing
+    List<Map<TableRef, MultiRowMutationState>> createCommitBatches() {
+        return createCommitBatches(this.mutationsMap.keySet().iterator());
+    }
+
+    /**
+     * Return the first commit batch which doesn't have any mutations for the passed table.
+     * If no such commit batch exists, creates a new commit batch, adds it to the list of
+     * commit batches and returns it.
+     * @param commitBatchesList current list of commit batches
+     * @param tableRef
+     * @return commit batch
+     */
+    private Map<TableRef, MultiRowMutationState> getNextCommitBatchForTable(List<Map<TableRef, MultiRowMutationState>> commitBatchesList,
+        TableRef tableRef) {
+        Map<TableRef, MultiRowMutationState> nextCommitBatch = null;
+        for (Map<TableRef, MultiRowMutationState> commitBatch : commitBatchesList) {
+            if (commitBatch.get(tableRef) == null) {
+                nextCommitBatch = commitBatch;
+                break;
+            }
+        }
+        if (nextCommitBatch == null) {
+            // create a new commit batch and add it to the list of commit batches
+            nextCommitBatch = Maps.newHashMapWithExpectedSize(this.mutationsMap.size());
+            commitBatchesList.add(nextCommitBatch);
+        }
+        return nextCommitBatch;
+    }
+
    private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
-        int i = 0;
-        long[] serverTimeStamps = null;
         boolean sendAll = false;
+        boolean validateServerTimestamps = false;
+        List<Map<TableRef, MultiRowMutationState>> commitBatches;
         if (tableRefIterator == null) {
-            serverTimeStamps = validateAll();
-            tableRefIterator = mutations.keySet().iterator();
+            commitBatches = createCommitBatches(this.mutationsMap.keySet().iterator());
             sendAll = true;
+            validateServerTimestamps = true;
+        } else {
+            commitBatches = createCommitBatches(tableRefIterator);
         }
 
-        MultiRowMutationState multiRowMutationState;
+        for (Map<TableRef, MultiRowMutationState> commitBatch : commitBatches) {
+            long[] serverTimestamps = validateServerTimestamps ? validateAll(commitBatch) : null;
+            sendBatch(commitBatch, serverTimestamps, sendAll);
+        }
+    }
+
+    private void sendBatch(Map<TableRef, MultiRowMutationState> commitBatch, long[] serverTimeStamps, boolean sendAll) throws SQLException {
+        int i = 0;
         Map<TableInfo, List<Mutation>> physicalTableMutationMap = Maps.newLinkedHashMap();
         // add tracing for this operation
         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
             Span span = trace.getSpan();
             ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
-            while (tableRefIterator.hasNext()) {
+            for (Map.Entry<TableRef, MultiRowMutationState> entry : commitBatch.entrySet()) {
                 // at this point we are going through mutations for each table
-                final TableRef tableRef = tableRefIterator.next();
-                multiRowMutationState = mutations.get(tableRef);
+                final TableRef tableRef = entry.getKey();
+                MultiRowMutationState multiRowMutationState = entry.getValue();
                 if (multiRowMutationState == null || multiRowMutationState.isEmpty()) {
                     continue;
                 }
                 // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
                 long
-                        serverTimestamp =
-                        serverTimeStamps == null ?
-                                validateAndGetServerTimestamp(tableRef, multiRowMutationState) :
-                                serverTimeStamps[i++];
+                    serverTimestamp =
+                    serverTimeStamps == null ?
+                        validateAndGetServerTimestamp(tableRef, multiRowMutationState) :
+                        serverTimeStamps[i++];
                 final PTable table = tableRef.getTable();
                 Long scn = connection.getSCN();
                 long mutationTimestamp = scn == null ?
                     (table.isTransactional() == true ? HConstants.LATEST_TIMESTAMP : EnvironmentEdgeManager.currentTimeMillis())
-                        : scn;
+                    : scn;
                 Iterator<Pair<PTable, List<Mutation>>>
-                        mutationsIterator =
-                        addRowMutations(tableRef, multiRowMutationState, mutationTimestamp,
-                                serverTimestamp, false, sendAll);
+                    mutationsIterator =
+                    addRowMutations(tableRef, multiRowMutationState, mutationTimestamp,
+                        serverTimestamp, false, sendAll);
                 // build map from physical table to mutation list
                 boolean isDataTable = true;
                 while (mutationsIterator.hasNext()) {
@@ -1009,8 +1135,8 @@ public class MutationState implements SQLCloseable {
                             tableRef, logicalTable);
 
                     List<Mutation>
-                            oldMutationList =
-                            physicalTableMutationMap.put(tableInfo, mutationList);
+                        oldMutationList =
+                        physicalTableMutationMap.put(tableInfo, mutationList);
                     if (oldMutationList != null) mutationList.addAll(0, oldMutationList);
                     isDataTable = false;
                 }
@@ -1021,7 +1147,7 @@ public class MutationState implements SQLCloseable {
                 if (table.isTransactional()) {
                     addUncommittedStatementIndexes(multiRowMutationState.values());
                     if (txMutations.isEmpty()) {
-                        txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
+                        txMutations = Maps.newHashMapWithExpectedSize(this.mutationsMap.size());
                     }
                     // Keep all mutations we've encountered until a commit or rollback.
                     // This is not ideal, but there's no good way to get the values back
@@ -1187,11 +1313,11 @@ public class MutationState implements SQLCloseable {
                     numFailedMutations = 0;
 
                     // Remove batches as we process them
-                    mutations.remove(origTableRef);
+                    removeMutations(this.mutationsMap, origTableRef);
                     if (tableInfo.isDataTable()) {
                         numRows -= numMutations;
                         // recalculate the estimated size
-                        estimatedSize = PhoenixKeyValueUtil.getEstimatedRowMutationSize(mutations);
+                        estimatedSize = PhoenixKeyValueUtil.getEstimatedRowMutationSizeWithBatch(this.mutationsMap);
                     }
                 } catch (Exception e) {
                     mutationCommitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
@@ -1405,8 +1531,10 @@ public class MutationState implements SQLCloseable {
     }
 
     private int[] getUncommittedStatementIndexes() {
-        for (MultiRowMutationState rowMutationMap : mutations.values()) {
-            addUncommittedStatementIndexes(rowMutationMap.values());
+        for (List<MultiRowMutationState> batches : mutationsMap.values()) {
+            for (MultiRowMutationState rowMutationMap : batches) {
+                addUncommittedStatementIndexes(rowMutationMap.values());
+            }
         }
         return uncommittedStatementIndexes;
     }
@@ -1417,7 +1545,7 @@ public class MutationState implements SQLCloseable {
     private void resetState() {
         numRows = 0;
         estimatedSize = 0;
-        this.mutations.clear();
+        this.mutationsMap.clear();
         phoenixTransactionContext = PhoenixTransactionContext.NULL_CONTEXT;
     }
 
@@ -1437,7 +1565,7 @@ public class MutationState implements SQLCloseable {
     }
 
     public void commit() throws SQLException {
-        Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap();
+        Map<TableRef, List<MultiRowMutationState>> txMutations = Collections.emptyMap();
         int retryCount = 0;
         do {
             boolean sendSuccessful = false;
@@ -1518,7 +1646,7 @@ public class MutationState implements SQLCloseable {
                 break;
             }
             retryCount++;
-            mutations.putAll(txMutations);
+            mutationsMap.putAll(txMutations);
         } while (true);
     }
 
@@ -1576,7 +1704,7 @@ public class MutationState implements SQLCloseable {
      * @throws SQLException
      */
     public boolean sendUncommitted() throws SQLException {
-        return sendUncommitted(mutations.keySet().iterator());
+        return sendUncommitted(mutationsMap.keySet().iterator());
     }
 
     /**
@@ -1604,7 +1732,7 @@ public class MutationState implements SQLCloseable {
         if (filteredTableRefs.hasNext()) {
            // FIXME: strip table alias to prevent equality check from failing due to alias mismatch on null alias.
            // We really should be keying the tables based on the physical table name.
-            List<TableRef> strippedAliases = Lists.newArrayListWithExpectedSize(mutations.keySet().size());
+            List<TableRef> strippedAliases = Lists.newArrayListWithExpectedSize(mutationsMap.keySet().size());
             while (filteredTableRefs.hasNext()) {
                 TableRef tableRef = filteredTableRefs.next();
                // REVIEW: unclear if we need this given we start transactions when resolving a table
@@ -1683,6 +1811,10 @@ public class MutationState implements SQLCloseable {
             return rowKeyToRowMutationState.put(ptr, rowMutationState);
         }
 
+        public RowMutationState get(ImmutableBytesPtr ptr) {
+            return rowKeyToRowMutationState.get(ptr);
+        }
+
         public void putAll(MultiRowMutationState other) {
             estimatedSize += other.estimatedSize;
             rowKeyToRowMutationState.putAll(other.rowKeyToRowMutationState);
@@ -1747,7 +1879,16 @@ public class MutationState implements SQLCloseable {
             return statementIndexes;
         }
 
-        void join(RowMutationState newRow) {
+        /**
+         * Join the newRow with the current row if it doesn't conflict with it.
+         * A regular upsert conflicts with a conditional upsert.
+         * @param newRow
+         * @return true if the rows were successfully joined, otherwise false
+         */
+        boolean join(RowMutationState newRow) {
+            if (isConflicting(newRow)) {
+                return false;
+            }
             // If we already have a row and the new row has an ON DUPLICATE KEY clause
             // ignore the new values (as that's what the server will do).
             if (newRow.onDupKeyBytes == null) {
@@ -1766,6 +1907,7 @@ public class MutationState implements SQLCloseable {
             // increments of the same row in the same commit batch.
             this.onDupKeyBytes = PhoenixIndexBuilder.combineOnDupKey(this.onDupKeyBytes, newRow.onDupKeyBytes);
             statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes());
+            return true;
         }
 
         @Nonnull
@@ -1773,6 +1915,10 @@ public class MutationState implements SQLCloseable {
             return rowTsColInfo;
         }
 
+        public boolean isConflicting(RowMutationState newRowMutationState) {
+            return (this.onDupKeyBytes != null && newRowMutationState.onDupKeyBytes == null ||
+                this.onDupKeyBytes == null && newRowMutationState.onDupKeyBytes != null);
+        }
     }
 
     public ReadMetricQueue getReadMetricQueue() {
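
Taken together, the MutationState changes above transpose each table's
ordered list of mutation batches into commit batches, where a commit batch
holds at most one mutation batch per table. A self-contained sketch of that
transposition, with generic types standing in for TableRef and
MultiRowMutationState (simplified, not the Phoenix classes themselves):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class CommitBatcher {
        // perTable: each table's mutation batches in FIFO order.
        // Since a table's batches fill commit batches in order, batch i of
        // every table ends up in commit batch i, which matches what
        // getNextCommitBatchForTable does by scanning for the first commit
        // batch with no entry for the table.
        static <T, B> List<Map<T, B>> createCommitBatches(Map<T, List<B>> perTable) {
            List<Map<T, B>> commitBatches = new ArrayList<>();
            for (Map.Entry<T, List<B>> entry : perTable.entrySet()) {
                int i = 0;
                for (B batch : entry.getValue()) {
                    if (commitBatches.size() <= i) {
                        commitBatches.add(new HashMap<>());
                    }
                    commitBatches.get(i++).put(entry.getKey(), batch);
                }
            }
            return commitBatches;
        }
    }

With two tables that each carry a regular batch followed by a conditional
batch, this yields two commit batches: the first holds both regular batches,
the second both conditional ones, which is what the new unit test below
asserts.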
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
index c157873..0af2148 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
@@ -196,9 +196,17 @@ public class PhoenixKeyValueUtil {
         long size = 0;
         // iterate over table
         for (Entry<TableRef, MultiRowMutationState> tableEntry : tableMutationMap.entrySet()) {
-            // iterate over rows
-            for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : tableEntry.getValue().entrySet()) {
-                size += calculateRowMutationSize(rowEntry);
+            size += calculateMultiRowMutationSize(tableEntry.getValue());
+        }
+        return size;
+    }
+
+    public static long getEstimatedRowMutationSizeWithBatch(Map<TableRef, List<MultiRowMutationState>> tableMutationMap) {
+        long size = 0;
+        // iterate over table
+        for (Entry<TableRef, List<MultiRowMutationState>> tableEntry : tableMutationMap.entrySet()) {
+            for (MultiRowMutationState batch : tableEntry.getValue()) {
+                size += calculateMultiRowMutationSize(batch);
             }
         }
         return size;
@@ -214,6 +222,15 @@ public class PhoenixKeyValueUtil {
         return KeyValueUtil.copyToNewKeyValue(c);
     }
 
+    private static long calculateMultiRowMutationSize(MultiRowMutationState mutations) {
+        long size = 0;
+        // iterate over rows
+        for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : mutations.entrySet()) {
+            size += calculateRowMutationSize(rowEntry);
+        }
+        return size;
+    }
+
     private static long calculateRowMutationSize(Entry<ImmutableBytesPtr, RowMutationState> rowEntry) {
         int rowLength = rowEntry.getKey().getLength();
         long colValuesLength = rowEntry.getValue().calculateEstimatedSize();
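
The helpers above keep the per-row size formula unchanged and only add the
batch dimension: the estimate sums, over every table, batch, and row, the row
key length plus the estimated size of the row's column values. A compact
sketch of that summation with simplified stand-in types (String keys and
precomputed column-value sizes, not the Phoenix classes):

    import java.util.List;
    import java.util.Map;

    final class MutationSizeEstimator {
        // total = sum over tables, batches, rows of (keyLength + columnValuesSize)
        static long estimateWithBatches(Map<String, List<Map<String, Long>>> tables) {
            long total = 0;
            for (List<Map<String, Long>> batches : tables.values()) {
                for (Map<String, Long> batch : batches) {
                    for (Map.Entry<String, Long> row : batch.entrySet()) {
                        total += row.getKey().length() + row.getValue();
                    }
                }
            }
            return total;
        }
    }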
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
index f39312f..0a910e2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
@@ -17,23 +17,6 @@
  */
 package org.apache.phoenix.execute;
 
-import static org.apache.phoenix.execute.MutationState.joinSortedIntArrays;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.Delete;
@@ -42,17 +25,42 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
+import org.apache.phoenix.execute.MutationState.RowMutationState;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PUnsignedInt;
 import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.execute.MutationState.joinSortedIntArrays;
+import static org.apache.phoenix.query.BaseTest.generateUniqueName;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
 
 public class MutationStateTest {
 
@@ -211,4 +219,66 @@ public class MutationStateTest {
         }
 
     }
+
+    @Test
+    public void testOnDupAndUpsertInSameCommitBatch() throws Exception {
+        String dataTable1 = generateUniqueName();
+        String dataTable2 = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(String.format(
+                "create table %s (id1 UNSIGNED_INT not null primary key, appId1 VARCHAR)", dataTable1));
+            conn.createStatement().execute(String.format(
+                "create table %s (id2 UNSIGNED_INT not null primary key, appId2 VARCHAR)", dataTable2));
+
+            conn.createStatement().execute(String.format(
+                "upsert into %s(id1,appId1) values(111,'app1')", dataTable1));
+            conn.createStatement().execute(String.format(
+                "upsert into %s(id1,appId1) values(111, 'app1') ON DUPLICATE KEY UPDATE appId1 = null", dataTable1));
+            conn.createStatement().execute(String.format(
+                "upsert into %s(id2,appId2) values(222,'app2')", dataTable2));
+            conn.createStatement().execute(String.format(
+                "upsert into %s(id2,appId2) values(222,'app2') ON DUPLICATE KEY UPDATE appId2 = null", dataTable2));
+
+            final PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+            MutationState state = pconn.getMutationState();
+            assertEquals(2, state.getNumRows());
+
+            int actualPairs = 0;
+            Iterator<Pair<byte[], List<Mutation>>> mutations = state.toMutations();
+            while (mutations.hasNext()) {
+                Pair<byte[], List<Mutation>> nextTable = mutations.next();
+                ++actualPairs;
+                assertEquals(1, nextTable.getSecond().size());
+            }
+            // we have 2 tables and each table has 2 mutation batches
+            // so we should get 4 <table name, [mutations]> pairs
+            assertEquals(4, actualPairs);
+
+            List<Map<TableRef, MultiRowMutationState>> commitBatches = state.createCommitBatches();
+            assertEquals(2, commitBatches.size());
+            // first commit batch should only contain regular upserts
+            verifyCommitBatch(commitBatches.get(0), false, 2, 1);
+            verifyCommitBatch(commitBatches.get(1), true, 2, 1);
+        }
+    }
+
+    private void verifyCommitBatch(Map<TableRef, MultiRowMutationState> commitBatch, boolean conditional,
+        int numberOfBatches, int rowsPerBatch) {
+        // one for each table
+        assertEquals(numberOfBatches, commitBatch.size());
+        for (Map.Entry<TableRef, MultiRowMutationState> entry : commitBatch.entrySet()) {
+            TableRef tableRef = entry.getKey();
+            MultiRowMutationState batch = entry.getValue();
+            assertEquals(rowsPerBatch, batch.size());
+            for (Map.Entry<ImmutableBytesPtr, RowMutationState> row : batch.entrySet()) {
+                ImmutableBytesPtr key = row.getKey();
+                RowMutationState rowMutationState = row.getValue();
+                if (conditional == true) {
+                    assertNotNull(rowMutationState.getOnDupKeyBytes());
+                } else {
+                    assertNull(rowMutationState.getOnDupKeyBytes());
+                }
+            }
+        }
+    }
 }
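
The counts asserted in the unit test follow directly from the new batching
model:

    // each of the 2 tables carries a regular batch and a conditional batch,
    // so toMutations() emits 2 tables x 2 batches = 4 <table, mutations>
    // pairs; createCommitBatches() folds them into 2 commit batches, the
    // first holding the regular upserts (onDupKeyBytes == null) and the
    // second the conditional ones (onDupKeyBytes != null)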
