This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new a89d153  PHOENIX-5957 Index rebuild should remove old index rows with 
higher timestamp
a89d153 is described below

commit a89d153ea7db3c5f2f41edbd453d2645cd504d7a
Author: Kadir <kozde...@salesforce.com>
AuthorDate: Mon Jun 15 16:14:18 2020 -0700

    PHOENIX-5957 Index rebuild should remove old index rows with higher 
timestamp
---
 .../end2end/IndexToolForNonTxGlobalIndexIT.java    |  78 +++++++++++++++-
 .../coprocessor/GlobalIndexRegionScanner.java      |   5 -
 .../coprocessor/IndexRebuildRegionScanner.java     | 102 +++++++++++++++------
 .../phoenix/coprocessor/IndexerRegionScanner.java  |   9 ++
 .../phoenix/index/VerifySingleIndexRowTest.java    |  31 ++++---
 5 files changed, 176 insertions(+), 49 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
index 1ddab5a..8de9cdb 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
@@ -51,9 +51,11 @@ import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexScrutiny;
 import org.apache.phoenix.util.ManualEnvironmentEdge;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
@@ -61,6 +63,7 @@ import org.apache.phoenix.util.TestUtil;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -141,7 +144,7 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseUniqueNamesOwnClusterIT
 
     @BeforeClass
     public static synchronized void setup() throws Exception {
-        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(8);
         serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, 
Long.toString(20));
         
serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, 
Long.toString(5));
         serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
@@ -152,7 +155,8 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseUniqueNamesOwnClusterIT
             Long.toString(Long.MAX_VALUE));
         serverProps.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
             Long.toString(Long.MAX_VALUE));
-        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+        
serverProps.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
 Long.toString(0));
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(4);
         clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, 
Boolean.toString(true));
         clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, 
Long.toString(5));
         clientProps.put(QueryServices.TRANSACTIONS_ENABLED, 
Boolean.TRUE.toString());
@@ -425,6 +429,76 @@ public class IndexToolForNonTxGlobalIndexIT extends 
BaseUniqueNamesOwnClusterIT
         }
     }
 
+    @Ignore("HBase 1.7 is required for this test")
+    @Test
+    public void testCleanUpOldDesignIndexRows() throws Exception {
+        if (!mutable) {
+            return;
+        }
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, 
dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, 
indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String stmString1 =
+                    "CREATE TABLE " + dataTableFullName
+                            + " (ID INTEGER NOT NULL PRIMARY KEY, NAME 
VARCHAR, ZIP INTEGER) "
+                            + tableDDLOptions;
+            conn.createStatement().execute(stmString1);
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, 
?)", dataTableFullName);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+
+            // Insert N_ROWS rows
+            final int N_ROWS = 100;
+            final int N_OLD_ROWS = 10;
+            for (int i = 0; i < N_ROWS; i++) {
+                IndexToolIT.upsertRow(stmt1, i);
+            }
+            conn.commit();
+            String stmtString2 =
+                    String.format(
+                            "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC 
", indexTableName, dataTableFullName);
+            conn.createStatement().execute(stmtString2);
+
+            // Run the index MR job and verify that the index table is built 
correctly
+            IndexTool indexTool = IndexToolIT.runIndexTool(directApi, 
useSnapshot, schemaName, dataTableName, indexTableName, null, 0, 
IndexTool.IndexVerifyType.BEFORE, new String[0]);
+            assertEquals(N_ROWS, 
indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            assertEquals(N_ROWS, 
indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
+            assertEquals(N_ROWS, 
indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(N_ROWS, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
+            long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, 
dataTableFullName, indexTableFullName);
+            assertEquals(N_ROWS, actualRowCount);
+            // Read N_OLD_ROWS index rows and rewrite them back directly. This 
will overwrite existing rows with newer
+            // timestamps and set the empty column to value "x". This will make 
them old design rows (for mutable indexes)
+            String stmtString3 =
+                    String.format(
+                            "UPSERT INTO %s SELECT * FROM %s LIMIT %d", 
indexTableFullName, indexTableFullName, N_OLD_ROWS);
+            conn.createStatement().execute(stmtString3);
+            conn.commit();
+            // Verify that IndexTool reports that there are old design index 
rows
+            indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, 
schemaName, dataTableName, indexTableName, null, 0, 
IndexTool.IndexVerifyType.ONLY, new String[0]);
+            assertEquals(N_OLD_ROWS, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
+            // Clean up all old design rows
+            indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, 
schemaName, dataTableName, indexTableName, null, 0, 
IndexTool.IndexVerifyType.BEFORE, new String[0]);
+            assertEquals(N_OLD_ROWS, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
+            // Verify that IndexTool does not report them anymore
+            indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, 
schemaName, dataTableName, indexTableName, null, 0, 
IndexTool.IndexVerifyType.BEFORE, new String[0]);
+            assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
+            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, 
dataTableFullName, indexTableFullName);
+            assertEquals(N_ROWS, actualRowCount);
+        }
+    }
+
     @Test
     public void testIndexToolVerifyBeforeAndBothOptions() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index d74aa66..726708a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -134,11 +134,6 @@ public abstract class GlobalIndexRegionScanner extends 
