This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.14-HBase-1.4 by this push:
new 44152b4 PHOENIX-5743 Concurrent read repairs on the same index row
should be idempotent
44152b4 is described below
commit 44152b43a66302360c4439fbabfe1a6739de7a6b
Author: Kadir <[email protected]>
AuthorDate: Thu Feb 20 13:46:56 2020 -0800
PHOENIX-5743 Concurrent read repairs on the same index row should be
idempotent
---
.../end2end/index/GlobalIndexCheckerIT.java | 61 ++++++++++++++++++++++
.../apache/phoenix/index/GlobalIndexChecker.java | 15 ++----
2 files changed, 64 insertions(+), 12 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 30f9b34..fea8741 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -307,6 +307,67 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
}
@Test
+ public void testUnverifiedRowRepair() throws Exception {
+ if (async) {
+ // No need to run the same test twice, once for async = true and once for async = false
+ return;
+ }
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ conn.createStatement().execute("create table " + dataTableName +
+ " (id varchar(10) not null primary key, val1 varchar(10),
val2 varchar(10), val3 varchar(10))" + tableDDLOptions);
+ String indexTableName = generateUniqueName();
+ conn.createStatement().execute("CREATE INDEX " + indexTableName +
" on " +
+ dataTableName + " (val1) include (val2, val3)");
+ // Configure IndexRegionObserver to fail the data write phase
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+ conn.createStatement().execute("upsert into " + dataTableName + "
(id, val1, val2) values ('a', 'ab','abc')");
+ commitWithException(conn);
+ // The above upsert will create an unverified index row
+ // Configure IndexRegionObserver to allow the data write phase
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ // Verify that this row is not visible
+ String selectSql = "SELECT * from " + dataTableName + " WHERE val1
= 'ab'";
+ // Verify that we will read from the index table
+ assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+ ResultSet rs = conn.createStatement().executeQuery(selectSql);
+ // Verify that we do not read from the unverified row
+ assertFalse(rs.next());
+ // Insert the same row with a value for val3
+ conn.createStatement().execute("upsert into " + dataTableName + "
values ('a', 'ab','abc', 'abcd')");
+ conn.commit();
+ // At this moment val3 in the data table row should not have a null value
+ selectSql = "SELECT val3 from " + dataTableName + " WHERE val1 =
'ab'";
+ // Verify that we will read from the index table
+ assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+ rs = conn.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals("abcd", rs.getString(1));
+ assertFalse(rs.next());
+ // Configure IndexRegionObserver to fail the data write phase
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+ conn.createStatement().execute("upsert into " + dataTableName + "
(id, val1, val2) values ('a', 'ab','abc')");
+ commitWithException(conn);
+ // The above upsert will create an unverified index row
+ // Configure IndexRegionObserver to allow the data write phase
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ // Verify that this row is still read back correctly
+ for (int i = 0; i < 2; i++) {
+ selectSql = "SELECT val3 from " + dataTableName + " WHERE val1
= 'ab'";
+ // Verify that we will read from the index table
+ assertExplainPlan(conn, selectSql, dataTableName,
indexTableName);
+ rs = conn.createStatement().executeQuery(selectSql);
+ // Verify that the repair of the unverified row did not affect
the valid index row
+ assertTrue(rs.next());
+ assertEquals("abcd", rs.getString(1));
+ assertFalse(rs.next());
+ }
+ // Add rows and check everything is still okay
+ verifyTableHealth(conn, dataTableName, indexTableName);
+ }
+ }
+
+ @Test
public void testOnePhaseOverwiteFollowingTwoPhaseWrite() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
String dataTableName = generateUniqueName();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index ef4ac1f..e1c1106 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -248,18 +248,9 @@ public class GlobalIndexChecker extends BaseRegionObserver {
if ((EnvironmentEdgeManager.currentTimeMillis() - ts) >
ageThreshold) {
Delete del = new Delete(indexRowKey, ts);
if (specific) {
- // Get all the cells of this row
- deleteRowScan.withStartRow(indexRowKey, true);
- deleteRowScan.withStopRow(indexRowKey, true);
- deleteRowScan.setTimeRange(0, ts + 1);
- deleteRowScanner = region.getScanner(deleteRowScan);
- row.clear();
- deleteRowScanner.next(row);
- deleteRowScanner.close();
- // We are deleting a specific version of a row so the
flowing loop is for that
- for (Cell cell : row) {
- del.addColumn(CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell), cell.getTimestamp());
- }
+ del.addFamilyVersion(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), ts);
+ } else {
+ del.addFamily(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), ts);
}
Mutation[] mutations = new Mutation[]{del};
region.batchMutate(mutations, HConstants.NO_NONCE,
HConstants.NO_NONCE);