gokceni commented on a change in pull request #1183:
URL: https://github.com/apache/phoenix/pull/1183#discussion_r601614616
##########
File path:
phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
##########
@@ -614,6 +614,48 @@ public void
testRowsCreatedViaUpsertOnDuplicateKeyShouldNotBeReturnedInQueryIfNo
conn.close();
}
+ @Test
+ public void testOnDupAndUpsertInSameCommitBatch() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String ddl = "create table " + tableName + "(pk varchar primary
key, counter1 bigint, counter2 smallint)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+
+ // row doesn't exist
+ conn.createStatement().execute(String.format("UPSERT INTO %s
VALUES('a',0,1)", tableName));
+ conn.createStatement().execute(String.format(
+ "UPSERT INTO %s VALUES('a',1,1) ON DUPLICATE KEY UPDATE
counter1 = counter1 + 2", tableName));
+ conn.commit();
+ assertRow(conn, tableName, "a", 2, 1);
Review comment:
Shouldn't the counter1 value be 3? We are upserting 1 for counter1, and then
counter1 = 1 + 2.
##########
File path:
phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
##########
@@ -614,6 +614,48 @@ public void
testRowsCreatedViaUpsertOnDuplicateKeyShouldNotBeReturnedInQueryIfNo
conn.close();
}
+ @Test
+ public void testOnDupAndUpsertInSameCommitBatch() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String ddl = "create table " + tableName + "(pk varchar primary
key, counter1 bigint, counter2 smallint)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+
+ // row doesn't exist
+ conn.createStatement().execute(String.format("UPSERT INTO %s
VALUES('a',0,1)", tableName));
+ conn.createStatement().execute(String.format(
+ "UPSERT INTO %s VALUES('a',1,1) ON DUPLICATE KEY UPDATE
counter1 = counter1 + 2", tableName));
+ conn.commit();
+ assertRow(conn, tableName, "a", 2, 1);
+
+ // row exists
+ conn.createStatement().execute(String.format("UPSERT INTO %s
VALUES('a', 7, 4)", tableName));
+ conn.createStatement().execute(String.format(
+ "UPSERT INTO %s VALUES('a',1,1) ON DUPLICATE KEY UPDATE
counter1 = counter1 + 2", tableName));
+ conn.commit();
+ assertRow(conn, tableName, "a", 9, 4);
Review comment:
It would be nice to also see these switched (e.g. rows 634 and 635 swapped),
so that we are sure they are in the same batch and that the order within the
batch doesn't matter.
##########
File path:
phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
##########
@@ -210,4 +216,63 @@ public void testPendingMutationsOnDDL() throws Exception {
+ "( id1 UNSIGNED_INT not null primary key," + "appId1
VARCHAR)");
}
}
+
+ @Test
+ public void testOnDupAndUpsertInSameCommitBatch() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute(
+ "create table MUTATION_TEST1" +
Review comment:
Use generated table names instead.
##########
File path:
phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
##########
@@ -614,6 +614,48 @@ public void
testRowsCreatedViaUpsertOnDuplicateKeyShouldNotBeReturnedInQueryIfNo
conn.close();
}
+ @Test
+ public void testOnDupAndUpsertInSameCommitBatch() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String ddl = "create table " + tableName + "(pk varchar primary
key, counter1 bigint, counter2 smallint)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+
+ // row doesn't exist
+ conn.createStatement().execute(String.format("UPSERT INTO %s
VALUES('a',0,1)", tableName));
+ conn.createStatement().execute(String.format(
+ "UPSERT INTO %s VALUES('a',1,1) ON DUPLICATE KEY UPDATE
counter1 = counter1 + 2", tableName));
+ conn.commit();
+ assertRow(conn, tableName, "a", 2, 1);
+
+ // row exists
+ conn.createStatement().execute(String.format("UPSERT INTO %s
VALUES('a', 7, 4)", tableName));
+ conn.createStatement().execute(String.format(
+ "UPSERT INTO %s VALUES('a',1,1) ON DUPLICATE KEY UPDATE
counter1 = counter1 + 2", tableName));
+ conn.commit();
+ assertRow(conn, tableName, "a", 9, 4);
+
+ // partial update
+ conn.createStatement().execute(String.format(
+ "UPSERT INTO %s (pk, counter2) VALUES('a',100) ON DUPLICATE
KEY UPDATE counter1 = counter1 + 2", tableName));
+ conn.createStatement().execute(String.format(
+ "UPSERT INTO %s (pk, counter2) VALUES ('a',125)", tableName));
+ conn.commit();
+ assertRow(conn, tableName, "a", 11, 125);
Review comment:
Also, how about adding a check for char-type column updates?
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
##########
@@ -404,67 +434,89 @@ public int getNumRows() {
return numRows;
}
+ private MultiRowMutationState getLastMutationBatch(Map<TableRef,
List<MultiRowMutationState>> mutations, TableRef tableRef) {
+ List<MultiRowMutationState> mutationBatches = mutations.get(tableRef);
+ if (mutationBatches == null || mutationBatches.isEmpty()) {
+ return null;
+ }
+ return mutationBatches.get(mutationBatches.size() - 1);
+ }
+
private void joinMutationState(TableRef tableRef, MultiRowMutationState
srcRows,
- Map<TableRef, MultiRowMutationState> dstMutations) {
+ Map<TableRef, List<MultiRowMutationState>> dstMutations) {
PTable table = tableRef.getTable();
boolean isIndex = table.getType() == PTableType.INDEX;
- boolean incrementRowCount = dstMutations == this.mutations;
- MultiRowMutationState existingRows = dstMutations.put(tableRef,
srcRows);
- if (existingRows != null) { // Rows for that table already exist
- // Loop through new rows and replace existing with new
- for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry :
srcRows.entrySet()) {
- // Replace existing row with new row
- RowMutationState existingRowMutationState =
existingRows.put(rowEntry.getKey(), rowEntry.getValue());
- if (existingRowMutationState != null) {
- Map<PColumn, byte[]> existingValues =
existingRowMutationState.getColumnValues();
- if (existingValues != PRow.DELETE_MARKER) {
- Map<PColumn, byte[]> newRow =
rowEntry.getValue().getColumnValues();
- // if new row is PRow.DELETE_MARKER, it means delete,
and we don't need to merge it with
- // existing row.
- if (newRow != PRow.DELETE_MARKER) {
- // decrement estimated size by the size of the old
row
- estimatedSize -=
existingRowMutationState.calculateEstimatedSize();
- // Merge existing column values with new column
values
- existingRowMutationState.join(rowEntry.getValue());
- // increment estimated size by the size of the new
row
- estimatedSize +=
existingRowMutationState.calculateEstimatedSize();
- // Now that the existing row has been merged with
the new row, replace it back
- // again (since it was merged with the new one
above).
- existingRows.put(rowEntry.getKey(),
existingRowMutationState);
- }
- }
- } else {
- if (incrementRowCount && !isIndex) { // Don't count index
rows in row count
- numRows++;
- // increment estimated size by the size of the new row
- estimatedSize +=
rowEntry.getValue().calculateEstimatedSize();
- }
- }
- }
- // Put the existing one back now that it's merged
- dstMutations.put(tableRef, existingRows);
- } else {
+ boolean incrementRowCount = dstMutations == this.mutationsMap;
+ // we only need to check if the new mutation batch (srcRows) conflicts
with the
+ // last mutation batch
Review comment:
Why? Could you add the reason to the comment?
##########
File path:
phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
##########
@@ -614,6 +614,48 @@ public void
testRowsCreatedViaUpsertOnDuplicateKeyShouldNotBeReturnedInQueryIfNo
conn.close();
}
+ @Test
+ public void testOnDupAndUpsertInSameCommitBatch() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String ddl = "create table " + tableName + "(pk varchar primary
key, counter1 bigint, counter2 smallint)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+
+ // row doesn't exist
+ conn.createStatement().execute(String.format("UPSERT INTO %s
VALUES('a',0,1)", tableName));
+ conn.createStatement().execute(String.format(
+ "UPSERT INTO %s VALUES('a',1,1) ON DUPLICATE KEY UPDATE
counter1 = counter1 + 2", tableName));
+ conn.commit();
+ assertRow(conn, tableName, "a", 2, 1);
+
+ // row exists
+ conn.createStatement().execute(String.format("UPSERT INTO %s
VALUES('a', 7, 4)", tableName));
+ conn.createStatement().execute(String.format(
+ "UPSERT INTO %s VALUES('a',1,1) ON DUPLICATE KEY UPDATE
counter1 = counter1 + 2", tableName));
+ conn.commit();
+ assertRow(conn, tableName, "a", 9, 4);
+
+ // partial update
+ conn.createStatement().execute(String.format(
+ "UPSERT INTO %s (pk, counter2) VALUES('a',100) ON DUPLICATE
KEY UPDATE counter1 = counter1 + 2", tableName));
+ conn.createStatement().execute(String.format(
+ "UPSERT INTO %s (pk, counter2) VALUES ('a',125)", tableName));
+ conn.commit();
+ assertRow(conn, tableName, "a", 11, 125);
Review comment:
I recommend adding a check that the index row is updated as well.
##########
File path:
phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
##########
@@ -210,4 +216,63 @@ public void testPendingMutationsOnDDL() throws Exception {
+ "( id1 UNSIGNED_INT not null primary key," + "appId1
VARCHAR)");
}
}
+
+ @Test
+ public void testOnDupAndUpsertInSameCommitBatch() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute(
+ "create table MUTATION_TEST1" +
+ "( id1 UNSIGNED_INT not null primary key," +
+ "appId1 VARCHAR)");
+ conn.createStatement().execute(
+ "create table MUTATION_TEST2" +
+ "( id2 UNSIGNED_INT not null primary key," +
+ "appId2 VARCHAR)");
+
+ conn.createStatement().execute("upsert into
MUTATION_TEST1(id1,appId1) values(111,'app1')");
+ conn.createStatement().execute(
+ "upsert into MUTATION_TEST1(id1,appId1) values(111, 'app1') ON
DUPLICATE KEY UPDATE appId1 = null");
+ conn.createStatement().execute("upsert into
MUTATION_TEST2(id2,appId2) values(222,'app2')");
+ conn.createStatement().execute(
+ "upsert into MUTATION_TEST2(id2,appId2) values(222,'app2') ON
DUPLICATE KEY UPDATE appId2 = null");
+
+ final PhoenixConnection pconn =
conn.unwrap(PhoenixConnection.class);
+ MutationState state = pconn.getMutationState();
+ assertEquals(2, state.getNumRows());
+
+ int actualPairs = 0;
+ Iterator<Pair<byte[], List<Mutation>>> mutations =
state.toMutations();
+ while (mutations.hasNext()) {
+ Pair<byte[], List<Mutation>> nextTable = mutations.next();
+ ++actualPairs;
+ assertEquals(1, nextTable.getSecond().size());
+ }
+ assertEquals(4, actualPairs);
+
+ List<Map<TableRef, MultiRowMutationState>> commitBatches =
state.createCommitBatches();
+ assertEquals(2, commitBatches.size());
+ // first commit batch should only contain regular upserts
+ verifyCommitBatch(commitBatches.get(0), false);
+ verifyCommitBatch(commitBatches.get(1), true);
+ }
+ }
+
+ private void verifyCommitBatch(Map<TableRef, MultiRowMutationState>
commitBatch, boolean conditional) {
+ // one for each table
+ assertEquals(2, commitBatch.size());
Review comment:
I recommend adding the expected size as a parameter.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]