BaseRegionScanner {
         indexHTable = hTableFactory.getTable(new 
ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
         indexTableTTL = 
indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
         maxLookBackInMills = ScanInfoUtil.getMaxLookbackInMillis(config);
-        pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
-                new ThreadPoolBuilder("IndexVerify",
-                        
env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
-                        
DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout(
-                        INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
         rowCountPerTask = 
config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY,
                 DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
     }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 6f82db3..50b1ffa 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.coprocessor;
 import static 
org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
 import static 
org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
 import static 
org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
+import static 
org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
 import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
@@ -59,6 +60,9 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.filter.AllVersionsIndexRebuildFilter;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
+import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
 import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
 import org.apache.phoenix.query.HBaseFactoryProvider;
 import org.apache.phoenix.util.ByteUtil;
@@ -116,28 +120,35 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
         if (indexRowKeyforReadRepair != null) {
             setReturnCodeForSingleRowRebuild();
             pageSizeInRows = 1;
-        } else {
-            try(org.apache.hadoop.hbase.client.Connection connection =
-                        
HBaseFactoryProvider.getHConnectionFactory().createConnection(env.getConfiguration()))
 {
-                indexRegionEndKeys = 
connection.getRegionLocator(indexHTable.getName()).getEndKeys();
-            }
+            return;
+        }
+        try(org.apache.hadoop.hbase.client.Connection connection =
+                    
HBaseFactoryProvider.getHConnectionFactory().createConnection(env.getConfiguration()))
 {
+            indexRegionEndKeys = 
connection.getRegionLocator(indexHTable.getName()).getEndKeys();
         }
+        pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
+                new ThreadPoolBuilder("IndexVerify",
+                        
env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
+                        
DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout(
+                        INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
+
         if (verify) {
             viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
             byte[] disableLoggingValueBytes =
-                
scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_DISABLE_LOGGING_VERIFY_TYPE);
+                    
scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_DISABLE_LOGGING_VERIFY_TYPE);
             if (disableLoggingValueBytes != null) {
                 disableLoggingVerifyType =
-                    
IndexTool.IndexDisableLoggingType.fromValue(disableLoggingValueBytes);
+                        
IndexTool.IndexDisableLoggingType.fromValue(disableLoggingValueBytes);
             }
             verificationOutputRepository =
-                new 
IndexVerificationOutputRepository(indexMaintainer.getIndexTableName()
-                    , hTableFactory, disableLoggingVerifyType);
+                    new 
IndexVerificationOutputRepository(indexMaintainer.getIndexTableName()
+                            , hTableFactory, disableLoggingVerifyType);
             verificationResult = new IndexToolVerificationResult(scan);
             verificationResultRepository =
                 new 
IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), 
hTableFactory);
             nextStartKey = null;
             minTimestamp = scan.getTimeRange().getMin();
+                    new 
IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), 
hTableFactory);
         }
     }
 
@@ -209,6 +220,11 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
     @Override
     public void close() throws IOException {
         innerScanner.close();
+        if (indexRowKeyforReadRepair != null) {
+            hTableFactory.shutdown();
+            indexHTable.close();
+            return;
+        }
         if (verify) {
             try {
                 if (verificationResultRepository != null) {
@@ -227,6 +243,11 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
                 }
             }
         }
+        else {
+            this.pool.stop("IndexRebuildRegionScanner is closing");
+            hTableFactory.shutdown();
+            indexHTable.close();
+        }
     }
 
     @VisibleForTesting
