This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 50279643c604c64eebe0b2041f764b1f00f6ea27
Author: Kadir <[email protected]>
AuthorDate: Sat Oct 5 16:30:07 2019 -0700

    PHOENIX-5505 Index read repair does not repair unverified rows with higher 
timestamp (addendum)
---
 .../end2end/index/GlobalMutableNonTxIndexIT.java   |  1 -
 .../org/apache/phoenix/util/IndexScrutinyIT.java   | 12 +---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 17 +++++-
 .../apache/phoenix/index/GlobalIndexChecker.java   | 70 ++++++++++++++++++++--
 4 files changed, 82 insertions(+), 18 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java
index 35d5049..7097234 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java
@@ -27,7 +27,6 @@ public class GlobalMutableNonTxIndexIT extends BaseIndexIT {
 
     public GlobalMutableNonTxIndexIT(boolean localIndex, boolean mutable, 
String transactionProvider, boolean columnEncoded, boolean 
skipPostIndexUpdates) {
         super(localIndex, mutable, transactionProvider, columnEncoded);
-        
IndexRegionObserver.setSkipPostIndexUpdatesForTesting(skipPostIndexUpdates);
     }
 
     
@Parameters(name="GlobalMutableNonTxIndexIT_localIndex={0},mutable={1},transactionProvider={2},columnEncoded={3},skipPostIndexUpdates={4}")
 // name is used by failsafe as file name in reports
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
index 20ec965..8af61c4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
@@ -91,17 +91,11 @@ public class IndexScrutinyIT extends 
ParallelStatsDisabledIT {
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc','1')");
             conn.commit();
 
-            // Writing index directly will generate unverified rows with no 
corresponding data rows. These rows will not be visible to the applications
+            // Writing index directly will generate unverified rows. These 
rows will be recovered if there exists the
+            // corresponding data row
             conn.createStatement().executeUpdate("UPSERT INTO " + 
fullIndexName + " VALUES ('ccc','a','2')");
             conn.commit();
-            try {
-                IndexScrutiny.scrutinizeIndex(conn, fullTableName, 
fullIndexName);
-                fail();
-            } catch (AssertionError e) {
-                assertEquals("Expected data table row count to match 
expected:<2> but was:<1>", e.getMessage());
-            }
+            assertEquals(2, IndexScrutiny.scrutinizeIndex(conn, fullTableName, 
fullIndexName));
         }
     }
-
-
 }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 7c37c7a..1820194 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -159,6 +159,7 @@ public class IndexRegionObserver extends BaseRegionObserver 
{
       // batch with a timestamp earlier than the timestamp of this batch and 
the earlier batch has a mutation on the
       // row (i.e., concurrent updates).
       private HashSet<ImmutableBytesPtr> pendingRows = new HashSet<>();
+      private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
 
       private BatchMutateContext(int clientVersion) {
           this.clientVersion = clientVersion;
@@ -374,18 +375,27 @@ public class IndexRegionObserver extends 
BaseRegionObserver {
       }
   }
 
-  private void lockRows(MiniBatchOperationInProgress<Mutation> miniBatchOp, 
BatchMutateContext context) throws IOException {
+  private void populateRowsToLock(MiniBatchOperationInProgress<Mutation> 
miniBatchOp, BatchMutateContext context) {
       for (int i = 0; i < miniBatchOp.size(); i++) {
           if (miniBatchOp.getOperationStatus(i) == IGNORE) {
               continue;
           }
           Mutation m = miniBatchOp.getOperation(i);
           if (this.builder.isEnabled(m)) {
-              context.rowLocks.add(lockManager.lockRow(m.getRow(), 
rowLockWaitDuration));
+              ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+              if (!context.rowsToLock.contains(row)) {
+                  context.rowsToLock.add(row);
+              }
           }
       }
   }
 
+  private void lockRows(BatchMutateContext context) throws IOException {
+      for (ImmutableBytesPtr rowKey : context.rowsToLock) {
+          context.rowLocks.add(lockManager.lockRow(rowKey, 
rowLockWaitDuration));
+      }
+  }
+
   private void populatePendingRows(BatchMutateContext context, long now) {
       for (RowLock rowLock : context.rowLocks) {
           ImmutableBytesPtr rowKey = rowLock.getRowKey();
@@ -598,7 +608,8 @@ public class IndexRegionObserver extends BaseRegionObserver 
{
        * while determining the index updates
        */
       if (replayWrite == null) {
-          lockRows(miniBatchOp, context);
+          populateRowsToLock(miniBatchOp, context);
+          lockRows(context);
       }
       long now = EnvironmentEdgeManager.currentTimeMillis();
       // Add the table rows in the mini batch to the collection of pending 
rows. This will be used to detect
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index ba3bd6d..513cd23 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -53,6 +53,8 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSource;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.hbase.index.table.HTableFactory;
@@ -84,6 +86,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
         private Scan scan;
         private Scan indexScan;
         private Scan singleRowIndexScan;
+        private Scan singleRowDataScan;
         private Scan buildIndexScan = null;
         private Table dataHTable = null;
         private byte[] emptyCF;
@@ -207,22 +210,71 @@ public class GlobalIndexChecker extends 
BaseRegionObserver {
             }
         }
 
-        private void deleteRowIfAgedEnough(byte[] indexRowKey, List<Cell> row, 
long ts) throws IOException {
+        private void deleteRowIfAgedEnough(byte[] indexRowKey, List<Cell> row, 
long ts, boolean specific) throws IOException {
             if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > 
ageThreshold) {
                 Delete del = new Delete(indexRowKey, ts);
-                // We are deleting a specific version of a row so the flowing 
loop is for that
-                for (Cell cell : row) {
-                    del.addColumn(CellUtil.cloneFamily(cell), 
CellUtil.cloneQualifier(cell), cell.getTimestamp());
+                if (specific) {
+                    // We are deleting a specific version of a row so the 
following loop is for that
+                    for (Cell cell : row) {
+                        del.addColumn(CellUtil.cloneFamily(cell), 
CellUtil.cloneQualifier(cell), cell.getTimestamp());
+                    }
                 }
                 Mutation[] mutations = new Mutation[]{del};
                 region.batchMutate(mutations, HConstants.NO_NONCE, 
HConstants.NO_NONCE);
             }
         }
 
+        private boolean doesDataRowExist(byte[] indexRowKey, byte[] 
dataRowKey) throws IOException {
+            singleRowDataScan.withStartRow(dataRowKey, true);
+            singleRowDataScan.withStopRow(dataRowKey, true);
+            singleRowDataScan.setTimeRange(0, maxTimestamp);
+            try (ResultScanner resultScanner = 
dataHTable.getScanner(singleRowDataScan)) {
+                final Result result = resultScanner.next();
+                if (result == null) {
+                    // There is no data table row for this unverified 
index row. We need to skip it.
+                    return false;
+                }
+                else {
+                    ValueGetter getter = new ValueGetter() {
+                        final ImmutableBytesWritable valuePtr = new 
ImmutableBytesWritable();
+
+                        @Override
+                        public ImmutableBytesWritable 
getLatestValue(ColumnReference ref, long ts) throws IOException {
+                            Cell cell = 
result.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
+                            if (cell == null) {
+                                return null;
+                            }
+                            valuePtr.set(cell.getValueArray(), 
cell.getValueOffset(), cell.getValueLength());
+                            return valuePtr;
+                        }
+
+                        @Override
+                        public byte[] getRowKey() {
+                            return result.getRow();
+                        }
+                    };
+                    for (Cell cell : result.rawCells()) {
+                        String cellString = cell.toString();
+                        LOG.debug("Rebuilt row :" + cellString + " value : " + 
Bytes.toStringBinary(CellUtil.cloneValue(cell)));
+                    }
+                    byte[] builtIndexRowKey = 
indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(dataRowKey), 
null, null, maxTimestamp);
+                    if (Bytes.compareTo(builtIndexRowKey, 0, 
builtIndexRowKey.length,
+                            indexRowKey, 0, indexRowKey.length) != 0) {
+                        // The row key of the index row that has been built is 
different than the row key of the unverified
+                        // index row
+                        return false;
+                    }
+                }
+            } catch (Throwable t) {
+                ServerUtil.throwIOException(dataHTable.getName().toString(), 
t);
+            }
+            return true;
+        }
         private void repairIndexRows(byte[] indexRowKey, long ts, List<Cell> 
row) throws IOException {
             // Build the data table row key from the index table row key
             if (buildIndexScan == null) {
                 buildIndexScan = new Scan();
+                singleRowDataScan = new Scan();
                 indexScan = new Scan(scan);
                 singleRowIndexScan = new Scan(scan);
                 byte[] dataTableName = 
scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
@@ -262,6 +314,14 @@ public class GlobalIndexChecker extends BaseRegionObserver 
{
             } catch (Throwable t) {
                 ServerUtil.throwIOException(dataHTable.getName().toString(), 
t);
             }
+            if (!doesDataRowExist(indexRowKey, dataRowKey)) {
+                // Delete the unverified row from index if it is old enough
+                deleteRowIfAgedEnough(indexRowKey, row, ts, false);
+                // Skip this unverified row (i.e., do not return it to the 
client). Just returning an empty row is
+                // sufficient to do that
+                row.clear();
+                return;
+            }
             // Close the current scanner as the newly build row will not be 
visible to it
             scanner.close();
             // Open a new scanner starting from the current row
@@ -286,7 +346,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
                 // the data table row is masked by this unverified row, or 
that the corresponding data table row does
                 // exist
                 // First delete the unverified row from index if it is old 
enough
-                deleteRowIfAgedEnough(indexRowKey, row, ts);
+                deleteRowIfAgedEnough(indexRowKey, row, ts, true);
                 // Now we will do a single row scan to retrieve the verified 
index row build from the data table row
                 // if such an index row exists. Note we cannot read all 
versions in one scan as the max number of row
                 // versions for an index table can be 1. In that case, we will 
get only one (i.e., the most recent

Reply via email to