This is an automated email from the ASF dual-hosted git repository.

skadam pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new 5f9364d PHOENIX-5896: Implement incremental rebuild along the failed regions in IndexTool (#779) 5f9364d is described below commit 5f9364db7e4925229704706e148e62f4cf4ec4c2 Author: Swaroopa Kadam <swaroopa.kada...@gmail.com> AuthorDate: Mon May 18 18:59:34 2020 -0700 PHOENIX-5896: Implement incremental rebuild along the failed regions in IndexTool (#779) --- .../end2end/IndexToolForNonTxGlobalIndexIT.java | 97 +++++++++++++++++ .../org/apache/phoenix/end2end/IndexToolIT.java | 18 +++- .../phoenix/end2end/IndexToolTimeRangeIT.java | 16 +-- .../coprocessor/BaseScannerRegionObserver.java | 1 + .../coprocessor/GlobalIndexRegionScanner.java | 2 - .../coprocessor/IndexRebuildRegionScanner.java | 38 ++++++- .../phoenix/coprocessor/IndexerRegionScanner.java | 3 + .../PhoenixServerBuildIndexInputFormat.java | 6 +- .../apache/phoenix/mapreduce/index/IndexTool.java | 119 ++++++++++++++------- .../index/IndexVerificationOutputRepository.java | 21 ++-- .../index/IndexVerificationResultRepository.java | 76 +++++++++---- .../mapreduce/util/PhoenixConfigurationUtil.java | 12 +++ .../org/apache/phoenix/index/IndexToolTest.java | 47 ++++++-- .../apache/phoenix/index/IndexUpgradeToolTest.java | 6 +- .../org/apache/phoenix/index/ShouldVerifyTest.java | 98 +++++++++++++++++ .../util/PhoenixConfigurationUtilTest.java | 11 ++ 16 files changed, 472 insertions(+), 99 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java index 018ec0a..91b9258 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.TableName; import 
org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; @@ -36,6 +37,7 @@ import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.IndexScrutiny; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; @@ -43,19 +45,33 @@ import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Properties; import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS; +import static org.apache.phoenix.mapreduce.index.IndexTool.RETRY_VERIFY_NOT_APPLICABLE; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.INDEX_TOOL_RUN_STATUS_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_COLUMN_FAMILY; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.ROW_KEY_SEPARATOR; +import static 
org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RUN_STATUS_EXECUTED; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RUN_STATUS_SKIPPED; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT; @@ -70,6 +86,7 @@ import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEF import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT; +import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -81,6 +98,8 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT private boolean directApi = true; private boolean useSnapshot = false; private boolean mutable; + @Rule + public ExpectedException exceptionRule = ExpectedException.none(); public IndexToolForNonTxGlobalIndexIT(boolean mutable) { StringBuilder optionBuilder = new StringBuilder(); @@ -485,4 +504,82 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT } } + @Test + public void testIndexToolForIncrementalRebuild() throws Exception { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName 
= generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) "+tableDDLOptions); + conn.createStatement().execute(String.format( + "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE)", indexTableName, dataTableFullName)); + + conn.createStatement().execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')"); + conn.createStatement().execute("upsert into " + dataTableFullName + " values (2, 'Phoenix1', 'B')"); + conn.commit(); + + IndexTool it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, + null, 0, IndexTool.IndexVerifyType.AFTER); + Long scn = it.getJob().getConfiguration().getLong(CURRENT_SCN_VALUE, 1L); + verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 3, RUN_STATUS_EXECUTED); + + + exceptionRule.expect(RuntimeException.class); + exceptionRule.expectMessage(RETRY_VERIFY_NOT_APPLICABLE); + it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, + null, 0, IndexTool.IndexVerifyType.AFTER, "-rv", String.valueOf(10L)); + + it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, + null, 0, IndexTool.IndexVerifyType.AFTER, "-rv", Long.toString(scn)); + scn = it.getJob().getConfiguration().getLong(CURRENT_SCN_VALUE, 1L); + verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 6, RUN_STATUS_SKIPPED); + + conn.createStatement().execute( "DELETE FROM "+indexTableFullName); + conn.commit(); + TestUtil.doMajorCompaction(conn, indexTableFullName); + + it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, + null, 0, IndexTool.IndexVerifyType.AFTER, "-rv", 
Long.toString(scn)); + scn = it.getJob().getConfiguration().getLong(CURRENT_SCN_VALUE, 1L); + verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 9, RUN_STATUS_SKIPPED); + + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + indexTableFullName); + Assert.assertFalse(rs.next()); + + //testing the dependent method + Assert.assertFalse(it.isValidLastVerifyTime(10L)); + Assert.assertFalse(it.isValidLastVerifyTime(EnvironmentEdgeManager.currentTimeMillis() - 1000L)); + Assert.assertTrue(it.isValidLastVerifyTime(scn)); + } + } + + private List<String> verifyRunStatusFromResultTable(Connection conn, Long scn, String indexTable, int totalRows, String expectedStatus) throws SQLException, IOException { + Table hIndexToolTable = conn.unwrap(PhoenixConnection.class).getQueryServices() + .getTable(RESULT_TABLE_NAME_BYTES); + Assert.assertEquals(totalRows, TestUtil.getRowCount(hIndexToolTable, false)); + List<String> output = new ArrayList<>(); + Scan s = new Scan(); + s.setRowPrefixFilter(Bytes.toBytes(String.format("%s%s%s", scn, ROW_KEY_SEPARATOR, indexTable))); + ResultScanner rs = hIndexToolTable.getScanner(s); + int count =0; + for(Result r : rs) { + Assert.assertTrue(r != null); + List<Cell> cells = r.getColumnCells(RESULT_TABLE_COLUMN_FAMILY, INDEX_TOOL_RUN_STATUS_BYTES); + Assert.assertEquals(cells.size(), 1); + Assert.assertTrue(Bytes.toString(cells.get(0).getRow()).startsWith(String.valueOf(scn))); + output.add(Bytes.toString(cells.get(0).getValue())); + count++; + } + //for each region + Assert.assertEquals(count, 3); + Assert.assertEquals(expectedStatus, output.get(0)); + Assert.assertEquals(expectedStatus, output.get(1)); + Assert.assertEquals(expectedStatus, output.get(2)); + return output; + } + } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index 8a9cc10..a2bd788 100644 --- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -631,7 +631,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { private static List<String> getArgList (boolean directApi, boolean useSnapshot, String schemaName, String dataTable, String indxTable, String tenantId, - IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime) { + IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime, Long incrementalVerify) { List<String> args = Lists.newArrayList(); if (schemaName != null) { args.add("-s"); @@ -664,6 +664,10 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { args.add("-et"); args.add(String.valueOf(endTime)); } + if(incrementalVerify!=null) { + args.add("-rv"); + args.add(String.valueOf(incrementalVerify)); + } args.add("-op"); args.add("/tmp/" + UUID.randomUUID().toString()); return args; @@ -672,7 +676,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { public static String[] getArgValues(boolean directApi, boolean useSnapshot, String schemaName, String dataTable, String indexTable, String tenantId, IndexTool.IndexVerifyType verifyType) { List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable, - tenantId, verifyType, null, null); + tenantId, verifyType, null, null, null); return args.toArray(new String[0]); } @@ -680,7 +684,15 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { String dataTable, String indexTable, String tenantId, IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime) { List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable, - tenantId, verifyType, startTime, endTime); + tenantId, verifyType, startTime, endTime, null); + return args.toArray(new String[0]); + } + + public static String [] getArgValues(boolean directApi, boolean useSnapshot, String schemaName, + String dataTable, String 
indexTable, String tenantId, + IndexTool.IndexVerifyType verifyType, Long incrementalVerify) { + List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable, + tenantId, verifyType, null, null, incrementalVerify); return args.toArray(new String[0]); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolTimeRangeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolTimeRangeIT.java index a63f06a..3deedad 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolTimeRangeIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolTimeRangeIT.java @@ -130,8 +130,8 @@ public class IndexToolTimeRangeIT extends BaseUniqueNamesOwnClusterIT { @Test public void testValidTimeRange() throws Exception { String [] args = {"--delete-all-and-rebuild", - "--starttime", myClock.getRelativeTimeAsString(1), - "--endtime", myClock.getRelativeTimeAsString(9)}; + "--start-time", myClock.getRelativeTimeAsString(1), + "--end-time", myClock.getRelativeTimeAsString(9)}; runIndexTool(args, 0); // all rows should be rebuilt Assert.assertEquals(5, countRowsInIndex()); @@ -141,8 +141,8 @@ public class IndexToolTimeRangeIT extends BaseUniqueNamesOwnClusterIT { @Test public void testValidTimeRange_startTimeInBetween() throws Exception { String [] args = {"--delete-all-and-rebuild", - "--starttime", myClock.getRelativeTimeAsString(6), - "--endtime", myClock.getRelativeTimeAsString(9)}; + "--start-time", myClock.getRelativeTimeAsString(6), + "--end-time", myClock.getRelativeTimeAsString(9)}; runIndexTool(args, 0); // only last 3 rows should be rebuilt Assert.assertEquals(3, countRowsInIndex()); @@ -151,8 +151,8 @@ public class IndexToolTimeRangeIT extends BaseUniqueNamesOwnClusterIT { @Test public void testValidTimeRange_endTimeInBetween() throws Exception { String [] args = {"--delete-all-and-rebuild", - "--starttime", myClock.getRelativeTimeAsString(1), - "--endtime", 
myClock.getRelativeTimeAsString(6)}; + "--start-time", myClock.getRelativeTimeAsString(1), + "--end-time", myClock.getRelativeTimeAsString(6)}; runIndexTool(args, 0); // only first 2 should be rebuilt Assert.assertEquals(2, countRowsInIndex()); @@ -171,7 +171,7 @@ public class IndexToolTimeRangeIT extends BaseUniqueNamesOwnClusterIT { public void testValidTimeRange_onlyStartTimePassed() throws Exception { //starttime passed of last upsert String [] args = {"--delete-all-and-rebuild", - "--starttime", myClock.getRelativeTimeAsString(8)}; + "--start-time", myClock.getRelativeTimeAsString(8)}; runIndexTool(args, 0); Assert.assertEquals(1, countRowsInIndex()); } @@ -180,7 +180,7 @@ public class IndexToolTimeRangeIT extends BaseUniqueNamesOwnClusterIT { public void testValidTimeRange_onlyEndTimePassed() throws Exception { //end time passed as time of second upsert String [] args = {"--delete-all-and-rebuild", - "--endtime", myClock.getRelativeTimeAsString(5)}; + "--end-time", myClock.getRelativeTimeAsString(5)}; runIndexTool(args, 0); Assert.assertEquals(1, countRowsInIndex()); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 6e0a1e4..4897741 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -88,6 +88,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging"; // Index verification type done by the index tool public static final String INDEX_REBUILD_VERIFY_TYPE = "_IndexRebuildVerifyType"; + public static final String INDEX_RETRY_VERIFY = "_IndexRetryVerify"; /* * Attribute to denote that the index maintainer has been serialized using its proto-buf presentation. 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java index 35a0a8a..b5334d1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java @@ -107,8 +107,6 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive(); maxLookBackInMills = ScanInfoUtil.getMaxLookbackInMillis(config); - verificationResultRepository = - new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory); pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor( new ThreadPoolBuilder("IndexVerify", env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY, diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java index 4f1c48f..9feb27f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java @@ -38,6 +38,7 @@ import java.util.concurrent.ExecutionException; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import com.sun.org.apache.xpath.internal.operations.Bool; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import 
org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -57,6 +59,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.cache.ServerCacheClient; @@ -111,6 +114,7 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { private Map<byte[], NavigableSet<byte[]>> familyMap; private byte[][] viewConstants; private IndexVerificationOutputRepository verificationOutputRepository; + private boolean skipped = false; @VisibleForTesting public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan, @@ -148,6 +152,8 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); verificationOutputRepository = new IndexVerificationOutputRepository(indexMaintainer.getIndexTableName(), hTableFactory); + verificationResultRepository = + new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory); indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); dataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor( @@ -159,6 +165,32 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { } } + @VisibleForTesting + public boolean shouldVerify(IndexTool.IndexVerifyType verifyType, + byte[] indexRowKey, Scan scan, Region region, IndexMaintainer indexMaintainer, + 
IndexVerificationResultRepository verificationResultRepository) throws IOException { + this.verifyType = verifyType; + this.indexRowKey = indexRowKey; + this.scan = scan; + this.region = region; + this.indexMaintainer = indexMaintainer; + this.verificationResultRepository = verificationResultRepository; + return shouldVerify(); + } + + private boolean shouldVerify() throws IOException { + //In case of read repair, proceed with rebuild + //All other types of rebuilds/verification should be incrementally performed if appropriate param is passed + byte[] lastVerifyTimeValue = scan.getAttribute(UngroupedAggregateRegionObserver.INDEX_RETRY_VERIFY); + Long lastVerifyTime = lastVerifyTimeValue == null ? 0 : Bytes.toLong(lastVerifyTimeValue); + if(indexRowKey != null || lastVerifyTime == 0) { + return true; + } + verificationResult = verificationResultRepository + .getVerificationResult(lastVerifyTime, scan, region, indexMaintainer.getIndexTableName()); + return verificationResult == null; + } + private void setReturnCodeForSingleRowRebuild() throws IOException { try (RegionScanner scanner = region.getScanner(scan)) { List<Cell> row = new ArrayList<>(); @@ -200,7 +232,7 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { if (verify) { try { verificationResultRepository.logToIndexToolResultTable(verificationResult, - verifyType, region.getRegionInfo().getRegionName()); + verifyType, region.getRegionInfo().getRegionName(), skipped); } finally { this.pool.stop("IndexRebuildRegionScanner is closing"); hTableFactory.shutdown(); @@ -1191,6 +1223,10 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { } Cell lastCell = null; int rowCount = 0; + if(!shouldVerify()) { + skipped = true; + return false; + } region.startRegionOperation(); try { byte[] uuidValue = ServerCacheClient.generateId(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java index 88bac86..b493729 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java @@ -58,6 +58,7 @@ import org.apache.phoenix.hbase.index.parallel.TaskBatch; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.types.PVarbinary; @@ -78,6 +79,8 @@ public class IndexerRegionScanner extends GlobalIndexRegionScanner { final RegionCoprocessorEnvironment env) throws IOException { super(innerScanner, region, scan, env); indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + verificationResultRepository = + new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory); } @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java index d129e93..9408369 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java @@ -43,6 +43,7 @@ import com.google.common.base.Preconditions; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getCurrentScnValue; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName; +import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolLastVerifyTime; 
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexVerifyType; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolStartTime; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.setCurrentScnValue; @@ -74,6 +75,8 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph final String txnScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE); final String currentScnValue = getCurrentScnValue(configuration); final String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID); + final String lastVerifyTime = getIndexToolLastVerifyTime(configuration); + //until PHOENIX-5783 is fixed; we'll continue with startTime = 0 final String startTimeValue = null; final Properties overridingProps = new Properties(); @@ -97,11 +100,12 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName); MutationPlan plan = compiler.compile(indexTable); Scan scan = plan.getContext().getScan(); - + Long lastVerifyTimeValue = lastVerifyTime == null ? 
0L : Long.valueOf(lastVerifyTime); try { scan.setTimeRange(startTime, scn); scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES); scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE, getIndexVerifyType(configuration).toBytes()); + scan.setAttribute(BaseScannerRegionObserver.INDEX_RETRY_VERIFY, Bytes.toBytes(lastVerifyTimeValue)); } catch (IOException e) { throw new SQLException(e); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index e30da37..81cbea3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -23,6 +23,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAM import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.ROW_KEY_SEPARATOR; + import java.io.IOException; import java.sql.Connection; import java.sql.DatabaseMetaData; @@ -49,6 +51,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; @@ -56,7 +59,9 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import 
org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; @@ -92,7 +97,6 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; -import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.util.ByteUtil; @@ -168,10 +172,11 @@ public class IndexTool extends Configured implements Tool { private PTable pDataTable; private String tenantId = null; private Job job; - private Long startTime, endTime; + private Long startTime, endTime, lastVerifyTime; private IndexType indexType; private String basePath; byte[][] splitKeysBeforeJob = null; + Configuration configuration; private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true, "Phoenix schema name (optional)"); @@ -230,20 +235,27 @@ public class IndexTool extends Configured implements Tool { + "If specified, truncates the index table and rebuilds (optional)"); private static final Option HELP_OPTION = new Option("h", "help", false, "Help"); - private static final Option START_TIME_OPTION = new Option("st", "starttime", + private static final Option START_TIME_OPTION = new Option("st", "start-time", true, "Start time for indextool rebuild or verify"); - private static final Option END_TIME_OPTION = new Option("et", "endtime", + private static final Option END_TIME_OPTION = new Option("et", "end-time", true, "End time for indextool rebuild or verify"); + private static final Option RETRY_VERIFY_OPTION = new Option("rv", "retry-verify", + true, "Max scan ts of the last rebuild/verify that needs to be retried incrementally"); + public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s.%s_INDX_%s"; public static final String 
INVALID_TIME_RANGE_EXCEPTION_MESSAGE = "startTime is greater than " + "or equal to endTime " + "or either of them are set in the future; IndexTool can't proceed."; - public static final String FEATURE_NOT_APPLICABLE = "starttime-endtime feature is only " + public static final String FEATURE_NOT_APPLICABLE = "start-time/end-time and retry verify feature are only " + "applicable for local or non-transactional global indexes"; + + public static final String RETRY_VERIFY_NOT_APPLICABLE = "retry verify feature accepts " + + "non-zero ts set in the past and ts must be present in PHOENIX_INDEX_TOOL_RESULT table"; + private Options getOptions() { final Options options = new Options(); options.addOption(SCHEMA_NAME_OPTION); @@ -262,10 +274,12 @@ public class IndexTool extends Configured implements Tool { SPLIT_INDEX_OPTION.setOptionalArg(true); START_TIME_OPTION.setOptionalArg(true); END_TIME_OPTION.setOptionalArg(true); + RETRY_VERIFY_OPTION.setOptionalArg(true); options.addOption(AUTO_SPLIT_INDEX_OPTION); options.addOption(SPLIT_INDEX_OPTION); options.addOption(START_TIME_OPTION); options.addOption(END_TIME_OPTION); + options.addOption(RETRY_VERIFY_OPTION); return options; } @@ -338,9 +352,9 @@ public class IndexTool extends Configured implements Tool { return startTime; } - public Long getEndTime() { - return endTime; - } + public Long getEndTime() { return endTime; } + + public Long getLastVerifyTime() { return lastVerifyTime; } class JobFactory { Connection connection; @@ -379,6 +393,9 @@ public class IndexTool extends Configured implements Tool { if (endTime != null) { PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime); } + if (lastVerifyTime != null) { + PhoenixConfigurationUtil.setIndexToolLastVerifyTime(configuration, lastVerifyTime); + } return configureJobForServerBuildIndex(); } } @@ -475,21 +492,22 @@ public class IndexTool extends Configured implements Tool { for (String index : disableIndexes) { quotedIndexes.add("'" + index + "'"); } - 
ResultSet rs = connection.createStatement() + try (ResultSet rs = connection.createStatement() .executeQuery("SELECT MAX(" + ASYNC_REBUILD_TIMESTAMP + "),MAX("+INDEX_DISABLE_TIMESTAMP+") FROM " + SYSTEM_CATALOG_NAME + " (" + ASYNC_REBUILD_TIMESTAMP + " BIGINT) WHERE " + TABLE_SCHEM + (schemaName != null && schemaName.length() > 0 ? "='" + schemaName + "'" : " IS NULL") - + " and " + TABLE_NAME + " IN (" + StringUtils.join(",", quotedIndexes) + ")"); - if (rs.next()) { - maxRebuilAsyncDate = rs.getLong(1); - maxDisabledTimeStamp = rs.getLong(2); - } - // Do check if table is disabled again after user invoked async rebuilding during the run of the job - if (maxRebuilAsyncDate > maxDisabledTimeStamp) { - return maxRebuilAsyncDate; - } else { - throw new RuntimeException( - "Inconsistent state we have one or more index tables which are disabled after the async is called!!"); + + " and " + TABLE_NAME + " IN (" + StringUtils.join(",", quotedIndexes) + ")")) { + if (rs.next()) { + maxRebuilAsyncDate = rs.getLong(1); + maxDisabledTimeStamp = rs.getLong(2); + } + // Do check if table is disabled again after user invoked async rebuilding during the run of the job + if (maxRebuilAsyncDate > maxDisabledTimeStamp) { + return maxRebuilAsyncDate; + } else { + throw new RuntimeException( + "Inconsistent state we have one or more index tables which are disabled after the async is called!!"); + } } } @@ -665,14 +683,18 @@ public class IndexTool extends Configured implements Tool { printHelpAndExit(e.getMessage(), getOptions()); return -1; } + configuration = HBaseConfiguration.addHbaseResources(getConf()); populateIndexToolAttributes(cmdLine); - Configuration configuration = getConfiguration(tenantId); + + if (tenantId != null) { + configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + } try (Connection conn = getConnection(configuration)) { createIndexToolTables(conn); if (dataTable != null && indexTable != null) { setupIndexAndDataTable(conn); - 
checkTimeRangeFeature(startTime, endTime, pDataTable, isLocalIndexBuild); + checkIfFeatureApplicable(startTime, endTime, lastVerifyTime, pDataTable, isLocalIndexBuild); if (shouldDeleteBeforeRebuild) { deleteBeforeRebuild(conn); } @@ -694,18 +716,14 @@ public class IndexTool extends Configured implements Tool { } } - public static void checkTimeRangeFeature(Long startTime, Long endTime, PTable pDataTable, boolean isLocalIndexBuild) { - if (isTimeRangeSet(startTime, endTime) && !isTimeRangeFeatureApplicable(pDataTable, isLocalIndexBuild)) { - throw new RuntimeException(FEATURE_NOT_APPLICABLE); - } - } - - private Configuration getConfiguration(String tenantId) { - final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf()); - if (tenantId != null) { - configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + public static void checkIfFeatureApplicable(Long startTime, Long endTime, Long lastVerifyTime, + PTable pDataTable, boolean isLocalIndexBuild) { + boolean isApplicable = isFeatureApplicable(pDataTable, isLocalIndexBuild); + if (!isApplicable) { + if(isTimeRangeSet(startTime, endTime) || lastVerifyTime!=null) { + throw new RuntimeException(FEATURE_NOT_APPLICABLE); + } } - return configuration; } private boolean submitIndexToolJob(Connection conn, Configuration configuration) @@ -734,10 +752,11 @@ public class IndexTool extends Configured implements Tool { } @VisibleForTesting - public void populateIndexToolAttributes(CommandLine cmdLine) { + public int populateIndexToolAttributes(CommandLine cmdLine) throws Exception { boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt()); boolean useStartTime = cmdLine.hasOption(START_TIME_OPTION.getOpt()); boolean useEndTime = cmdLine.hasOption(END_TIME_OPTION.getOpt()); + boolean retryVerify = cmdLine.hasOption(RETRY_VERIFY_OPTION.getOpt()); boolean verify = cmdLine.hasOption(VERIFY_OPTION.getOpt()); if (useTenantId) { @@ -749,6 +768,10 @@ public class IndexTool extends Configured 
implements Tool { if (useEndTime) { endTime = new Long(cmdLine.getOptionValue(END_TIME_OPTION.getOpt())); } + if(retryVerify) { + lastVerifyTime = new Long(cmdLine.getOptionValue(RETRY_VERIFY_OPTION.getOpt())); + validateLastVerifyTime(); + } if(isTimeRangeSet(startTime, endTime)) { validateTimeRange(); } @@ -765,6 +788,30 @@ public class IndexTool extends Configured implements Tool { isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()); useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt()); shouldDeleteBeforeRebuild = cmdLine.hasOption(DELETE_ALL_AND_REBUILD_OPTION.getOpt()); + return 0; + } + + private void validateLastVerifyTime() throws Exception { + Long currentTime = EnvironmentEdgeManager.currentTimeMillis(); + if (lastVerifyTime.compareTo(currentTime) > 0 || lastVerifyTime == 0L || !isValidLastVerifyTime(lastVerifyTime)) { + throw new RuntimeException(RETRY_VERIFY_NOT_APPLICABLE); + } + + } + + public boolean isValidLastVerifyTime(Long lastVerifyTime) throws Exception { + try(Connection conn = getConnection(configuration)) { + Table hIndexToolTable = conn.unwrap(PhoenixConnection.class).getQueryServices() + .getTable(IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES); + Scan s = new Scan(); + ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices(); + boolean isNamespaceMapped = SchemaUtil.isNamespaceMappingEnabled(null, cqs.getProps()); + s.setRowPrefixFilter(Bytes.toBytes(String.format("%s%s%s", lastVerifyTime, + ROW_KEY_SEPARATOR, + SchemaUtil.getPhysicalHBaseTableName(schemaName, indexTable, isNamespaceMapped)))); + ResultScanner rs = hIndexToolTable.getScanner(s); + return rs.next() != null; + } } private void validateTimeRange() { @@ -810,7 +857,7 @@ public class IndexTool extends Configured implements Tool { return startTime != null || endTime != null; } - private static boolean isTimeRangeFeatureApplicable(PTable dataTable, boolean isLocalIndexBuild) { + private static boolean 
isFeatureApplicable(PTable dataTable, boolean isLocalIndexBuild) { if (isLocalIndexBuild || !dataTable.isTransactional()) { return true; } @@ -1028,7 +1075,7 @@ public class IndexTool extends Configured implements Tool { indexingTool.setConf(conf); int status = indexingTool.run(args.toArray(new String[0])); Job job = indexingTool.getJob(); - return new AbstractMap.SimpleEntry<Integer, Job>(status, job); + return new AbstractMap.SimpleEntry<>(status, job); } public static void main(final String[] args) throws Exception { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java index 6e97a9d..7e8ee23 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java @@ -164,17 +164,12 @@ public class IndexVerificationOutputRepository implements AutoCloseable { throws IOException { byte[] rowKey = generateOutputTableRowKey(scanMaxTs, indexTable.getName().toBytes(), dataRowKey); Put put = new Put(rowKey); - put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_NAME_BYTES, - scanMaxTs, tableName); - put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_NAME_BYTES, - scanMaxTs, indexName); - put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_TS_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(dataRowTs))); + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_NAME_BYTES, tableName); + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_NAME_BYTES, indexName); + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_TS_BYTES, Bytes.toBytes(Long.toString(dataRowTs))); - put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_ROW_KEY_BYTES, - scanMaxTs, indexRowKey); - put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_TS_BYTES, - scanMaxTs, 
Bytes.toBytes(Long.toString(indexRowTs))); + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_ROW_KEY_BYTES, indexRowKey); + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_TS_BYTES, Bytes.toBytes(Long.toString(indexRowTs))); byte[] errorMessageBytes; if (expectedValue != null) { errorMessageBytes = getErrorMessageBytes(errorMsg, expectedValue, actualValue); @@ -183,11 +178,11 @@ public class IndexVerificationOutputRepository implements AutoCloseable { } else { errorMessageBytes = Bytes.toBytes(errorMsg); } - put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes); + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ERROR_MESSAGE_BYTES, errorMessageBytes); if (isBeforeRebuild) { - put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_BEFORE_VALUE); + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, PHASE_BEFORE_VALUE); } else { - put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_AFTER_VALUE); + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, PHASE_AFTER_VALUE); } outputTable.put(put); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java index e52823e..08c431a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java @@ -22,11 +22,13 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; 
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.compat.hbase.CompatUtil; import org.apache.phoenix.coprocessor.IndexToolVerificationResult; @@ -44,9 +46,12 @@ import java.sql.SQLException; public class IndexVerificationResultRepository implements AutoCloseable { + public static final String RUN_STATUS_SKIPPED = "Skipped"; + public static final String RUN_STATUS_EXECUTED = "Executed"; private Table resultTable; private Table indexTable; - public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|"); + public static final String ROW_KEY_SEPARATOR = "|"; + public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes(ROW_KEY_SEPARATOR); public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT"; public final static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(RESULT_TABLE_NAME); public final static byte[] RESULT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES; @@ -57,6 +62,8 @@ public class IndexVerificationResultRepository implements AutoCloseable { public final static String BEFORE_REBUILD_VALID_INDEX_ROW_COUNT = "BeforeRebuildValidIndexRowCount"; public final static byte[] BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT); + private static final String INDEX_TOOL_RUN_STATUS = "IndexToolRunStatus"; + public final static byte[] INDEX_TOOL_RUN_STATUS_BYTES = Bytes.toBytes(INDEX_TOOL_RUN_STATUS); public final static String BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT = "BeforeRebuildExpiredIndexRowCount"; public final static byte[] BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT); @@ -139,7 +146,7 @@ public class IndexVerificationResultRepository implements AutoCloseable { setResultTable(admin.getConnection().getTable(resultTableName)); } } - public static byte[] 
generateResultTableRowKey(long ts, byte[] indexTableName, byte [] regionName, + private static byte[] generateResultTableRowKey(long ts, byte[] indexTableName, byte [] regionName, byte[] startRow, byte[] stopRow) { byte[] keyPrefix = Bytes.toBytes(Long.toString(ts)); int targetOffset = 0; @@ -169,56 +176,59 @@ public class IndexVerificationResultRepository implements AutoCloseable { } public void logToIndexToolResultTable(IndexToolVerificationResult verificationResult, - IndexTool.IndexVerifyType verifyType, byte[] region) throws IOException { + IndexTool.IndexVerifyType verifyType, byte[] region) throws IOException { + logToIndexToolResultTable(verificationResult, verifyType, region, false); + } + + public void logToIndexToolResultTable(IndexToolVerificationResult verificationResult, + IndexTool.IndexVerifyType verifyType, byte[] region, boolean skipped) throws IOException { long scanMaxTs = verificationResult.getScanMaxTs(); byte[] rowKey = generateResultTableRowKey(scanMaxTs, indexTable.getName().toBytes(), region, verificationResult.getStartRow(), verificationResult.getStopRow()); Put put = new Put(rowKey); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getScannedDataRowCount()))); + Bytes.toBytes(Long.toString(verificationResult.getScannedDataRowCount()))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getRebuiltIndexRowCount()))); + Bytes.toBytes(Long.toString(verificationResult.getRebuiltIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, INDEX_TOOL_RUN_STATUS_BYTES, + Bytes.toBytes(skipped ? 
RUN_STATUS_SKIPPED : RUN_STATUS_EXECUTED)); if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.ONLY) { put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildValidIndexRowCount()))); + Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildValidIndexRowCount()))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildExpiredIndexRowCount()))); + Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildExpiredIndexRowCount()))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildMissingIndexRowCount()))); + Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildMissingIndexRowCount()))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildInvalidIndexRowCount()))); + Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildInvalidIndexRowCount()))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBefore().getBeyondMaxLookBackMissingIndexRowCount()))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBefore().getBeyondMaxLookBackInvalidIndexRowCount()))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeIndexHasExtraCellsCount()))); + 
Bytes.toBytes(Long.toString(verificationResult.getBeforeIndexHasExtraCellsCount()))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeIndexHasMissingCellsCount()))); + Bytes.toBytes(Long.toString(verificationResult.getBeforeIndexHasMissingCellsCount()))); } if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) { put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildValidIndexRowCount()))); + Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildValidIndexRowCount()))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildExpiredIndexRowCount()))); + Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildExpiredIndexRowCount()))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildMissingIndexRowCount()))); + Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildMissingIndexRowCount()))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildInvalidIndexRowCount()))); + Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildInvalidIndexRowCount()))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfter().getBeyondMaxLookBackMissingIndexRowCount()))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES, - scanMaxTs, 
Bytes.toBytes(Long.toString(verificationResult.getAfter().getBeyondMaxLookBackInvalidIndexRowCount()))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterIndexHasExtraCellsCount()))); + Bytes.toBytes(Long.toString(verificationResult.getAfterIndexHasExtraCellsCount()))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterIndexHasMissingCellsCount()))); + Bytes.toBytes(Long.toString(verificationResult.getAfterIndexHasMissingCellsCount()))); } resultTable.put(put); } @@ -258,6 +268,22 @@ public class IndexVerificationResultRepository implements AutoCloseable { return verificationResult; } + private IndexToolVerificationResult getVerificationResult(Table htable, byte [] oldRowKey, Scan scan ) + throws IOException { + IndexToolVerificationResult verificationResult = null; + Result result = htable.get(new Get(oldRowKey)); + if(result != null) { + byte[][] rowKeyParts = ByteUtil.splitArrayBySeparator(result.getRow(), ROW_KEY_SEPARATOR_BYTE[0]); + verificationResult = new IndexToolVerificationResult(scan); + verificationResult.setStartRow(rowKeyParts[3]); + verificationResult.setStopRow(rowKeyParts[4]); + for (Cell cell : result.rawCells()) { + verificationResult.update(cell); + } + } + return verificationResult; + } + public void close() throws IOException { if (resultTable != null) { resultTable.close(); @@ -274,5 +300,13 @@ public class IndexVerificationResultRepository implements AutoCloseable { public void setIndexTable(Table indexTable) { this.indexTable = indexTable; } + + public IndexToolVerificationResult getVerificationResult(Long ts, Scan scan, Region region, byte[] indexTableName) throws IOException { + byte [] rowKey = generateResultTableRowKey(ts, + indexTableName, region.getRegionInfo().getRegionName(), + 
scan.getStartRow(), scan.getStopRow()); + return getVerificationResult(resultTable, rowKey, scan); + + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index 31bfbac..f575b09 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -160,6 +160,7 @@ public final class PhoenixConfigurationUtil { public static final String MAPREDUCE_TENANT_ID = "phoenix.mapreduce.tenantid"; private static final String INDEX_TOOL_END_TIME = "phoenix.mr.index.endtime"; private static final String INDEX_TOOL_START_TIME = "phoenix.mr.index.starttime"; + private static final String INDEX_TOOL_LAST_VERIFY_TIME = "phoenix.mr.index.last.verify.time"; public static final String MAPREDUCE_JOB_TYPE = "phoenix.mapreduce.jobtype"; @@ -283,6 +284,12 @@ public final class PhoenixConfigurationUtil { configuration.set(INDEX_TOOL_START_TIME, Long.toString(startTime)); } + public static void setIndexToolLastVerifyTime(Configuration configuration, Long lastVerifyTime) { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(lastVerifyTime); + configuration.set(INDEX_TOOL_LAST_VERIFY_TIME, Long.toString(lastVerifyTime)); + } + public static void setCurrentScnValue(Configuration configuration, Long scn) { Preconditions.checkNotNull(configuration); Preconditions.checkNotNull(scn); @@ -298,6 +305,11 @@ public final class PhoenixConfigurationUtil { Preconditions.checkNotNull(configuration); return configuration.get(CURRENT_SCN_VALUE); } + + public static String getIndexToolLastVerifyTime(Configuration configuration) { + Preconditions.checkNotNull(configuration); + return configuration.get(INDEX_TOOL_LAST_VERIFY_TIME); + } public static List<String> getUpsertColumnNames(final Configuration 
configuration) { return getValues(configuration, MAPREDUCE_UPSERT_COLUMN_COUNT, MAPREDUCE_UPSERT_COLUMN_VALUE_PREFIX); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java index d82e99a..92317d4 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java @@ -61,7 +61,7 @@ public class IndexToolTest extends BaseTest { } @Test - public void testParseOptions_timeRange_timeRangeNotNull() { + public void testParseOptions_timeRange_timeRangeNotNull() throws Exception { Long startTime = 10L; Long endTime = 15L; String [] args = @@ -75,7 +75,7 @@ public class IndexToolTest extends BaseTest { } @Test - public void testParseOptions_timeRange_null() { + public void testParseOptions_timeRange_null() throws Exception { String [] args = IndexToolIT.getArgValues(true, true, schema, dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE); @@ -86,7 +86,7 @@ public class IndexToolTest extends BaseTest { } @Test - public void testParseOptions_timeRange_startTimeNotNull() { + public void testParseOptions_timeRange_startTimeNotNull() throws Exception { Long startTime = 10L; String [] args = IndexToolIT.getArgValues(true, true, schema, @@ -99,7 +99,7 @@ public class IndexToolTest extends BaseTest { } @Test - public void testParseOptions_timeRange_endTimeNotNull() { + public void testParseOptions_timeRange_endTimeNotNull() throws Exception { Long endTime = 15L; String [] args = IndexToolIT.getArgValues(true, true, schema, @@ -112,7 +112,7 @@ public class IndexToolTest extends BaseTest { } @Test - public void testParseOptions_timeRange_startTimeNullEndTimeInFuture() { + public void testParseOptions_timeRange_startTimeNullEndTimeInFuture() throws Exception { Long endTime = EnvironmentEdgeManager.currentTimeMillis() + 100000; String [] args = IndexToolIT.getArgValues(true, true, 
schema, @@ -125,7 +125,7 @@ public class IndexToolTest extends BaseTest { } @Test - public void testParseOptions_timeRange_endTimeNullStartTimeInFuture() { + public void testParseOptions_timeRange_endTimeNullStartTimeInFuture() throws Exception { Long startTime = EnvironmentEdgeManager.currentTimeMillis() + 100000; String [] args = IndexToolIT.getArgValues(true, true, schema, @@ -138,7 +138,7 @@ public class IndexToolTest extends BaseTest { } @Test(timeout = 10000 /* 10 secs */) - public void testParseOptions_timeRange_startTimeInFuture() { + public void testParseOptions_timeRange_startTimeInFuture() throws Exception { Long startTime = EnvironmentEdgeManager.currentTimeMillis() + 100000; Long endTime = EnvironmentEdgeManager.currentTimeMillis() + 200000; String [] args = @@ -152,7 +152,7 @@ public class IndexToolTest extends BaseTest { } @Test(timeout = 10000 /* 10 secs */) - public void testParseOptions_timeRange_endTimeInFuture() { + public void testParseOptions_timeRange_endTimeInFuture() throws Exception { Long startTime = EnvironmentEdgeManager.currentTimeMillis(); Long endTime = EnvironmentEdgeManager.currentTimeMillis() + 100000; String [] args = @@ -166,7 +166,7 @@ public class IndexToolTest extends BaseTest { } @Test - public void testParseOptions_timeRange_startTimeEqEndTime() { + public void testParseOptions_timeRange_startTimeEqEndTime() throws Exception { Long startTime = 10L; Long endTime = 10L; String [] args = @@ -180,7 +180,7 @@ public class IndexToolTest extends BaseTest { } @Test - public void testParseOptions_timeRange_startTimeGtEndTime() { + public void testParseOptions_timeRange_startTimeGtEndTime() throws Exception { Long startTime = 10L; Long endTime = 1L; String [] args = @@ -198,6 +198,31 @@ public class IndexToolTest extends BaseTest { when(pDataTable.isTransactional()).thenReturn(true); exceptionRule.expect(RuntimeException.class); exceptionRule.expectMessage(FEATURE_NOT_APPLICABLE); - IndexTool.checkTimeRangeFeature(1L, 3L, pDataTable, 
!localIndex); + IndexTool.checkIfFeatureApplicable(1L, 3L, null, pDataTable, !localIndex); + } + + @Test + public void testIncrcementalVerifyOption() throws Exception { + IndexTool mockTool = Mockito.mock(IndexTool.class); + when(mockTool.getLastVerifyTime()).thenCallRealMethod(); + Long lastVerifyTime = 10L; + String [] args = + IndexToolIT.getArgValues(true, true, schema, + dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE, + lastVerifyTime); + when(mockTool.parseOptions(args)).thenCallRealMethod(); + + CommandLine cmdLine = mockTool.parseOptions(args); + + when(mockTool.populateIndexToolAttributes(cmdLine)).thenCallRealMethod(); + when(mockTool.isValidLastVerifyTime(lastVerifyTime)).thenReturn(true); + + mockTool.populateIndexToolAttributes(cmdLine); + Assert.assertEquals(lastVerifyTime, mockTool.getLastVerifyTime()); + + when(pDataTable.isTransactional()).thenReturn(true); + exceptionRule.expect(RuntimeException.class); + exceptionRule.expectMessage(FEATURE_NOT_APPLICABLE); + IndexTool.checkIfFeatureApplicable(null, null, lastVerifyTime, pDataTable, !localIndex); } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java index a87a4e0..0ce54fb 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java @@ -97,7 +97,7 @@ public class IndexUpgradeToolTest { } @Test - public void testIfOptionsArePassedToIndexTool() { + public void testIfOptionsArePassedToIndexTool() throws Exception { if (!upgrade) { return; } @@ -126,7 +126,7 @@ public class IndexUpgradeToolTest { } @Test - public void testMalformedSpacingOptionsArePassedToIndexTool() { + public void testMalformedSpacingOptionsArePassedToIndexTool() throws Exception { if (!upgrade) { return; } @@ -152,7 +152,7 @@ public class IndexUpgradeToolTest { } @Test(expected = 
IllegalStateException.class) - public void testBadIndexToolOptions() { + public void testBadIndexToolOptions() throws Exception { String [] indexToolOpts = {"-v" + DUMMY_VERIFY_VALUE}; String indexToolarg = String.join(" ", indexToolOpts); String [] args = {"-o", UPGRADE_OP, "-tb", INPUT_LIST, "-rb", "-tool", indexToolarg }; diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/ShouldVerifyTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/ShouldVerifyTest.java new file mode 100644 index 0000000..8cc1970 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/ShouldVerifyTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.index; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner; +import org.apache.phoenix.coprocessor.IndexToolVerificationResult; +import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; +import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.io.IOException; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +public class ShouldVerifyTest { + + @Mock IndexRebuildRegionScanner scanner; + @Mock IndexMaintainer im; + @Mock Scan scan; + @Mock Region region; + @Mock IndexVerificationResultRepository resultRepository; + byte[] indexRowKey; + @Mock IndexToolVerificationResult verificationResult; + + @Before + public void setup() throws IOException { + MockitoAnnotations.initMocks(this); + indexRowKey = null; + when(im.getIndexTableName()).thenReturn(Bytes.toBytes("indexName")); + when(scanner.shouldVerify(any(IndexTool.IndexVerifyType.class), Matchers.<byte[]>any(), any(Scan.class), + any(Region.class), any(IndexMaintainer.class), + any(IndexVerificationResultRepository.class))).thenCallRealMethod(); + } + + @Test + public void testShouldVerify_repair_true() throws IOException { + indexRowKey = new byte[5]; + Assert.assertTrue(scanner.shouldVerify(IndexTool.IndexVerifyType.ONLY, indexRowKey, scan, region, im, resultRepository)); + } + + @Test + public void testShouldVerify_repair_rebuild_true() throws IOException { + indexRowKey = new byte[5]; + when(scan.getAttribute(UngroupedAggregateRegionObserver.INDEX_RETRY_VERIFY)).thenReturn(Bytes.toBytes(1L)); + assertShouldVerify(true); 
+ } + + private void assertShouldVerify(boolean assertion) throws IOException { + Assert.assertEquals(assertion, scanner.shouldVerify(IndexTool.IndexVerifyType.NONE, indexRowKey, scan, region, im, resultRepository)); + Assert.assertEquals(assertion, scanner.shouldVerify(IndexTool.IndexVerifyType.BEFORE, indexRowKey, scan, region, im, resultRepository)); + Assert.assertEquals(assertion, scanner.shouldVerify(IndexTool.IndexVerifyType.AFTER, indexRowKey, scan, region, im, resultRepository)); + } + + @Test + public void testShouldVerify_false() throws IOException { + when(scan.getAttribute(UngroupedAggregateRegionObserver.INDEX_RETRY_VERIFY)).thenReturn(Bytes.toBytes(1L)); + when(resultRepository.getVerificationResult(1L, scan, region, im.getIndexTableName())).thenReturn(verificationResult); + assertShouldVerify(false); + } + + @Test + public void testShouldVerify_rebuild_true() throws IOException { + when(scan.getAttribute(UngroupedAggregateRegionObserver.INDEX_RETRY_VERIFY)).thenReturn(Bytes.toBytes(1L)); + when(resultRepository.getVerificationResult(1L, scan, region, im.getIndexTableName())).thenReturn(null); + assertShouldVerify(true); + } + + @Test + public void testShouldVerify_noTime_true() throws IOException { + when(resultRepository.getVerificationResult(1L, scan, region, im.getIndexTableName())).thenReturn(verificationResult); + assertShouldVerify(true); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java index f58605f..840e9d5 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java @@ -301,4 +301,15 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest { 
Long.parseLong(PhoenixConfigurationUtil.getCurrentScnValue(configuration))); } + + @Test + public void testLastVerifyTimeConfig() { + final Configuration configuration = new Configuration(); + Long lastVerifyTime = 2L; + + PhoenixConfigurationUtil.setIndexToolLastVerifyTime(configuration, lastVerifyTime); + Assert.assertEquals(lastVerifyTime.longValue(), + Long.parseLong(PhoenixConfigurationUtil.getIndexToolLastVerifyTime(configuration))); + + } }