@@ -440,7 +461,7 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
         return getMutationsWithSameTS(put, del);
     }
 
-    private void updateUnverifiedIndexRowCounters(Put actual,
+    private void updateUnverifiedIndexRowCounters(Put actual, long expectedTs, 
List<Mutation> oldIndexRowsToBeDeletedList,
                                                   
IndexToolVerificationResult.PhaseResult verificationPhaseResult) {
         // Get the empty column of the given index row
         List<Cell> cellList = 
actual.get(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
@@ -463,6 +484,13 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
         }
         // The empty column value is neither "verified" or "unverified". This 
must be a row from the old design
         
verificationPhaseResult.setOldIndexRowCount(verificationPhaseResult.getOldIndexRowCount()
 + 1);
+        if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == 
IndexTool.IndexVerifyType.BOTH) {
+            long actualTs = getTimestamp(actual);
+            if (actualTs > expectedTs) {
+                
oldIndexRowsToBeDeletedList.add(indexMaintainer.buildRowDeleteMutation(actual.getRow(),
+                        IndexMaintainer.DeleteType.SINGLE_VERSION, actualTs));
+            }
+        }
     }
 
     /**
@@ -599,7 +627,7 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
 
     @VisibleForTesting
     public boolean verifySingleIndexRow(Result indexRow, Map<byte[], 
List<Mutation>> perTaskIndexKeyToMutationMap,
-                                        Set<byte[]> mostRecentIndexRowKeys,
+                                        Set<byte[]> mostRecentIndexRowKeys, 
List<Mutation> oldIndexRowsToBeDeletedList,
                                         
IndexToolVerificationResult.PhaseResult verificationPhaseResult,
                                         boolean isBeforeRebuild)
             throws IOException {
@@ -613,13 +641,15 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
         }
         Collections.sort(expectedMutationList, MUTATION_TS_DESC_COMPARATOR);
         Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR);
-        if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+        if (isBeforeRebuild) {
             Mutation m = actualMutationList.get(0);
             if (m instanceof Put && 
mostRecentIndexRowKeys.contains(m.getRow())) {
                 // We do check here only the latest version as older versions 
will always be unverified before
                 // newer versions are inserted.
-                updateUnverifiedIndexRowCounters((Put) 
actualMutationList.get(0), verificationPhaseResult);
+                updateUnverifiedIndexRowCounters((Put) m, 
getTimestamp(expectedMutationList.get(0)), oldIndexRowsToBeDeletedList, 
verificationPhaseResult);
             }
+        }
+        if (verifyType == IndexTool.IndexVerifyType.ONLY) {
             repairActualMutationList(actualMutationList, expectedMutationList);
         }
         cleanUpActualMutationList(actualMutationList);
@@ -738,7 +768,7 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
     }
 
     private void verifyIndexRows(Map<byte[], List<Mutation>> 
indexKeyToMutationMap,
-                                 Set<byte[]> mostRecentIndexRowKeys,
+                                 Set<byte[]> mostRecentIndexRowKeys, 
List<Mutation> oldIndexRowsToBeDeletedList,
                                  IndexToolVerificationResult.PhaseResult 
verificationPhaseResult,
                                  boolean isBeforeRebuild) throws IOException {
         List<KeyRange> keys = new ArrayList<>(indexKeyToMutationMap.size());
@@ -757,7 +787,7 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
         indexScan.setCacheBlocks(false);
         try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) {
             for (Result result = resultScanner.next(); (result != null); 
result = resultScanner.next()) {
-                if (!verifySingleIndexRow(result, indexKeyToMutationMap, 
mostRecentIndexRowKeys,
+                if (!verifySingleIndexRow(result, indexKeyToMutationMap, 
mostRecentIndexRowKeys, oldIndexRowsToBeDeletedList,
                         verificationPhaseResult, isBeforeRebuild)) {
                     invalidIndexRows.put(result.getRow(), 
indexKeyToMutationMap.get(result.getRow()));
                 }
@@ -808,6 +838,7 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
     }
 
     private void rebuildIndexRows(Map<byte[], List<Mutation>> 
indexKeyToMutationMap,
+                                  List<Mutation> oldIndexRowsToBeDeletedList,
                                   IndexToolVerificationResult 
verificationResult) throws IOException {
         if (ignoreIndexRebuildForTesting) {
             return;
@@ -827,6 +858,20 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
             if (batchSize > 0) {
                 indexHTable.batch(indexUpdates);
             }
+            batchSize = 0;
+            indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+            for (Mutation mutation : oldIndexRowsToBeDeletedList) {
+                indexUpdates.add(mutation);
+                batchSize ++;
+                if (batchSize >= maxBatchSize) {
+                    indexHTable.batch(indexUpdates);
+                    batchSize = 0;
+                    indexUpdates = new ArrayList<Mutation>(maxBatchSize);
+                }
+            }
+            if (batchSize > 0) {
+                indexHTable.batch(indexUpdates);
+            }
             if (verify) {
                 
verificationResult.setRebuiltIndexRowCount(verificationResult.getRebuiltIndexRowCount()
 + indexKeyToMutationMap.size());
             }
@@ -838,23 +883,26 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
     private void rebuildAndOrVerifyIndexRows(Map<byte[], List<Mutation>> 
indexKeyToMutationMap,
                                              Set<byte[]> 
mostRecentIndexRowKeys,
                                              IndexToolVerificationResult 
verificationResult) throws IOException {
+        List<Mutation> oldIndexRowsToBeDeletedList = new ArrayList<>();
         if (verifyType == IndexTool.IndexVerifyType.NONE) {
-            rebuildIndexRows(indexKeyToMutationMap, verificationResult);
+            rebuildIndexRows(indexKeyToMutationMap, 
oldIndexRowsToBeDeletedList, verificationResult);
         } else if (verifyType == IndexTool.IndexVerifyType.ONLY) {
-            verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, 
verificationResult.getBefore(), true);
+            verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, 
Collections.EMPTY_LIST, verificationResult.getBefore(), true);
         } else if (verifyType == IndexTool.IndexVerifyType.BEFORE) {
-            verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, 
verificationResult.getBefore(), true);
-            if (!indexKeyToMutationMap.isEmpty()) {
-                rebuildIndexRows(indexKeyToMutationMap, verificationResult);
+            verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, 
oldIndexRowsToBeDeletedList, verificationResult.getBefore(), true);
+            if (!indexKeyToMutationMap.isEmpty() || 
!oldIndexRowsToBeDeletedList.isEmpty()) {
+                rebuildIndexRows(indexKeyToMutationMap, 
oldIndexRowsToBeDeletedList, verificationResult);
             }
         } else if (verifyType == IndexTool.IndexVerifyType.AFTER) {
-            rebuildIndexRows(indexKeyToMutationMap, verificationResult);
-            verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, 
verificationResult.getAfter(), false);
+            rebuildIndexRows(indexKeyToMutationMap, Collections.EMPTY_LIST, 
verificationResult);
+            verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, 
Collections.EMPTY_LIST, verificationResult.getAfter(), false);
         } else { // verifyType == IndexTool.IndexVerifyType.BOTH
-            verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, 
verificationResult.getBefore(), true);
+            verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, 
oldIndexRowsToBeDeletedList, verificationResult.getBefore(), true);
+            if (!indexKeyToMutationMap.isEmpty() || 
!oldIndexRowsToBeDeletedList.isEmpty()) {
+                rebuildIndexRows(indexKeyToMutationMap, 
oldIndexRowsToBeDeletedList, verificationResult);
+            }
             if (!indexKeyToMutationMap.isEmpty()) {
-                rebuildIndexRows(indexKeyToMutationMap, verificationResult);
-                verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, 
verificationResult.getAfter(), false);
+                verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, 
Collections.EMPTY_LIST, verificationResult.getAfter(), false);
             }
         }
     }
@@ -1176,7 +1224,7 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
         List<Mutation> indexMutations = 
prepareIndexMutationsForRebuild(indexMaintainer, put, del);
         boolean mostRecentDone = false;
         // Do not populate mostRecentIndexRowKeys when verifyType is 
IndexTool.IndexVerifyType.NONE or IndexTool.IndexVerifyType.AFTER
-        if (verifyType != IndexTool.IndexVerifyType.ONLY) {
+        if (verifyType == IndexTool.IndexVerifyType.NONE || verifyType == 
IndexTool.IndexVerifyType.AFTER) {
             mostRecentDone = true;
         }
         for (Mutation mutation : indexMutations) {
@@ -1259,7 +1307,7 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
                 } while (hasMore && indexMutationCount < pageSizeInRows);
                 if (!indexKeyToMutationMap.isEmpty()) {
                     if (indexRowKeyforReadRepair != null) {
-                        rebuildIndexRows(indexKeyToMutationMap, 
verificationResult);
+                        rebuildIndexRows(indexKeyToMutationMap, 
Collections.EMPTY_LIST, verificationResult);
                     } else {
                         verifyAndOrRebuildIndex(indexKeyToMutationMap, 
mostRecentIndexRowKeys);
                     }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
index aefd7e8..85aa69a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.coprocessor;
 
+import static 
org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
 import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_VALUE_BYTES;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
@@ -60,6 +61,9 @@ import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
 import org.apache.phoenix.hbase.index.parallel.Task;
 import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
+import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 
 import org.apache.phoenix.index.PhoenixIndexCodec;
@@ -88,6 +92,11 @@ public class IndexerRegionScanner extends 
GlobalIndexRegionScanner {
                           final RegionCoprocessorEnvironment env,
                           UngroupedAggregateRegionObserver 
ungroupedAggregateRegionObserver) throws IOException {
         super(innerScanner, region, scan, env);
+        pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
+                new ThreadPoolBuilder("IndexVerify",
+                        
env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
+                        
DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout(
+                        INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
         this.ungroupedAggregateRegionObserver = 
ungroupedAggregateRegionObserver;
         if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) 
== null) {
             partialRebuild = true;
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
index 1b8ed55..bf373a1 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
@@ -274,7 +274,7 @@ public class VerifySingleIndexRowTest extends 
BaseConnectionlessQueryTest {
         //setup
         when(GlobalIndexRegionScanner.getIndexRowKey(indexMaintainer, 
put)).thenCallRealMethod();
         when(rebuildScanner.prepareIndexMutations(put, delete, 
indexKeyToMutationMap, mostRecentIndexRowKeys)).thenCallRealMethod();
-        when(rebuildScanner.verifySingleIndexRow(Matchers.<Result>any(), 
Matchers.<Map>any(), Matchers.<Set>any(),
+        when(rebuildScanner.verifySingleIndexRow(Matchers.<Result>any(), 
Matchers.<Map>any(), Matchers.<Set>any(), Matchers.<List>any(),
                 Matchers.<IndexToolVerificationResult.PhaseResult>any(), 
Matchers.anyBoolean())).thenCallRealMethod();
         doNothing().when(rebuildScanner)
                 
.logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(),
@@ -300,7 +300,7 @@ public class VerifySingleIndexRowTest extends 
BaseConnectionlessQueryTest {
                 entry : indexKeyToMutationMap.entrySet()) {
             initializeLocalMockitoSetup(entry, TestType.VALID_EXACT_MATCH);
             //test code
-            rebuildScanner.verifySingleIndexRow(indexRow, 
indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true);
+            rebuildScanner.verifySingleIndexRow(indexRow, 
indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, 
actualPR, true);
 
             assertTrue(actualPR.equals(expectedPR));
         }
@@ -313,7 +313,7 @@ public class VerifySingleIndexRowTest extends 
BaseConnectionlessQueryTest {
                 entry : indexKeyToMutationMap.entrySet()) {
             initializeLocalMockitoSetup(entry, TestType.VALID_MORE_MUTATIONS);
             //test code
-            rebuildScanner.verifySingleIndexRow(indexRow, 
indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true);
+            rebuildScanner.verifySingleIndexRow(indexRow, 
indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, 
actualPR, true);
 
             assertTrue(actualPR.equals(expectedPR));
         }
@@ -326,7 +326,7 @@ public class VerifySingleIndexRowTest extends 
BaseConnectionlessQueryTest {
                 entry : indexKeyToMutationMap.entrySet()) {
             initializeLocalMockitoSetup(entry, TestType.VALID_MIX_MUTATIONS);
             //test code
-            rebuildScanner.verifySingleIndexRow(indexRow, 
indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true);
+            rebuildScanner.verifySingleIndexRow(indexRow, 
indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, 
actualPR, true);
 
             assertTrue(actualPR.equals(expectedPR));
         }
@@ -339,7 +339,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
                 entry : indexKeyToMutationMap.entrySet()) {
             initializeLocalMockitoSetup(entry, TestType.VALID_NEW_UNVERIFIED_MUTATIONS);
             //test code
-            rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true);
+            rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true);
 
             assertTrue(actualPR.equals(expectedPR));
         }
@@ -355,7 +355,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
                 initializeLocalMockitoSetup(entry, TestType.EXPIRED);
                 expireThisRow();
                 //test code
-                rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true);
+                rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true);
 
                 assertTrue(actualPR.equals(expectedPR));
             }
@@ -372,7 +372,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
                 entry : indexKeyToMutationMap.entrySet()) {
             initializeLocalMockitoSetup(entry, TestType.INVALID_CELL_VALUE);
             //test code
-            rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true);
+            rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true);
 
             assertTrue(actualPR.equals(expectedPR));
         }
@@ -385,7 +385,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
                 entry : indexKeyToMutationMap.entrySet()) {
             initializeLocalMockitoSetup(entry, TestType.INVALID_EMPTY_CELL);
             //test code
-            rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true);
+            rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true);
 
             assertTrue(actualPR.equals(expectedPR));
         }
@@ -399,7 +399,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
                 entry : indexKeyToMutationMap.entrySet()) {
             initializeLocalMockitoSetup(entry, TestType.INVALID_COLUMN);
             //test code
-            rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true);
+            rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true);
 
             assertTrue(actualPR.equals(expectedPR));
         }
@@ -412,7 +412,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
                 entry : indexKeyToMutationMap.entrySet()) {
             initializeLocalMockitoSetup(entry, TestType.INVALID_EXTRA_CELL);
             //test code
-            rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true);
+            rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true);
 
             assertTrue(actualPR.equals(expectedPR));
         }
@@ -423,7 +423,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
         when(indexRow.getRow()).thenReturn(Bytes.toBytes(1));
         exceptionRule.expect(DoNotRetryIOException.class);
         exceptionRule.expectMessage(IndexRebuildRegionScanner.NO_EXPECTED_MUTATION);
-        rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true);
+        rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true);
     }
 
     @Test
@@ -432,7 +432,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
                 entry : indexKeyToMutationMap.entrySet()) {
             initializeLocalMockitoSetup(entry, TestType.VALID_EXTRA_CELL);
             //test code
-            rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true);
+            rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true);
 
             assertEquals(1, actualPR.getIndexHasExtraCellsCount());
         }
@@ -490,7 +490,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
         injectEdge.incrementValue(1);
         IndexToolVerificationResult.PhaseResult actualPR = new IndexToolVerificationResult.PhaseResult();
         // Report this validation as a success
-        assertTrue(rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true));
+        assertTrue(rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, false));
         // validIndexRowCount = 1
         IndexToolVerificationResult.PhaseResult expectedPR = new IndexToolVerificationResult.PhaseResult(1, 0, 0, 0, 0, 0, 0, 0);
         assertTrue(actualPR.equals(expectedPR));
@@ -539,13 +539,14 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
 
         Map<byte[], List<Mutation>> indexKeyToMutationMap = Maps.newTreeMap((Bytes.BYTES_COMPARATOR));
         indexKeyToMutationMap.put(indexRowKey1Bytes, expectedMutations);
+        mostRecentIndexRowKeys = new TreeSet<>(Bytes.BYTES_COMPARATOR);
         when(rebuildScanner.prepareActualIndexMutations(any(Result.class))).thenReturn(actualMutations);
         when(indexRow.getRow()).thenReturn(indexRowKey1Bytes);
 
         injectEdge.incrementValue(1);
         IndexToolVerificationResult.PhaseResult actualPR = new IndexToolVerificationResult.PhaseResult();
         // Report this validation as a failure
-        assertFalse(rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true));
+        assertFalse(rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, new ArrayList<Mutation>(), actualPR, true));
         // beyondMaxLookBackInvalidIndexRowCount = 1
         IndexToolVerificationResult.PhaseResult expectedPR = new IndexToolVerificationResult.PhaseResult(0, 0, 0, 0, 0, 1, 0, 0);
         assertTrue(actualPR.equals(expectedPR));
@@ -762,4 +763,4 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
                 CellUtil.cloneFamily(origList.get(0)), Bytes.toBytes(INCLUDED_COLUMN),
                 ts, type.getCode(), Bytes.toBytes("asdfg"));
     }
-}
\ No newline at end of file
+}

Reply via email to