This is an automated email from the ASF dual-hosted git repository. kadir pushed a commit to branch 4.x-HBase-1.5 in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit 50279643c604c64eebe0b2041f764b1f00f6ea27 Author: Kadir <[email protected]> AuthorDate: Sat Oct 5 16:30:07 2019 -0700 PHOENIX-5505 Index read repair does not repair unverified rows with higher timestamp (addendum) --- .../end2end/index/GlobalMutableNonTxIndexIT.java | 1 - .../org/apache/phoenix/util/IndexScrutinyIT.java | 12 +--- .../phoenix/hbase/index/IndexRegionObserver.java | 17 +++++- .../apache/phoenix/index/GlobalIndexChecker.java | 70 ++++++++++++++++++++-- 4 files changed, 82 insertions(+), 18 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java index 35d5049..7097234 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java @@ -27,7 +27,6 @@ public class GlobalMutableNonTxIndexIT extends BaseIndexIT { public GlobalMutableNonTxIndexIT(boolean localIndex, boolean mutable, String transactionProvider, boolean columnEncoded, boolean skipPostIndexUpdates) { super(localIndex, mutable, transactionProvider, columnEncoded); - IndexRegionObserver.setSkipPostIndexUpdatesForTesting(skipPostIndexUpdates); } @Parameters(name="GlobalMutableNonTxIndexIT_localIndex={0},mutable={1},transactionProvider={2},columnEncoded={3},skipPostIndexUpdates={4}") // name is used by failsafe as file name in reports diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java index 20ec965..8af61c4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java @@ -91,17 +91,11 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT { conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc','1')"); conn.commit(); - // Writing index directly will generate unverified rows with no corresponding data rows. These rows will not be visible to the applications + // Writing index directly will generate unverified rows. These rows will be recovered if there exists the + // corresponding data row conn.createStatement().executeUpdate("UPSERT INTO " + fullIndexName + " VALUES ('ccc','a','2')"); conn.commit(); - try { - IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); - fail(); - } catch (AssertionError e) { - assertEquals("Expected data table row count to match expected:<2> but was:<1>", e.getMessage()); - } + assertEquals(2, IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName)); } } - - } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 7c37c7a..1820194 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -159,6 +159,7 @@ public class IndexRegionObserver extends BaseRegionObserver { // batch with a timestamp earlier than the timestamp of this batch and the earlier batch has a mutation on the // row (i.e., concurrent updates). 
private HashSet<ImmutableBytesPtr> pendingRows = new HashSet<>(); + private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>(); private BatchMutateContext(int clientVersion) { this.clientVersion = clientVersion; @@ -374,18 +375,27 @@ public class IndexRegionObserver extends BaseRegionObserver { } } - private void lockRows(MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context) throws IOException { + private void populateRowsToLock(MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context) { for (int i = 0; i < miniBatchOp.size(); i++) { if (miniBatchOp.getOperationStatus(i) == IGNORE) { continue; } Mutation m = miniBatchOp.getOperation(i); if (this.builder.isEnabled(m)) { - context.rowLocks.add(lockManager.lockRow(m.getRow(), rowLockWaitDuration)); + ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); + if (!context.rowsToLock.contains(row)) { + context.rowsToLock.add(row); + } } } } + private void lockRows(BatchMutateContext context) throws IOException { + for (ImmutableBytesPtr rowKey : context.rowsToLock) { + context.rowLocks.add(lockManager.lockRow(rowKey, rowLockWaitDuration)); + } + } + private void populatePendingRows(BatchMutateContext context, long now) { for (RowLock rowLock : context.rowLocks) { ImmutableBytesPtr rowKey = rowLock.getRowKey(); @@ -598,7 +608,8 @@ public class IndexRegionObserver extends BaseRegionObserver { * while determining the index updates */ if (replayWrite == null) { - lockRows(miniBatchOp, context); + populateRowsToLock(miniBatchOp, context); + lockRows(context); } long now = EnvironmentEdgeManager.currentTimeMillis(); // Add the table rows in the mini batch to the collection of pending rows. 
This will be used to detect diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java index ba3bd6d..513cd23 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java @@ -53,6 +53,8 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.hbase.index.ValueGetter; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSource; import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory; import org.apache.phoenix.hbase.index.table.HTableFactory; @@ -84,6 +86,7 @@ public class GlobalIndexChecker extends BaseRegionObserver { private Scan scan; private Scan indexScan; private Scan singleRowIndexScan; + private Scan singleRowDataScan; private Scan buildIndexScan = null; private Table dataHTable = null; private byte[] emptyCF; @@ -207,22 +210,71 @@ public class GlobalIndexChecker extends BaseRegionObserver { } } - private void deleteRowIfAgedEnough(byte[] indexRowKey, List<Cell> row, long ts) throws IOException { + private void deleteRowIfAgedEnough(byte[] indexRowKey, List<Cell> row, long ts, boolean specific) throws IOException { if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > ageThreshold) { Delete del = new Delete(indexRowKey, ts); - // We are deleting a specific version of a row so the flowing loop is for that - for (Cell cell : row) { - del.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), cell.getTimestamp()); + if (specific) { + // We are deleting a specific version of a row so the following loop is for that + for (Cell cell : row) { 
+ del.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), cell.getTimestamp()); + } } Mutation[] mutations = new Mutation[]{del}; region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE); } } + private boolean doesDataRowExist(byte[] indexRowKey, byte[] dataRowKey) throws IOException { + singleRowDataScan.withStartRow(dataRowKey, true); + singleRowDataScan.withStopRow(dataRowKey, true); + singleRowDataScan.setTimeRange(0, maxTimestamp); + try (ResultScanner resultScanner = dataHTable.getScanner(singleRowDataScan)) { + final Result result = resultScanner.next(); + if (result == null) { + // There is no data table row for this unverified index row. We need to skip it. + return false; + } + else { + ValueGetter getter = new ValueGetter() { + final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable(); + + @Override + public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException { + Cell cell = result.getColumnLatestCell(ref.getFamily(), ref.getQualifier()); + if (cell == null) { + return null; + } + valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + return valuePtr; + } + + @Override + public byte[] getRowKey() { + return result.getRow(); + } + }; + for (Cell cell : result.rawCells()) { + String cellString = cell.toString(); + LOG.debug("Rebuilt row :" + cellString + " value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell))); + } + byte[] builtIndexRowKey = indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(dataRowKey), null, null, maxTimestamp); + if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length, + indexRowKey, 0, indexRowKey.length) != 0) { + // The row key of the index row that has been built is different than the row key of the unverified + // index row + return false; + } + } + } catch (Throwable t) { + ServerUtil.throwIOException(dataHTable.getName().toString(), t); + } + return true; + } private void 
repairIndexRows(byte[] indexRowKey, long ts, List<Cell> row) throws IOException { // Build the data table row key from the index table row key if (buildIndexScan == null) { buildIndexScan = new Scan(); + singleRowDataScan = new Scan(); indexScan = new Scan(scan); singleRowIndexScan = new Scan(scan); byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME); @@ -262,6 +314,14 @@ public class GlobalIndexChecker extends BaseRegionObserver { } catch (Throwable t) { ServerUtil.throwIOException(dataHTable.getName().toString(), t); } + if (!doesDataRowExist(indexRowKey, dataRowKey)) { + // Delete the unverified row from index if it is old enough + deleteRowIfAgedEnough(indexRowKey, row, ts, false); + // Skip this unverified row (i.e., do not return it to the client). Just returning an empty row is + // sufficient to do that + row.clear(); + return; + } // Close the current scanner as the newly build row will not be visible to it scanner.close(); // Open a new scanner starting from the current row @@ -286,7 +346,7 @@ public class GlobalIndexChecker extends BaseRegionObserver { // the data table row is masked by this unverified row, or that the corresponding data table row does // exist // First delete the unverified row from index if it is old enough - deleteRowIfAgedEnough(indexRowKey, row, ts); + deleteRowIfAgedEnough(indexRowKey, row, ts, true); // Now we will do a single row scan to retrieve the verified index row build from the data table row // if such an index row exists. Note we cannot read all versions in one scan as the max number of row // versions for an index table can be 1. In that case, we will get only one (i.e., the most recent
