jinggou commented on code in PR #1884:
URL: https://github.com/apache/phoenix/pull/1884#discussion_r1621525579
##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java:
##########
@@ -720,6 +747,470 @@ public void testMultiplePartialUpdatesInSameBatch()
throws Exception {
}
}
+ @Test
+ public void testColumnsTimestampUpdateWithAllCombinations() throws
Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+
+ String ddl = "create table " + tableName + "(pk varchar primary
key, " +
+ "counter1 integer, counter2 integer, counter3 smallint,
counter4 bigint, " +
+ "counter5 varchar)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+ String dml = String.format("UPSERT INTO %s VALUES('abc', 0, 10,
100, 1000, 'NONE')",
+ tableName);
+ int actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ String dql = "SELECT * from " + tableName;
+ ResultSet rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+
+ List<Long> oldTimestamps = getAllColumnsLatestCellTimestamp(conn,
tableName);
+
+ dml = "UPSERT INTO " + tableName + " VALUES ('abc', 0, 10) ON
DUPLICATE KEY UPDATE " +
+ // conditional update with different value
+ "counter1 = CASE WHEN counter1 < 1 THEN counter1 + 1 ELSE
counter1 END, " +
+ // conditional update with same value in ELSE clause (will
not update timestamp)
+ "counter2 = CASE WHEN counter2 < 10 THEN counter2 + 1 ELSE
counter2 END, " +
+ // intentional update with different value
+ "counter3 = counter3 + 100, " +
+ // intentional update with same value
+ "counter4 = counter4";
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(10, rs.getInt("counter2"));
+ assertEquals(200, rs.getInt("counter3"));
+ assertEquals(1000, rs.getInt("counter4"));
+ assertEquals("NONE", rs.getString("counter5"));
+ assertFalse(rs.next());
+
+ List<Long> newTimestamps = getAllColumnsLatestCellTimestamp(conn,
tableName);
+
+ assertEquals(6, oldTimestamps.size());
+ assertEquals(6, newTimestamps.size());
+ assertEquals(oldTimestamps.get(2), newTimestamps.get(2)); //
counter2 NOT updated
+ assertEquals(oldTimestamps.get(5), newTimestamps.get(5)); //
counter5 NOT updated
+ assertTrue(oldTimestamps.get(0) < newTimestamps.get(0)
+ && oldTimestamps.get(1) < newTimestamps.get(1)
+ && oldTimestamps.get(3) < newTimestamps.get(3)
+ && oldTimestamps.get(4) < newTimestamps.get(4)); // other
columns updated
+ }
+ }
+
+ @Test
+ @Ignore("Until Phoenix upgrades HBase dependency to 2.6.0 to include
HBASE-28424")
+ public void testColumnsTimestampUpdateWithOneConditionalUpdate() throws
Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+
+ String ddl = "create table " + tableName +
+ "(pk varchar primary key, counter1 bigint, counter2
bigint)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+
+ String dml;
+ dml = String.format("UPSERT INTO %s VALUES('abc', 0, 100)",
tableName);
+ int actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ String dql = "SELECT * from " + tableName;
+ ResultSet rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+
+ List<Long> timestampList0 = getAllColumnsLatestCellTimestamp(conn,
tableName);
+
+ // Case 1: timestamps update with different value in
WHEN-THEN-clause
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES
('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE " +
+ "counter1 = CASE WHEN counter1 < 1 THEN counter1 +
1 ELSE counter1 END",
+ tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList1 = getAllColumnsLatestCellTimestamp(conn,
tableName);
+ assertTrue(timestampList1.get(0) > timestampList0.get(0)
+ && timestampList1.get(1) > timestampList0.get(1));
+
+ // Case 2: timestamps NOT update with same value in ELSE-clause
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(0, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList2 = getAllColumnsLatestCellTimestamp(conn,
tableName);
+ assertEquals(timestampList1.get(0), timestampList2.get(0)); //
empty column NOT updated
+ assertEquals(timestampList1.get(1), timestampList2.get(1)); //
counter1 NOT updated
+
+ // Case 3: timestamps update with different value in ELSE-clause
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES
('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE " +
+ "counter1 = CASE WHEN counter1 < 1 THEN counter1
ELSE counter1 + 1 END",
+ tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(2, rs.getInt("counter1"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList3 = getAllColumnsLatestCellTimestamp(conn,
tableName);
+ assertTrue(timestampList3.get(0) > timestampList2.get(0)
+ && timestampList3.get(1) > timestampList2.get(1));
+
+ // Case 4: timestamps update with same value in WHEN-THEN-clause
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList4 = getAllColumnsLatestCellTimestamp(conn,
tableName);
+ assertTrue(timestampList4.get(0) > timestampList3.get(0)
+ && timestampList4.get(1) > timestampList3.get(1));
+ }
+ }
+
+ @Test
+ @Ignore("Until Phoenix upgrades HBase dependency to 2.6.0 to include
HBASE-28424")
+ public void testColumnsTimestampUpdateWithOneConditionalValuesUpdate()
throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+
+ String ddl = "create table " + tableName +
+ "(pk varchar primary key, counter1 integer, counter2
integer)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+
+ String dml = String.format("UPSERT INTO %s VALUES('abc', 1, 100)",
tableName);
+ int actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ List<Long> timestampList0 = getAllColumnsLatestCellTimestamp(conn,
tableName);
+
+ // Case 1: timestamps update with same value in WHEN-THEN-clause
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES
('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE " +
+ "counter1 = CASE WHEN counter2 <= 100 THEN 1 ELSE 0 END",
tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ String dql = "SELECT * from " + tableName;
+ ResultSet rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList1 = getAllColumnsLatestCellTimestamp(conn,
tableName);
+
+ assertTrue(timestampList0.get(0) < timestampList1.get(0)
+ && timestampList0.get(1) < timestampList1.get(1)); //
counter1 updated
+ assertEquals(timestampList0.get(2), timestampList1.get(2)); //
counter2 NOT updated
+
+ // Case 2: timestamps NOT update with same value in ELSE-clause
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES
('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE " +
+ "counter1 = CASE WHEN counter2 > 100 THEN 0 ELSE 1 END",
tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(0, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList2 = getAllColumnsLatestCellTimestamp(conn,
tableName);
+
+ assertEquals(timestampList1.get(0), timestampList2.get(0));
+ assertEquals(timestampList1.get(1), timestampList2.get(1));
+ assertEquals(timestampList1.get(2), timestampList2.get(2));
+ }
+ }
+
+ @Test
+ @Ignore("Until Phoenix upgrades HBase dependency to 2.6.0 to include
HBASE-28424")
+ public void testColumnsTimestampUpdateWithMultipleConditionalUpdate()
throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ String ddl = "create table " + tableName +
+ "(pk varchar primary key, counter1 integer, counter2
integer, approval " +
+ "varchar)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+
+ String dml;
+ dml = String.format("UPSERT INTO %s VALUES('abc', 0, 9, 'NONE')",
tableName);
+ int actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ List<Long> timestampList0 = getAllColumnsLatestCellTimestamp(conn,
tableName);
+
+ // Case 1: all columns timestamps updated
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES
('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE " +
+ "counter1 = CASE WHEN counter1 < 1 THEN 1 ELSE counter1
END," +
+ "counter2 = CASE WHEN counter2 < 11 THEN counter2 + 1 ELSE
counter2 END," +
+ "approval = CASE WHEN counter2 < 10 THEN 'NONE' " +
+ "WHEN counter2 < 11 THEN 'MANAGER_APPROVAL' " +
+ "ELSE approval END", tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ String dql = "SELECT * from " + tableName;
+ ResultSet rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(10, rs.getInt("counter2"));
+ assertEquals("NONE", rs.getString("approval"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList1 = getAllColumnsLatestCellTimestamp(conn,
tableName);
+ assertTrue(timestampList1.get(0) > timestampList0.get(0)
+ && timestampList1.get(1) > timestampList0.get(1)
+ && timestampList1.get(2) > timestampList0.get(2)
+ && timestampList1.get(3) > timestampList0.get(3));
+
+ // Case 2: timestamps of counter2 and approval updated
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(11, rs.getInt("counter2"));
+ assertEquals("MANAGER_APPROVAL", rs.getString("approval"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList2 = getAllColumnsLatestCellTimestamp(conn,
tableName);
+ assertEquals(timestampList1.get(1), timestampList2.get(1)); //
counter1 NOT updated
+ assertTrue(timestampList2.get(0) > timestampList1.get(0)
+ && timestampList2.get(2) > timestampList1.get(2)
+ && timestampList2.get(3) > timestampList1.get(3));
+
+ // Case 3: all timestamps NOT updated, including empty column
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(0, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(11, rs.getInt("counter2"));
+ assertEquals("MANAGER_APPROVAL", rs.getString("approval"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList3 = getAllColumnsLatestCellTimestamp(conn,
tableName);
+ assertEquals(timestampList2.get(0), timestampList3.get(0));
+ assertEquals(timestampList2.get(1), timestampList3.get(1));
+ assertEquals(timestampList2.get(2), timestampList3.get(2));
+ assertEquals(timestampList2.get(3), timestampList3.get(3));
+ }
+ }
+
+ @Test
+ public void testColumnsTimestampUpdateWithIntentionalUpdate() throws
Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+
+ String ddl = "create table " + tableName +
+ "(pk varchar primary key, counter1 bigint, counter2
bigint)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+
+ String dml;
+ dml = String.format("UPSERT INTO %s VALUES('abc', 0, 100)",
tableName);
+ int actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ List<Long> timestampList0 = getAllColumnsLatestCellTimestamp(conn,
tableName);
+
+ // Case 1: different value of one column
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES
('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE counter1 = counter1 + 1",
+ tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ String dql = "SELECT * from " + tableName;
+ ResultSet rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList1 = getAllColumnsLatestCellTimestamp(conn,
tableName);
+ assertEquals(timestampList0.get(2), timestampList1.get(2)); //
counter2 NOT updated
+ assertTrue(timestampList1.get(0) > timestampList0.get(0)
+ && timestampList1.get(1) > timestampList0.get(1)); //
updated columns
+
+ // Case 2: same value of one column will also be updated
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES
('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE counter1 = counter1",
+ tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList2 = getAllColumnsLatestCellTimestamp(conn,
tableName);
+ assertEquals(timestampList2.get(2), timestampList1.get(2)); //
counter2 NOT updated
+ assertTrue(timestampList2.get(0) > timestampList1.get(0)
+ && timestampList2.get(1) > timestampList1.get(1));
+
+ // Case 3: same value of one column, different of the other
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES
('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE counter1 = counter1,
counter2 = counter2 + 1",
+ tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(101, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList3 = getAllColumnsLatestCellTimestamp(conn,
tableName);
+ assertTrue(timestampList3.get(0) > timestampList2.get(0)
+ && timestampList3.get(1) > timestampList2.get(1)
+ && timestampList3.get(2) > timestampList2.get(2)); //
counter2
+
+ // Case 4: same values of all columns will also be updated
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES
('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE counter1 = counter1,
counter2 = counter2",
+ tableName);
+ actualReturnValue = conn.createStatement().executeUpdate(dml);
+ assertEquals(1, actualReturnValue);
+
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(1, rs.getInt("counter1"));
+ assertEquals(101, rs.getInt("counter2"));
+ assertFalse(rs.next());
+
+ List<Long> timestampList4 = getAllColumnsLatestCellTimestamp(conn,
tableName);
+ assertTrue(timestampList4.get(0) > timestampList3.get(0)
+ && timestampList4.get(1) > timestampList3.get(1)
+ && timestampList4.get(2) > timestampList3.get(2));
+ }
+ }
+
+ @Test
+ @Ignore("Until Phoenix upgrades HBase dependency to 2.6.0 to include
HBASE-28424")
+ public void testBatchedUpsertOnDupKeyAutoCommit() throws Exception {
+ testBatchedUpsertOnDupKey(true);
+ }
+
+ @Test
+ @Ignore("Until Phoenix upgrades HBase dependency to 2.6.0 to include
HBASE-28424")
+ public void testBatchedUpsertOnDupKeyNoAutoCommit() throws Exception {
+ testBatchedUpsertOnDupKey(false);
+ }
+
+ private void testBatchedUpsertOnDupKey(boolean autocommit) throws
Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(autocommit);
+
+ Statement stmt = conn.createStatement();
+
+ stmt.execute("create table " + tableName + "(pk varchar primary
key, " +
+ "counter1 integer, counter2 integer, approval varchar)");
+ createIndex(conn, tableName);
+
+ stmt.execute("UPSERT INTO " + tableName + " VALUES('a', 0, 10,
'NONE')");
+ conn.commit();
+
+ stmt.addBatch("UPSERT INTO " + tableName +
+ " (pk, counter1, counter2) VALUES ('a', 0, 10) ON
DUPLICATE KEY IGNORE");
+ stmt.addBatch("UPSERT INTO " + tableName +
+ " (pk, counter1, counter2) VALUES ('a', 0, 10) ON
DUPLICATE KEY UPDATE" +
+ " counter1 = CASE WHEN counter1 < 1 THEN 1 ELSE counter1
END");
+
+ stmt.addBatch("UPSERT INTO " + tableName +
+ " (pk, counter1, counter2) VALUES ('b', 0, 9) ON DUPLICATE
KEY IGNORE");
+ String dml = "UPSERT INTO " + tableName +
+ " (pk, counter1, counter2) VALUES ('b', 0, 10) ON
DUPLICATE KEY UPDATE" +
+ " counter2 = CASE WHEN counter2 < 11 THEN counter2 + 1
ELSE counter2 END," +
+ " approval = CASE WHEN counter2 < 10 THEN 'NONE'" +
+ " WHEN counter2 < 11 THEN 'MANAGER_APPROVAL'" +
+ " ELSE approval END";
+ stmt.addBatch(dml);
+ stmt.addBatch(dml);
+ stmt.addBatch(dml);
+
+ int[] actualReturnValues = stmt.executeBatch();
+ int[] expectedReturnValues = new int[]{0, 1, 1, 1, 1, 0};
Review Comment:
@tkhurana Now I see that `PhoenixStatement#executeBatch` internally turns
off auto-commit. Is the code of `PhoenixStatement#executeBatch` different
in our lightfork? I am testing against the lightfork, which is why this test passes there.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]