This is an automated email from the ASF dual-hosted git repository. kadir pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new a89d153 PHOENIX-5957 Index rebuild should remove old index rows with higher timestamp a89d153 is described below commit a89d153ea7db3c5f2f41edbd453d2645cd504d7a Author: Kadir <kozde...@salesforce.com> AuthorDate: Mon Jun 15 16:14:18 2020 -0700 PHOENIX-5957 Index rebuild should remove old index rows with higher timestamp --- .../end2end/IndexToolForNonTxGlobalIndexIT.java | 78 +++++++++++++++- .../coprocessor/GlobalIndexRegionScanner.java | 5 - .../coprocessor/IndexRebuildRegionScanner.java | 102 +++++++++++++++------ .../phoenix/coprocessor/IndexerRegionScanner.java | 9 ++ .../phoenix/index/VerifySingleIndexRowTest.java | 31 ++++--- 5 files changed, 176 insertions(+), 49 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java index 1ddab5a..8de9cdb 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java @@ -51,9 +51,11 @@ import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.IndexScrutiny; import org.apache.phoenix.util.ManualEnvironmentEdge; +import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; @@ -61,6 +63,7 @@ import org.apache.phoenix.util.TestUtil; import org.junit.After; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import 
org.junit.rules.ExpectedException; @@ -141,7 +144,7 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT @BeforeClass public static synchronized void setup() throws Exception { - Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2); + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(8); serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20)); serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, Long.toString(5)); serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, @@ -152,7 +155,8 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT Long.toString(Long.MAX_VALUE)); serverProps.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(Long.MAX_VALUE)); - Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2); + serverProps.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(4); clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true)); clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5)); clientProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); @@ -425,6 +429,76 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT } } + @Ignore("HBase 1.7 is required for this test") + @Test + public void testCleanUpOldDesignIndexRows() throws Exception { + if (!mutable) { + return; + } + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = 
DriverManager.getConnection(getUrl(), props)) { + String stmString1 = + "CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) " + + tableDDLOptions; + conn.createStatement().execute(stmString1); + String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName); + PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); + + // Insert N_ROWS rows + final int N_ROWS = 100; + final int N_OLD_ROWS = 10; + for (int i = 0; i < N_ROWS; i++) { + IndexToolIT.upsertRow(stmt1, i); + } + conn.commit(); + String stmtString2 = + String.format( + "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC ", indexTableName, dataTableFullName); + conn.createStatement().execute(stmtString2); + + // Run the index MR job and verify that the index table is built correctly + IndexTool indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BEFORE, new String[0]); + assertEquals(N_ROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue()); + assertEquals(N_ROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue()); + assertEquals(N_ROWS, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(N_ROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue()); + long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); + assertEquals(N_ROWS, actualRowCount); + // Read N_OLD_ROWS index rows and rewrite them back directly. This will overwrite existing rows with newer + // timestamps and set the empty column to value "x". This will make them old design rows (for mutable indexes) + String stmtString3 = + String.format( + "UPSERT INTO %s SELECT * FROM %s LIMIT %d", indexTableFullName, indexTableFullName, N_OLD_ROWS); + conn.createStatement().execute(stmtString3); + conn.commit(); + // Verify that IndexTool reports that there are old design index rows + indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY, new String[0]); + assertEquals(N_OLD_ROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue()); + // Clean up all old design rows + indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BEFORE, new String[0]); + assertEquals(N_OLD_ROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue()); + // Verify that IndexTool does not report them anymore + indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BEFORE, new String[0]); + assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue()); + actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); + assertEquals(N_ROWS, actualRowCount); + } + } + @Test public void testIndexToolVerifyBeforeAndBothOptions() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java index d74aa66..726708a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java @@ -134,11 +134,6 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive(); maxLookBackInMills = ScanInfoUtil.getMaxLookbackInMillis(config); - pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor( - new ThreadPoolBuilder("IndexVerify", - env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY, - DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout( - INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env)); rowCountPerTask = config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY, DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java index 6f82db3..50b1ffa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java @@ -20,6 +20,7 @@ package 
org.apache.phoenix.coprocessor; import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES; import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES; import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn; +import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY; import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; @@ -59,6 +60,9 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.filter.AllVersionsIndexRebuildFilter; +import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder; +import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager; +import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner; import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository; import org.apache.phoenix.query.HBaseFactoryProvider; import org.apache.phoenix.util.ByteUtil; @@ -116,28 +120,35 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { if (indexRowKeyforReadRepair != null) { setReturnCodeForSingleRowRebuild(); pageSizeInRows = 1; - } else { - try(org.apache.hadoop.hbase.client.Connection connection = - HBaseFactoryProvider.getHConnectionFactory().createConnection(env.getConfiguration())) { - indexRegionEndKeys = connection.getRegionLocator(indexHTable.getName()).getEndKeys(); - } + return; + } + try(org.apache.hadoop.hbase.client.Connection connection = + HBaseFactoryProvider.getHConnectionFactory().createConnection(env.getConfiguration())) { + indexRegionEndKeys = connection.getRegionLocator(indexHTable.getName()).getEndKeys(); } + pool = new 
WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor( + new ThreadPoolBuilder("IndexVerify", + env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY, + DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout( + INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env)); + if (verify) { viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); byte[] disableLoggingValueBytes = - scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_DISABLE_LOGGING_VERIFY_TYPE); + scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_DISABLE_LOGGING_VERIFY_TYPE); if (disableLoggingValueBytes != null) { disableLoggingVerifyType = - IndexTool.IndexDisableLoggingType.fromValue(disableLoggingValueBytes); + IndexTool.IndexDisableLoggingType.fromValue(disableLoggingValueBytes); } verificationOutputRepository = - new IndexVerificationOutputRepository(indexMaintainer.getIndexTableName() - , hTableFactory, disableLoggingVerifyType); + new IndexVerificationOutputRepository(indexMaintainer.getIndexTableName() + , hTableFactory, disableLoggingVerifyType); verificationResult = new IndexToolVerificationResult(scan); verificationResultRepository = new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory); nextStartKey = null; minTimestamp = scan.getTimeRange().getMin(); + new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory); } } @@ -209,6 +220,11 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { @Override public void close() throws IOException { innerScanner.close(); + if (indexRowKeyforReadRepair != null) { + hTableFactory.shutdown(); + indexHTable.close(); + return; + } if (verify) { try { if (verificationResultRepository != null) { @@ -227,6 +243,11 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { } } } + else { + this.pool.stop("IndexRebuildRegionScanner is closing"); + hTableFactory.shutdown(); + indexHTable.close(); + } } 
@VisibleForTesting @@ -440,7 +461,7 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { return getMutationsWithSameTS(put, del); } - private void updateUnverifiedIndexRowCounters(Put actual, + private void updateUnverifiedIndexRowCounters(Put actual, long expectedTs, List<Mutation> oldIndexRowsToBeDeletedList, IndexToolVerificationResult.PhaseResult verificationPhaseResult) { // Get the empty column of the given index row List<Cell> cellList = actual.get(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), @@ -463,6 +484,13 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { } // The empty column value is neither "verified" or "unverified". This must be a row from the old design verificationPhaseResult.setOldIndexRowCount(verificationPhaseResult.getOldIndexRowCount() + 1); + if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) { + long actualTs = getTimestamp(actual); + if (actualTs > expectedTs) { + oldIndexRowsToBeDeletedList.add(indexMaintainer.buildRowDeleteMutation(actual.getRow(), + IndexMaintainer.DeleteType.SINGLE_VERSION, actualTs)); + } + } } /** @@ -599,7 +627,7 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { @VisibleForTesting public boolean verifySingleIndexRow(Result indexRow, Map<byte[], List<Mutation>> perTaskIndexKeyToMutationMap, - Set<byte[]> mostRecentIndexRowKeys, + Set<byte[]> mostRecentIndexRowKeys, List<Mutation> oldIndexRowsToBeDeletedList, IndexToolVerificationResult.PhaseResult verificationPhaseResult, boolean isBeforeRebuild) throws IOException { @@ -613,13 +641,15 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { } Collections.sort(expectedMutationList, MUTATION_TS_DESC_COMPARATOR); Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR); - if (verifyType == IndexTool.IndexVerifyType.ONLY) { + if (isBeforeRebuild) { Mutation m = actualMutationList.get(0); if (m 
instanceof Put && mostRecentIndexRowKeys.contains(m.getRow())) { // We do check here only the latest version as older versions will always be unverified before // newer versions are inserted. - updateUnverifiedIndexRowCounters((Put) actualMutationList.get(0), verificationPhaseResult); + updateUnverifiedIndexRowCounters((Put) m, getTimestamp(expectedMutationList.get(0)), oldIndexRowsToBeDeletedList, verificationPhaseResult); } + } + if (verifyType == IndexTool.IndexVerifyType.ONLY) { repairActualMutationList(actualMutationList, expectedMutationList); } cleanUpActualMutationList(actualMutationList); @@ -738,7 +768,7 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { } private void verifyIndexRows(Map<byte[], List<Mutation>> indexKeyToMutationMap, - Set<byte[]> mostRecentIndexRowKeys, + Set<byte[]> mostRecentIndexRowKeys, List<Mutation> oldIndexRowsToBeDeletedList, IndexToolVerificationResult.PhaseResult verificationPhaseResult, boolean isBeforeRebuild) throws IOException { List<KeyRange> keys = new ArrayList<>(indexKeyToMutationMap.size()); @@ -757,7 +787,7 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { indexScan.setCacheBlocks(false); try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) { for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) { - if (!verifySingleIndexRow(result, indexKeyToMutationMap, mostRecentIndexRowKeys, + if (!verifySingleIndexRow(result, indexKeyToMutationMap, mostRecentIndexRowKeys, oldIndexRowsToBeDeletedList, verificationPhaseResult, isBeforeRebuild)) { invalidIndexRows.put(result.getRow(), indexKeyToMutationMap.get(result.getRow())); } @@ -808,6 +838,7 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { } private void rebuildIndexRows(Map<byte[], List<Mutation>> indexKeyToMutationMap, + List<Mutation> oldIndexRowsToBeDeletedList, IndexToolVerificationResult verificationResult) throws IOException { if 
(ignoreIndexRebuildForTesting) { return; @@ -827,6 +858,20 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { if (batchSize > 0) { indexHTable.batch(indexUpdates); } + batchSize = 0; + indexUpdates = new ArrayList<Mutation>(maxBatchSize); + for (Mutation mutation : oldIndexRowsToBeDeletedList) { + indexUpdates.add(mutation); + batchSize ++; + if (batchSize >= maxBatchSize) { + indexHTable.batch(indexUpdates); + batchSize = 0; + indexUpdates = new ArrayList<Mutation>(maxBatchSize); + } + } + if (batchSize > 0) { + indexHTable.batch(indexUpdates); + } if (verify) { verificationResult.setRebuiltIndexRowCount(verificationResult.getRebuiltIndexRowCount() + indexKeyToMutationMap.size()); } @@ -838,23 +883,26 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { private void rebuildAndOrVerifyIndexRows(Map<byte[], List<Mutation>> indexKeyToMutationMap, Set<byte[]> mostRecentIndexRowKeys, IndexToolVerificationResult verificationResult) throws IOException { + List<Mutation> oldIndexRowsToBeDeletedList = new ArrayList<>(); if (verifyType == IndexTool.IndexVerifyType.NONE) { - rebuildIndexRows(indexKeyToMutationMap, verificationResult); + rebuildIndexRows(indexKeyToMutationMap, oldIndexRowsToBeDeletedList, verificationResult); } else if (verifyType == IndexTool.IndexVerifyType.ONLY) { - verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, verificationResult.getBefore(), true); + verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, verificationResult.getBefore(), true); } else if (verifyType == IndexTool.IndexVerifyType.BEFORE) { - verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, verificationResult.getBefore(), true); - if (!indexKeyToMutationMap.isEmpty()) { - rebuildIndexRows(indexKeyToMutationMap, verificationResult); + verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, oldIndexRowsToBeDeletedList, verificationResult.getBefore(), true); + if 
(!indexKeyToMutationMap.isEmpty() || !oldIndexRowsToBeDeletedList.isEmpty()) { + rebuildIndexRows(indexKeyToMutationMap, oldIndexRowsToBeDeletedList, verificationResult); } } else if (verifyType == IndexTool.IndexVerifyType.AFTER) { - rebuildIndexRows(indexKeyToMutationMap, verificationResult); - verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, verificationResult.getAfter(), false); + rebuildIndexRows(indexKeyToMutationMap, Collections.EMPTY_LIST, verificationResult); + verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, verificationResult.getAfter(), false); } else { // verifyType == IndexTool.IndexVerifyType.BOTH - verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, verificationResult.getBefore(), true); + verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, oldIndexRowsToBeDeletedList, verificationResult.getBefore(), true); + if (!indexKeyToMutationMap.isEmpty() || !oldIndexRowsToBeDeletedList.isEmpty()) { + rebuildIndexRows(indexKeyToMutationMap, oldIndexRowsToBeDeletedList, verificationResult); + } if (!indexKeyToMutationMap.isEmpty()) { - rebuildIndexRows(indexKeyToMutationMap, verificationResult); - verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, verificationResult.getAfter(), false); + verifyIndexRows(indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, verificationResult.getAfter(), false); } } } @@ -1176,7 +1224,7 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del); boolean mostRecentDone = false; // Do not populate mostRecentIndexRowKeys when verifyType != IndexTool.IndexVerifyType.ONLY - if (verifyType != IndexTool.IndexVerifyType.ONLY) { + if (verifyType == IndexTool.IndexVerifyType.NONE || verifyType == IndexTool.IndexVerifyType.AFTER) { mostRecentDone = true; } for (Mutation mutation : indexMutations) { @@ -1259,7 +1307,7 @@ public 
class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { } while (hasMore && indexMutationCount < pageSizeInRows); if (!indexKeyToMutationMap.isEmpty()) { if (indexRowKeyforReadRepair != null) { - rebuildIndexRows(indexKeyToMutationMap, verificationResult); + rebuildIndexRows(indexKeyToMutationMap, Collections.EMPTY_LIST, verificationResult); } else { verifyAndOrRebuildIndex(indexKeyToMutationMap, mostRecentIndexRowKeys); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java index aefd7e8..85aa69a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.coprocessor; +import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY; import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_VALUE_BYTES; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; @@ -60,6 +61,9 @@ import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure; import org.apache.phoenix.hbase.index.parallel.Task; import org.apache.phoenix.hbase.index.parallel.TaskBatch; +import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder; +import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager; +import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.index.PhoenixIndexCodec; @@ -88,6 +92,11 @@ public class IndexerRegionScanner extends GlobalIndexRegionScanner { final RegionCoprocessorEnvironment env, UngroupedAggregateRegionObserver 
ungroupedAggregateRegionObserver) throws IOException { super(innerScanner, region, scan, env); + pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor( + new ThreadPoolBuilder("IndexVerify", + env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY, + DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout( + INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env)); this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver; if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) == null) { partialRebuild = true; diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java index 1b8ed55..bf373a1 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java @@ -274,7 +274,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { //setup when(GlobalIndexRegionScanner.getIndexRowKey(indexMaintainer, put)).thenCallRealMethod(); when(rebuildScanner.prepareIndexMutations(put, delete, indexKeyToMutationMap, mostRecentIndexRowKeys)).thenCallRealMethod(); - when(rebuildScanner.verifySingleIndexRow(Matchers.<Result>any(), Matchers.<Map>any(), Matchers.<Set>any(), + when(rebuildScanner.verifySingleIndexRow(Matchers.<Result>any(), Matchers.<Map>any(), Matchers.<Set>any(), Matchers.<List>any(), Matchers.<IndexToolVerificationResult.PhaseResult>any(), Matchers.anyBoolean())).thenCallRealMethod(); doNothing().when(rebuildScanner) .logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(), @@ -300,7 +300,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { entry : indexKeyToMutationMap.entrySet()) { initializeLocalMockitoSetup(entry, TestType.VALID_EXACT_MATCH); //test code - rebuildScanner.verifySingleIndexRow(indexRow, 
indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true); + rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true); assertTrue(actualPR.equals(expectedPR)); } @@ -313,7 +313,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { entry : indexKeyToMutationMap.entrySet()) { initializeLocalMockitoSetup(entry, TestType.VALID_MORE_MUTATIONS); //test code - rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true); + rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true); assertTrue(actualPR.equals(expectedPR)); } @@ -326,7 +326,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { entry : indexKeyToMutationMap.entrySet()) { initializeLocalMockitoSetup(entry, TestType.VALID_MIX_MUTATIONS); //test code - rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true); + rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true); assertTrue(actualPR.equals(expectedPR)); } @@ -339,7 +339,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { entry : indexKeyToMutationMap.entrySet()) { initializeLocalMockitoSetup(entry, TestType.VALID_NEW_UNVERIFIED_MUTATIONS); //test code - rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true); + rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true); assertTrue(actualPR.equals(expectedPR)); } @@ -355,7 +355,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { initializeLocalMockitoSetup(entry, TestType.EXPIRED); expireThisRow(); //test code - 
rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true); + rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true); assertTrue(actualPR.equals(expectedPR)); } @@ -372,7 +372,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { entry : indexKeyToMutationMap.entrySet()) { initializeLocalMockitoSetup(entry, TestType.INVALID_CELL_VALUE); //test code - rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true); + rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true); assertTrue(actualPR.equals(expectedPR)); } @@ -385,7 +385,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { entry : indexKeyToMutationMap.entrySet()) { initializeLocalMockitoSetup(entry, TestType.INVALID_EMPTY_CELL); //test code - rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true); + rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true); assertTrue(actualPR.equals(expectedPR)); } @@ -399,7 +399,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { entry : indexKeyToMutationMap.entrySet()) { initializeLocalMockitoSetup(entry, TestType.INVALID_COLUMN); //test code - rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true); + rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true); assertTrue(actualPR.equals(expectedPR)); } @@ -412,7 +412,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { entry : indexKeyToMutationMap.entrySet()) { initializeLocalMockitoSetup(entry, 
TestType.INVALID_EXTRA_CELL); //test code - rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true); + rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true); assertTrue(actualPR.equals(expectedPR)); } @@ -423,7 +423,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { when(indexRow.getRow()).thenReturn(Bytes.toBytes(1)); exceptionRule.expect(DoNotRetryIOException.class); exceptionRule.expectMessage(IndexRebuildRegionScanner.NO_EXPECTED_MUTATION); - rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true); + rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true); } @Test @@ -432,7 +432,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { entry : indexKeyToMutationMap.entrySet()) { initializeLocalMockitoSetup(entry, TestType.VALID_EXTRA_CELL); //test code - rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true); + rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, true); assertEquals(1, actualPR.getIndexHasExtraCellsCount()); } @@ -490,7 +490,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { injectEdge.incrementValue(1); IndexToolVerificationResult.PhaseResult actualPR = new IndexToolVerificationResult.PhaseResult(); // Report this validation as a success - assertTrue(rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true)); + assertTrue(rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, false)); // validIndexRowCount = 1 IndexToolVerificationResult.PhaseResult 
expectedPR = new IndexToolVerificationResult.PhaseResult(1, 0, 0, 0, 0, 0, 0, 0); assertTrue(actualPR.equals(expectedPR)); @@ -539,13 +539,14 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { Map<byte[], List<Mutation>> indexKeyToMutationMap = Maps.newTreeMap((Bytes.BYTES_COMPARATOR)); indexKeyToMutationMap.put(indexRowKey1Bytes, expectedMutations); + mostRecentIndexRowKeys = new TreeSet<>(Bytes.BYTES_COMPARATOR); when(rebuildScanner.prepareActualIndexMutations(any(Result.class))).thenReturn(actualMutations); when(indexRow.getRow()).thenReturn(indexRowKey1Bytes); injectEdge.incrementValue(1); IndexToolVerificationResult.PhaseResult actualPR = new IndexToolVerificationResult.PhaseResult(); // Report this validation as a failure - assertFalse(rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, actualPR, true)); + assertFalse(rebuildScanner.verifySingleIndexRow(indexRow, indexKeyToMutationMap, mostRecentIndexRowKeys, new ArrayList<Mutation>(), actualPR, true)); // beyondMaxLookBackInvalidIndexRowCount = 1 IndexToolVerificationResult.PhaseResult expectedPR = new IndexToolVerificationResult.PhaseResult(0, 0, 0, 0, 0, 1, 0, 0); assertTrue(actualPR.equals(expectedPR)); @@ -762,4 +763,4 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { CellUtil.cloneFamily(origList.get(0)), Bytes.toBytes(INCLUDED_COLUMN), ts, type.getCode(), Bytes.toBytes("asdfg")); } -} \ No newline at end of file +}