This is an automated email from the ASF dual-hosted git repository. skadam pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new 8a57fca PHOENIX-5896: Implement incremental rebuild along the failed regions in IndexTool (Addendum) (#784) 8a57fca is described below commit 8a57fca8e4170cc55dd8daa266a74f39ae1212a0 Author: Swaroopa Kadam <swaroopa.kada...@gmail.com> AuthorDate: Wed May 20 13:54:51 2020 -0700 PHOENIX-5896: Implement incremental rebuild along the failed regions in IndexTool (Addendum) (#784) --- .../end2end/IndexToolForNonTxGlobalIndexIT.java | 48 ++++++++++++++++------ .../coprocessor/GlobalIndexRegionScanner.java | 1 - .../coprocessor/IndexRebuildRegionScanner.java | 26 ++++++++---- .../phoenix/coprocessor/IndexerRegionScanner.java | 1 + .../apache/phoenix/mapreduce/index/IndexTool.java | 4 +- .../index/IndexVerificationResultRepository.java | 2 +- .../org/apache/phoenix/index/IndexToolTest.java | 24 +++++++++++ .../org/apache/phoenix/index/ShouldVerifyTest.java | 11 ++--- 8 files changed, 87 insertions(+), 30 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java index 91b9258..16813ee 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -512,6 +513,7 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT String indexTableName = generateUniqueName(); String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + List<String> expectedStatus = new ArrayList<>(); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.createStatement().execute("CREATE TABLE " + dataTableFullName + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) "+tableDDLOptions); @@ -525,27 +527,37 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT IndexTool it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.AFTER); Long scn = it.getJob().getConfiguration().getLong(CURRENT_SCN_VALUE, 1L); - verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 3, RUN_STATUS_EXECUTED); + expectedStatus.add(RUN_STATUS_EXECUTED); + expectedStatus.add(RUN_STATUS_EXECUTED); + expectedStatus.add(RUN_STATUS_EXECUTED); - exceptionRule.expect(RuntimeException.class); - exceptionRule.expectMessage(RETRY_VERIFY_NOT_APPLICABLE); - it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, - null, 0, IndexTool.IndexVerifyType.AFTER, "-rv", String.valueOf(10L)); + verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 3, expectedStatus); + + deleteOneRowFromResultTable(conn, scn, indexTableFullName); it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.AFTER, "-rv", Long.toString(scn)); scn = it.getJob().getConfiguration().getLong(CURRENT_SCN_VALUE, 1L); - verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 6, RUN_STATUS_SKIPPED); + + expectedStatus.set(0, RUN_STATUS_EXECUTED); + expectedStatus.set(1, RUN_STATUS_SKIPPED); + expectedStatus.set(2, RUN_STATUS_SKIPPED); + + verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 5, expectedStatus); conn.createStatement().execute( "DELETE FROM "+indexTableFullName); conn.commit(); TestUtil.doMajorCompaction(conn, indexTableFullName); + expectedStatus.set(0, RUN_STATUS_SKIPPED); + expectedStatus.set(1, RUN_STATUS_SKIPPED); + expectedStatus.set(2, RUN_STATUS_SKIPPED); + it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.AFTER, "-rv", Long.toString(scn)); scn = it.getJob().getConfiguration().getLong(CURRENT_SCN_VALUE, 1L); - verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 9, RUN_STATUS_SKIPPED); + verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 8, expectedStatus); ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + indexTableFullName); Assert.assertFalse(rs.next()); @@ -554,10 +566,22 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT Assert.assertFalse(it.isValidLastVerifyTime(10L)); Assert.assertFalse(it.isValidLastVerifyTime(EnvironmentEdgeManager.currentTimeMillis() - 1000L)); Assert.assertTrue(it.isValidLastVerifyTime(scn)); + + IndexToolIT.dropIndexToolTables(conn); } } - private List<String> verifyRunStatusFromResultTable(Connection conn, Long scn, String indexTable, int totalRows, String expectedStatus) throws SQLException, IOException { + private void deleteOneRowFromResultTable(Connection conn, Long scn, String indexTable) + throws SQLException, IOException { + Table hIndexToolTable = conn.unwrap(PhoenixConnection.class).getQueryServices() + .getTable(RESULT_TABLE_NAME_BYTES); + Scan s = new Scan(); + s.setRowPrefixFilter(Bytes.toBytes(String.format("%s%s%s", scn, ROW_KEY_SEPARATOR, indexTable))); + ResultScanner rs = hIndexToolTable.getScanner(s); + hIndexToolTable.delete(new Delete(rs.next().getRow())); + } + + private List<String> verifyRunStatusFromResultTable(Connection conn, Long scn, String indexTable, int totalRows, List<String> expectedStatus) throws SQLException, IOException { Table hIndexToolTable = conn.unwrap(PhoenixConnection.class).getQueryServices() .getTable(RESULT_TABLE_NAME_BYTES); Assert.assertEquals(totalRows, TestUtil.getRowCount(hIndexToolTable, false)); @@ -575,10 +599,10 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT count++; } //for each region - Assert.assertEquals(count, 3); - Assert.assertEquals(expectedStatus, output.get(0)); - Assert.assertEquals(expectedStatus, output.get(1)); - Assert.assertEquals(expectedStatus, output.get(2)); + Assert.assertEquals(3, count); + for(int i=0; i< count; i++) { + Assert.assertEquals(expectedStatus.get(i), output.get(i)); + } return output; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java index b5334d1..e7e3187 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java @@ -101,7 +101,6 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { this.scan = scan; this.innerScanner = innerScanner; this.region = region; - verificationResult = new IndexToolVerificationResult(scan); // Create the following objects only for rebuilds by IndexTool hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION); indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java index 6b4bab1..76ae453 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java @@ -116,6 +116,7 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { private byte[][] viewConstants; private IndexVerificationOutputRepository verificationOutputRepository; private boolean skipped = false; + private boolean shouldVerifyCheckDone = false; @VisibleForTesting public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan, @@ -153,6 +154,7 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); verificationOutputRepository = new IndexVerificationOutputRepository(indexMaintainer.getIndexTableName(), hTableFactory); + verificationResult = new IndexToolVerificationResult(scan); verificationResultRepository = new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory); indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); @@ -169,13 +171,14 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { @VisibleForTesting public boolean shouldVerify(IndexTool.IndexVerifyType verifyType, byte[] indexRowKey, Scan scan, Region region, IndexMaintainer indexMaintainer, - IndexVerificationResultRepository verificationResultRepository) throws IOException { + IndexVerificationResultRepository verificationResultRepository, boolean shouldVerifyCheckDone) throws IOException { this.verifyType = verifyType; this.indexRowKey = indexRowKey; this.scan = scan; this.region = region; this.indexMaintainer = indexMaintainer; this.verificationResultRepository = verificationResultRepository; + this.shouldVerifyCheckDone = shouldVerifyCheckDone; return shouldVerify(); } @@ -184,12 +187,17 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { //All other types of rebuilds/verification should be incrementally performed if appropriate param is passed byte[] lastVerifyTimeValue = scan.getAttribute(UngroupedAggregateRegionObserver.INDEX_RETRY_VERIFY); Long lastVerifyTime = lastVerifyTimeValue == null ? 0 : Bytes.toLong(lastVerifyTimeValue); - if(indexRowKey != null || lastVerifyTime == 0) { + if(indexRowKey != null || lastVerifyTime == 0 || shouldVerifyCheckDone) { return true; } - verificationResult = verificationResultRepository - .getVerificationResult(lastVerifyTime, scan, region, indexMaintainer.getIndexTableName()); - return verificationResult == null; + + IndexToolVerificationResult verificationResultTemp = verificationResultRepository + .getVerificationResult(lastVerifyTime, scan, region, indexMaintainer.getIndexTableName()) ; + if(verificationResultTemp != null) { + verificationResult = verificationResultTemp; + } + shouldVerifyCheckDone = true; + return verificationResultTemp == null; } private void setReturnCodeForSingleRowRebuild() throws IOException { @@ -1227,14 +1235,14 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { } Cell lastCell = null; int rowCount = 0; - if(!shouldVerify()) { - skipped = true; - return false; - } region.startRegionOperation(); try { byte[] uuidValue = ServerCacheClient.generateId(); synchronized (innerScanner) { + if(!shouldVerify()) { + skipped = true; + return false; + } do { List<Cell> row = new ArrayList<Cell>(); hasMore = innerScanner.nextRaw(row); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java index ad8924e..5e00bf0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java @@ -80,6 +80,7 @@ public class IndexerRegionScanner extends GlobalIndexRegionScanner { final RegionCoprocessorEnvironment env) throws IOException { super(innerScanner, region, scan, env); indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + verificationResult = new IndexToolVerificationResult(scan); verificationResultRepository = new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index 81cbea3..4c0c2d2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -791,12 +791,12 @@ public class IndexTool extends Configured implements Tool { return 0; } - private void validateLastVerifyTime() throws Exception { + public int validateLastVerifyTime() throws Exception { Long currentTime = EnvironmentEdgeManager.currentTimeMillis(); if (lastVerifyTime.compareTo(currentTime) > 0 || lastVerifyTime == 0L || !isValidLastVerifyTime(lastVerifyTime)) { throw new RuntimeException(RETRY_VERIFY_NOT_APPLICABLE); } - + return 0; } public boolean isValidLastVerifyTime(Long lastVerifyTime) throws Exception { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java index 08c431a..d6cb2cf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java @@ -272,7 +272,7 @@ public class IndexVerificationResultRepository implements AutoCloseable { throws IOException { IndexToolVerificationResult verificationResult = null; Result result = htable.get(new Get(oldRowKey)); - if(result != null) { + if(!result.isEmpty()) { byte[][] rowKeyParts = ByteUtil.splitArrayBySeparator(result.getRow(), ROW_KEY_SEPARATOR_BYTE[0]); verificationResult = new IndexToolVerificationResult(scan); verificationResult.setStartRow(rowKeyParts[3]); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java index 92317d4..fd5164f 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java @@ -34,6 +34,8 @@ import org.mockito.MockitoAnnotations; import static org.apache.phoenix.mapreduce.index.IndexTool.FEATURE_NOT_APPLICABLE; import static org.apache.phoenix.mapreduce.index.IndexTool.INVALID_TIME_RANGE_EXCEPTION_MESSAGE; +import static org.apache.phoenix.mapreduce.index.IndexTool.RETRY_VERIFY_NOT_APPLICABLE; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class IndexToolTest extends BaseTest { @@ -225,4 +227,26 @@ public class IndexToolTest extends BaseTest { exceptionRule.expectMessage(FEATURE_NOT_APPLICABLE); IndexTool.checkIfFeatureApplicable(null, null, lastVerifyTime, pDataTable, !localIndex); } + + @Test + public void testIncrcementalVerifyOption_notApplicable() throws Exception { + IndexTool mockTool = Mockito.mock(IndexTool.class); + when(mockTool.getLastVerifyTime()).thenCallRealMethod(); + Long lastVerifyTime = 10L; + String [] args = + IndexToolIT.getArgValues(true, true, schema, + dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.AFTER, + lastVerifyTime); + when(mockTool.parseOptions(args)).thenCallRealMethod(); + + CommandLine cmdLine = mockTool.parseOptions(args); + + when(mockTool.populateIndexToolAttributes(cmdLine)).thenCallRealMethod(); + when(mockTool.validateLastVerifyTime()).thenCallRealMethod(); + when(mockTool.isValidLastVerifyTime(lastVerifyTime)).thenReturn(false); + + exceptionRule.expect(RuntimeException.class); + exceptionRule.expectMessage(RETRY_VERIFY_NOT_APPLICABLE); + mockTool.populateIndexToolAttributes(cmdLine); + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/ShouldVerifyTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/ShouldVerifyTest.java index 8cc1970..727637e 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/ShouldVerifyTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/ShouldVerifyTest.java @@ -35,6 +35,7 @@ import org.mockito.MockitoAnnotations; import java.io.IOException; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.when; public class ShouldVerifyTest { @@ -54,13 +55,13 @@ public class ShouldVerifyTest { when(im.getIndexTableName()).thenReturn(Bytes.toBytes("indexName")); when(scanner.shouldVerify(any(IndexTool.IndexVerifyType.class), Matchers.<byte[]>any(), any(Scan.class), any(Region.class), any(IndexMaintainer.class), - any(IndexVerificationResultRepository.class))).thenCallRealMethod(); + any(IndexVerificationResultRepository.class), anyBoolean())).thenCallRealMethod(); } @Test public void testShouldVerify_repair_true() throws IOException { indexRowKey = new byte[5]; - Assert.assertTrue(scanner.shouldVerify(IndexTool.IndexVerifyType.ONLY, indexRowKey, scan, region, im, resultRepository)); + Assert.assertTrue(scanner.shouldVerify(IndexTool.IndexVerifyType.ONLY, indexRowKey, scan, region, im, resultRepository, false)); } @Test @@ -71,9 +72,9 @@ public class ShouldVerifyTest { } private void assertShouldVerify(boolean assertion) throws IOException { - Assert.assertEquals(assertion, scanner.shouldVerify(IndexTool.IndexVerifyType.NONE, indexRowKey, scan, region, im, resultRepository)); - Assert.assertEquals(assertion, scanner.shouldVerify(IndexTool.IndexVerifyType.BEFORE, indexRowKey, scan, region, im, resultRepository)); - Assert.assertEquals(assertion, scanner.shouldVerify(IndexTool.IndexVerifyType.AFTER, indexRowKey, scan, region, im, resultRepository)); + Assert.assertEquals(assertion, scanner.shouldVerify(IndexTool.IndexVerifyType.NONE, indexRowKey, scan, region, im, resultRepository, false)); + Assert.assertEquals(assertion, scanner.shouldVerify(IndexTool.IndexVerifyType.BEFORE, indexRowKey, scan, region, im, resultRepository, false)); + Assert.assertEquals(assertion, scanner.shouldVerify(IndexTool.IndexVerifyType.AFTER, indexRowKey, scan, region, im, resultRepository, false)); } @Test