This is an automated email from the ASF dual-hosted git repository. kadir pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new 339a22d PHOENIX-5735 IndexTool's inline verification should not verify rows beyond max lookback age 339a22d is described below commit 339a22d29b3c5f268935c8c013aaa1e1f4bbbf69 Author: Weiming Wang <wangweiming...@live.cn> AuthorDate: Wed Apr 1 11:09:09 2020 -0700 PHOENIX-5735 IndexTool's inline verification should not verify rows beyond max lookback age Signed-off-by: Kadir <kozde...@salesforce.com> --- .../org/apache/phoenix/end2end/IndexToolIT.java | 51 ++++-- .../hadoop/hbase/regionserver/ScanInfoUtil.java | 2 +- .../coprocessor/IndexRebuildRegionScanner.java | 173 +++++++++++++++------ .../coprocessor/IndexToolVerificationResult.java | 78 ++++++++-- .../apache/phoenix/mapreduce/index/IndexTool.java | 8 + .../index/PhoenixIndexImportDirectReducer.java | 8 + .../index/PhoenixIndexToolJobCounters.java | 6 +- .../phoenix/index/VerifySingleIndexRowTest.java | 168 +++++++++++++++----- 8 files changed, 376 insertions(+), 118 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index 119c806..71cb530 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -62,6 +62,7 @@ import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -81,19 +82,21 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.UUID; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS; -import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; @@ -251,6 +254,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); dropIndexToolTables(conn); } } @@ -356,6 +361,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); } finally { conn.close(); } @@ -412,7 +419,9 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); - assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); assertEquals(NROWS, actualRowCount); @@ -429,10 +438,14 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); assertEquals(2 * NROWS, actualRowCount); dropIndexToolTables(conn); @@ -593,9 +606,13 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { null, -1, IndexTool.IndexVerifyType.AFTER); // The index tool output table should report that there is a missing index row Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, "_IDX_" + dataTableFullName); - byte[] expectedValueBytes = Bytes.toBytes("Missing index row"); - assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), - expectedValueBytes, 0, expectedValueBytes.length) == 0); + try { + String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK; + String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + assertTrue(expectedErrorMsg.equals(actualErrorMsg)); + } catch(Exception ex){ + Assert.fail("Fail to parsing the error message from IndexToolOutputTable"); + } IndexRegionObserver.setIgnoreIndexRebuildForTesting(false); dropIndexToolTables(conn); } @@ -627,9 +644,13 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, -1, IndexTool.IndexVerifyType.ONLY); Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName); - byte[] expectedValueBytes = Bytes.toBytes("Missing index row"); - assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), - expectedValueBytes, 0, expectedValueBytes.length) == 0); + try { + String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK; + String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + assertTrue(expectedErrorMsg.equals(actualErrorMsg)); + } catch(Exception ex) { + Assert.fail("Fail to parsing the error message from IndexToolOutputTable"); + } // Delete the output table for the next test dropIndexToolTables(conn); // Run the index tool to populate the index while verifying rows @@ -668,9 +689,13 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName); - byte[] expectedValueBytes = Bytes.toBytes("Missing index row"); - assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), - cell.getValueLength(), expectedValueBytes, 0, expectedValueBytes.length) == 0); + try { + String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK; + String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + assertTrue(expectedErrorMsg.equals(actualErrorMsg)); + } catch(Exception ex) { + Assert.fail("Fail to parsing the error message from IndexToolOutputTable"); + } // Run the index tool to populate the index while verifying rows runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java index e70ffc7..7d54228 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java @@ -121,7 +121,7 @@ public class ScanInfoUtil { DEFAULT_PHOENIX_MAX_LOOKBACK_AGE)); } - private static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){ + public static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){ return maxLookbackTime > 0L; } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java index 0bd8c7c..2bdaf1d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java @@ -17,21 +17,10 @@ */ package org.apache.phoenix.coprocessor; -import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES; import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES; import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn; import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.REBUILT_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.RESULT_TABLE_COLUMN_FAMILY; -import static org.apache.phoenix.mapreduce.index.IndexTool.SCANNED_DATA_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexTool.*; import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; @@ -48,7 +37,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableSet; -import java.util.TreeMap; import java.util.concurrent.ExecutionException; import com.google.common.annotations.VisibleForTesting; @@ -72,8 +60,9 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanInfoUtil; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.compile.ScanRanges; @@ -109,6 +98,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.Maps; public class IndexRebuildRegionScanner extends BaseRegionScanner { + public static final String ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK = "Missing index row beyond maxLookBack"; private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class); public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max"; @@ -154,6 +144,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { private int singleRowRebuildReturnCode; private Map<byte[], NavigableSet<byte[]>> familyMap; private byte[][] viewConstants; + private long maxLookBackInMills; @VisibleForTesting public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan, @@ -219,6 +210,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK); } } + + maxLookBackInMills = ScanInfoUtil.getMaxLookbackInMillis(config); } private void setReturnCodeForSingleRowRebuild() throws IOException { @@ -302,6 +295,10 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.missingIndexRowCount))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.invalidIndexRowCount))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.beyondMaxLookBackMissingIndexRowCount))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.beyondMaxLookBackInvalidIndexRowCount))); } if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) { put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES, @@ -312,6 +309,10 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.missingIndexRowCount))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.invalidIndexRowCount))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.beyondMaxLookBackMissingIndexRowCount))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.beyondMaxLookBackInvalidIndexRowCount))); } resultHTable.put(put); } @@ -370,6 +371,12 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return 0; } + @VisibleForTesting + public long setMaxLookBackInMills(long maxLookBackInMills) { + this.maxLookBackInMills = maxLookBackInMills; + return 0; + } + public static class SimpleValueGetter implements ValueGetter { final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable(); final Put put; @@ -516,13 +523,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return null; } - private boolean isMatchingMutation(Mutation expected, Mutation actual, int iteration) throws IOException { + private void logMismatch(Mutation expected, Mutation actual, int iteration) throws IOException { if (getTimestamp(expected) != getTimestamp(actual)) { String errorMsg = "Not matching timestamp"; byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants); logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg, null, null); - return false; + return; } int expectedCellCount = 0; for (List<Cell> cells : expected.getFamilyCellMap().values()) { @@ -539,14 +546,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants); String errorMsg = "Missing cell (in iteration " + iteration + ") " + Bytes.toString(family) + ":" + Bytes.toString(qualifier); logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg); - return false; + return; } if (!CellUtil.matchingValue(actualCell, expectedCell)) { String errorMsg = "Not matching value (in iteration " + iteration + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier); byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants); logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell)); - return false; + return; } } } @@ -562,6 +569,40 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants); logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg); + } + } + + private boolean isMatchingMutation(Mutation expected, Mutation actual) throws IOException { + if (getTimestamp(expected) != getTimestamp(actual)) { + return false; + } + int expectedCellCount = 0; + for (List<Cell> cells : expected.getFamilyCellMap().values()) { + if (cells == null) { + continue; + } + for (Cell expectedCell : cells) { + expectedCellCount++; + byte[] family = CellUtil.cloneFamily(expectedCell); + byte[] qualifier = CellUtil.cloneQualifier(expectedCell); + Cell actualCell = getCell(actual, family, qualifier); + if (actualCell == null || + !CellUtil.matchingType(expectedCell, actualCell)) { + return false; + } + if (!CellUtil.matchingValue(actualCell, expectedCell)) { + return false; + } + } + } + int actualCellCount = 0; + for (List<Cell> cells : actual.getFamilyCellMap().values()) { + if (cells == null) { + continue; + } + actualCellCount += cells.size(); + } + if (expectedCellCount != actualCellCount) { return false; } return true; @@ -698,7 +739,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { while (iterator.hasNext()) { Mutation mutation = iterator.next(); if ((mutation instanceof Put && !isVerified((Put) mutation)) || - (mutation instanceof Delete && isDeleteFamilyVersion(mutation))) { + (mutation instanceof Delete && !isDeleteFamily(mutation))) { iterator.remove(); } else { if (previous != null && getTimestamp(previous) == getTimestamp(mutation) && @@ -796,15 +837,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { repairActualMutationList(actualMutationList, expectedMutationList); } cleanUpActualMutationList(actualMutationList); - long currentTime = EnvironmentEdgeManager.currentTime(); + long currentTime = EnvironmentEdgeManager.currentTimeMillis(); int actualIndex = 0; int expectedIndex = 0; - int matchingCount = 0; int expectedSize = expectedMutationList.size(); int actualSize = actualMutationList.size(); Mutation expected = null; Mutation previousExpected; - Mutation actual; + Mutation actual = null; while (expectedIndex < expectedSize && actualIndex <actualSize) { previousExpected = expected; expected = expectedMutationList.get(expectedIndex); @@ -822,7 +862,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { if (previousExpected instanceof Delete) { // Between an expected delete and put, there can be one or more deletes due to // concurrent mutations or data table write failures. Skip all of them if any - while (getTimestamp(actual) > getTimestamp(expected) && (actual instanceof Delete)) { + // There cannot be any actual delete mutation between two expected put mutations. + while (getTimestamp(actual) >= getTimestamp(expected) && actual instanceof Delete) { actualIndex++; if (actualIndex == actualSize) { break; @@ -836,10 +877,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { if (actual instanceof Delete) { break; } - if (isMatchingMutation(expected, actual, expectedIndex)) { + if (isMatchingMutation(expected, actual)) { expectedIndex++; actualIndex++; - matchingCount++; continue; } } else { // expected instanceof Delete @@ -859,39 +899,54 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { (actual instanceof Delete && isDeleteFamily(actual))) { expectedIndex++; actualIndex++; - matchingCount++; continue; } } - if (matchingCount > 0) { - break; + break; + } + + if (expectedIndex == expectedSize ){ + // every expected mutation has its matching one in the actual list. + verificationPhaseResult.validIndexRowCount++; + return true; + } + + if (isTimestampBeyondMaxLookBack(currentTime, getTimestamp(expectedMutationList.get(expectedIndex)))){ + if (expectedIndex > 0) { + // if current expected index mutation is beyond max look back window, we only need to make sure its latest + // mutation is a matching one, as an SCN query is required. + verificationPhaseResult.validIndexRowCount++; + return true; } - verificationPhaseResult.invalidIndexRowCount++; + + // All expected mutations are beyond the maxLookBack window, none of them can find its matching one in actual list + // It may be caused by real bug or compaction on the index table. + // We report it as a failure, so "before" option can trigger the index rebuild for this row. + // This repair is required, when there is only one index row for a given data table row and the timestamp of that row + // can be beyond maxLookBack. + verificationPhaseResult.beyondMaxLookBackInvalidIndexRowCount++; + byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants); + String errorMsg = String.format("Expect %1$s mutations but got %2$s (beyond maxLookBack)", + expectedSize, + actualSize); + logToIndexToolOutputTable(dataKey, indexRow.getRow(), + getTimestamp(expectedMutationList.get(expectedIndex)), + 0, errorMsg); return false; } - if ((expectedIndex != expectedSize) || actualIndex != actualSize) { - if (matchingCount > 0) { - if (verifyType != IndexTool.IndexVerifyType.ONLY) { - // We do not consider this as a verification issue but log it for further information. - // This may happen due to compaction - byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants); - String errorMsg = "Expected to find " + expectedMutationList.size() + " mutations but got " - + actualMutationList.size(); - logToIndexToolOutputTable(dataKey, indexRow.getRow(), - getTimestamp(expectedMutationList.get(0)), - getTimestamp(actualMutationList.get(0)), errorMsg); - } - } else { + else { + if (actualIndex < actualSize && actual instanceof Put && expected instanceof Put){ + logMismatch(expected, actual, expectedIndex); + } + else { byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants); String errorMsg = "Not matching index row"; logToIndexToolOutputTable(dataKey, indexRow.getRow(), getTimestamp(expectedMutationList.get(0)), 0L, errorMsg); - verificationPhaseResult.invalidIndexRowCount++; - return false; } + verificationPhaseResult.invalidIndexRowCount++; + return false; } - verificationPhaseResult.validIndexRowCount++; - return true; } private static long getMaxTimestamp(Pair<Put, Delete> pair) { @@ -937,7 +992,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { // TODO: metrics for expired rows if (!keys.isEmpty()) { Iterator<KeyRange> itr = keys.iterator(); - long currentTime = EnvironmentEdgeManager.currentTime(); + long currentTime = EnvironmentEdgeManager.currentTimeMillis(); while(itr.hasNext()) { KeyRange keyRange = itr.next(); byte[] key = keyRange.getLowerRange(); @@ -950,20 +1005,30 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } if (keys.size() > 0) { for (KeyRange keyRange : keys) { - String errorMsg = "Missing index row"; byte[] key = keyRange.getLowerRange(); List<Mutation> mutationList = indexKeyToMutationMap.get(key); - if (mutationList.get(mutationList.size() - 1) instanceof Delete) { + Mutation mutation = mutationList.get(mutationList.size() - 1); + if (mutation instanceof Delete) { continue; } + long currentTime = EnvironmentEdgeManager.currentTimeMillis(); + String errorMsg; + if (isTimestampBeyondMaxLookBack(currentTime, getTimestamp(mutation))){ + errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK; + verificationPhaseResult.beyondMaxLookBackMissingIndexRowCount++; + } + else { + errorMsg = "Missing index row"; + verificationPhaseResult.missingIndexRowCount++; + } byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants); logToIndexToolOutputTable(dataKey, keyRange.getLowerRange(), getMaxTimestamp(dataKeyToMutationMap.get(dataKey)), - getTimestamp(mutationList.get(mutationList.size() - 1)), errorMsg); - verificationPhaseResult.missingIndexRowCount++; + getTimestamp(mutation), errorMsg); } } + keys.addAll(invalidKeys); } @@ -974,6 +1039,12 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return tsToCheck < (currentTime - (long) indexTableTTL * 1000); } + private boolean isTimestampBeyondMaxLookBack(long currentTime, long tsToCheck){ + if (!ScanInfoUtil.isMaxLookbackTimeEnabled(maxLookBackInMills)) + return true; + return tsToCheck < (currentTime - maxLookBackInMills); + } + private void addVerifyTask(final List<KeyRange> keys, final IndexToolVerificationResult.PhaseResult verificationPhaseResult) { tasks.add(new Task<Boolean>() { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java index ed92fad..1fbb866 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java @@ -30,17 +30,7 @@ import org.apache.phoenix.mapreduce.index.IndexTool; import java.io.IOException; import java.util.Arrays; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.REBUILT_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.RESULT_TABLE_COLUMN_FAMILY; -import static org.apache.phoenix.mapreduce.index.IndexTool.SCANNED_DATA_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexTool.*; public class IndexToolVerificationResult { public static class PhaseResult { @@ -48,26 +38,33 @@ public class IndexToolVerificationResult { long expiredIndexRowCount = 0; long missingIndexRowCount = 0; long invalidIndexRowCount = 0; + long beyondMaxLookBackMissingIndexRowCount = 0; + long beyondMaxLookBackInvalidIndexRowCount = 0; public void add(PhaseResult phaseResult) { validIndexRowCount += phaseResult.validIndexRowCount; expiredIndexRowCount += phaseResult.expiredIndexRowCount; missingIndexRowCount += phaseResult.missingIndexRowCount; invalidIndexRowCount += phaseResult.invalidIndexRowCount; + beyondMaxLookBackMissingIndexRowCount += phaseResult.beyondMaxLookBackMissingIndexRowCount; + beyondMaxLookBackInvalidIndexRowCount += phaseResult.beyondMaxLookBackInvalidIndexRowCount; } public PhaseResult(){} public PhaseResult(long validIndexRowCount, long expiredIndexRowCount, - long missingIndexRowCount, long invalidIndexRowCount) { + long missingIndexRowCount, long invalidIndexRowCount, + long beyondMaxLookBackMissingIndexRowCount, long beyondMaxLookBackInvalidIndexRowCount) { this.validIndexRowCount = validIndexRowCount; this.expiredIndexRowCount = expiredIndexRowCount; this.missingIndexRowCount = missingIndexRowCount; this.invalidIndexRowCount = invalidIndexRowCount; + this.beyondMaxLookBackInvalidIndexRowCount = beyondMaxLookBackInvalidIndexRowCount; + this.beyondMaxLookBackMissingIndexRowCount = beyondMaxLookBackMissingIndexRowCount; } public long getTotalCount() { - return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount; + return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount + beyondMaxLookBackMissingIndexRowCount + beyondMaxLookBackInvalidIndexRowCount; } @Override @@ -77,7 +74,8 @@ public class IndexToolVerificationResult { ", expiredIndexRowCount=" + expiredIndexRowCount + ", missingIndexRowCount=" + missingIndexRowCount + ", invalidIndexRowCount=" + invalidIndexRowCount + - '}'; + ", beyondMaxLookBackMissingIndexRowCount=" + beyondMaxLookBackMissingIndexRowCount + + ", beyondMaxLookBackInvalidIndexRowCount=" + beyondMaxLookBackInvalidIndexRowCount; } @Override @@ -92,7 +90,9 @@ public class IndexToolVerificationResult { return this.expiredIndexRowCount == pr.expiredIndexRowCount && this.validIndexRowCount == pr.validIndexRowCount && this.invalidIndexRowCount == pr.invalidIndexRowCount - && this.missingIndexRowCount == pr.missingIndexRowCount; + && this.missingIndexRowCount == pr.missingIndexRowCount + && this.beyondMaxLookBackInvalidIndexRowCount == pr.beyondMaxLookBackInvalidIndexRowCount + && this.beyondMaxLookBackMissingIndexRowCount == pr.beyondMaxLookBackMissingIndexRowCount; } @Override @@ -102,6 +102,8 @@ public class IndexToolVerificationResult { result = 31 * result + validIndexRowCount; result = 31 * result + missingIndexRowCount; result = 31 * result + invalidIndexRowCount; + result = 31 * result + beyondMaxLookBackMissingIndexRowCount; + result = 31 * result + beyondMaxLookBackInvalidIndexRowCount; return (int)result; } } @@ -141,6 +143,14 @@ public class IndexToolVerificationResult { return before.invalidIndexRowCount; } + public long getBeforeRebuildBeyondMaxLookBackMissingIndexRowCount() { + return before.beyondMaxLookBackMissingIndexRowCount; + }; + + public long getBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount() { + return before.beyondMaxLookBackInvalidIndexRowCount; + }; + public long getBeforeRebuildMissingIndexRowCount() { return before.missingIndexRowCount; } @@ -161,6 +171,14 @@ public class IndexToolVerificationResult { return after.missingIndexRowCount; } + public long getAfterRebuildBeyondMaxLookBackMissingIndexRowCount() { + return after.beyondMaxLookBackMissingIndexRowCount; + }; + + public long getAfterRebuildBeyondMaxLookBackInvalidIndexRowCount() { + return after.beyondMaxLookBackInvalidIndexRowCount; + }; + private void addScannedDataRowCount(long count) { this.scannedDataRowCount += count; } @@ -185,6 +203,14 @@ public class IndexToolVerificationResult { before.invalidIndexRowCount += count; } + private void addBeforeRebuildBeyondMaxLookBackMissingIndexRowCount(long count) { + before.beyondMaxLookBackMissingIndexRowCount += count; + } + + private void addBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount(long count) { + before.beyondMaxLookBackInvalidIndexRowCount += count; + } + private void addAfterRebuildValidIndexRowCount(long count) { after.validIndexRowCount += count; } @@ -201,6 +227,14 @@ public class IndexToolVerificationResult { after.invalidIndexRowCount += count; } + private void addAfterRebuildBeyondMaxLookBackMissingIndexRowCount(long count) { + after.beyondMaxLookBackMissingIndexRowCount += count; + } + + private void addAfterRebuildBeyondMaxLookBackInvalidIndexRowCount(long count) { + after.beyondMaxLookBackInvalidIndexRowCount += count; + } + private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) { if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, 0, @@ -229,6 +263,10 @@ public class IndexToolVerificationResult { addBeforeRebuildMissingIndexRowCount(getValue(cell)); } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) { addBeforeRebuildInvalidIndexRowCount(getValue(cell)); + } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES)) { + addBeforeRebuildBeyondMaxLookBackMissingIndexRowCount(getValue(cell)); + } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES)) { + addBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount(getValue(cell)); } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) { addAfterRebuildValidIndexRowCount(getValue(cell)); } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) { @@ -237,6 +275,10 @@ public class IndexToolVerificationResult { addAfterRebuildMissingIndexRowCount(getValue(cell)); } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) { addAfterRebuildInvalidIndexRowCount(getValue(cell)); + } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES)) { + addAfterRebuildBeyondMaxLookBackMissingIndexRowCount(getValue(cell)); + } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES)) { + addAfterRebuildBeyondMaxLookBackInvalidIndexRowCount(getValue(cell)); } } @@ -284,11 +326,13 @@ public class IndexToolVerificationResult { if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) { return false; } else if (verifyType == IndexTool.IndexVerifyType.ONLY) { - if (before.invalidIndexRowCount + before.missingIndexRowCount > 0) { + if (before.invalidIndexRowCount + before.missingIndexRowCount + + before.beyondMaxLookBackInvalidIndexRowCount + before.beyondMaxLookBackMissingIndexRowCount > 0) { return true; } } else if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) { - if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) { + if (after.invalidIndexRowCount + after.missingIndexRowCount + + after.beyondMaxLookBackInvalidIndexRowCount + after.beyondMaxLookBackMissingIndexRowCount > 0) { return true; } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index 0347811..95703a4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -187,6 +187,10 @@ public class IndexTool extends Configured implements Tool { public final static byte[] BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT); public static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT = "BeforeRebuildInvalidIndexRowCount"; public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT); + public static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT = "BeforeRebuildBeyondMaxLookBackMissingIndexRowCount"; + public static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT); + public static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT = "BeforeRebuildBeyondMaxLookBackInvalidIndexRowCount"; + public static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT); public static String AFTER_REBUILD_VALID_INDEX_ROW_COUNT = "AfterValidExpiredIndexRowCount"; public final static byte[] AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_VALID_INDEX_ROW_COUNT); public static String AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT = "AfterRebuildExpiredIndexRowCount"; @@ -195,6 +199,10 @@ public class IndexTool extends Configured implements Tool { public final static byte[] AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT); public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT = "AfterRebuildInvalidIndexRowCount"; public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT); + public static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT = "AfterRebuildBeyondMaxLookBackMissingIndexRowCount"; + public static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT); + public static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT = "AfterRebuildBeyondMaxLookBackInvalidIndexRowCount"; + public static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT); public static String VERIFICATION_PHASE = "Phase"; public final static byte[] VERIFICATION_PHASE_BYTES = Bytes.toBytes(VERIFICATION_PHASE); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java index 8d1b4db..a24e3ab 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java @@ -80,6 +80,10 @@ public class PhoenixIndexImportDirectReducer extends setValue(verificationResult.getBeforeRebuildMissingIndexRowCount()); context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT). setValue(verificationResult.getBeforeRebuildInvalidIndexRowCount()); + context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT). + setValue(verificationResult.getBeforeRebuildBeyondMaxLookBackMissingIndexRowCount()); + context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT). + setValue(verificationResult.getBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount()); } if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) { context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT). @@ -90,6 +94,10 @@ public class PhoenixIndexImportDirectReducer extends setValue(verificationResult.getAfterRebuildMissingIndexRowCount()); context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT). setValue(verificationResult.getAfterRebuildInvalidIndexRowCount()); + context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT). + setValue(verificationResult.getAfterRebuildBeyondMaxLookBackMissingIndexRowCount()); + context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT). + setValue(verificationResult.getAfterRebuildBeyondMaxLookBackInvalidIndexRowCount()); } if (verificationResult.isVerificationFailed(verifyType)) { throw new IOException("Index verification failed! " + verificationResult); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java index c10694d..b736787 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java @@ -28,8 +28,12 @@ public enum PhoenixIndexToolJobCounters { BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT, + BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT, + BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT, AFTER_REBUILD_VALID_INDEX_ROW_COUNT, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT, - AFTER_REBUILD_INVALID_INDEX_ROW_COUNT; + AFTER_REBUILD_INVALID_INDEX_ROW_COUNT, + AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT, + AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java index 30961e3..5df041d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java @@ -32,16 +32,14 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner; import org.apache.phoenix.coprocessor.IndexToolVerificationResult; +import org.apache.phoenix.hbase.index.IndexRegionObserver; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.BaseConnectionlessQueryTest; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; -import org.apache.phoenix.util.EnvironmentEdge; -import org.apache.phoenix.util.EnvironmentEdgeManager; -import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.*; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -55,17 +53,13 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Properties; +import java.util.*; import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES; import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES; import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_BYTES; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.when; @@ -265,9 +259,12 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { when(rebuildScanner.setIndexTableTTL(Matchers.anyInt())).thenCallRealMethod(); when(rebuildScanner.setIndexMaintainer(Matchers.<IndexMaintainer>any())).thenCallRealMethod(); when(rebuildScanner.setIndexKeyToMutationMap(Matchers.<Map>any())).thenCallRealMethod(); + when(rebuildScanner.setMaxLookBackInMills(Matchers.anyLong())).thenCallRealMethod(); rebuildScanner.setIndexTableTTL(HConstants.FOREVER); indexMaintainer = pIndexTable.getIndexMaintainer(pDataTable, pconn); rebuildScanner.setIndexMaintainer(indexMaintainer); + // set the maxLookBack to infinite to avoid the compaction + rebuildScanner.setMaxLookBackInMills(Long.MAX_VALUE); } private void initializeGlobalMockitoSetup() throws IOException { @@ -354,7 +351,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { @Test public void testVerifySingleIndexRow_expiredIndexRowCount_nonZero() throws IOException { IndexToolVerificationResult.PhaseResult - expectedPR = new IndexToolVerificationResult.PhaseResult(0, 1, 0, 0); + expectedPR = new IndexToolVerificationResult.PhaseResult(0, 1, 0, 0, 0, 0); for (Map.Entry<byte[], List<Mutation>> entry : indexKeyToMutationMapLocal.entrySet()) { initializeLocalMockitoSetup(entry, TestType.EXPIRED); @@ -365,7 +362,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { assertTrue(actualPR.equals(expectedPR)); } } - @Ignore + @Test public void testVerifySingleIndexRow_invalidIndexRowCount_cellValue() throws IOException { IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult(); @@ -379,7 +376,6 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { } } - @Ignore @Test public void testVerifySingleIndexRow_invalidIndexRowCount_emptyCell() throws IOException { IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult(); @@ -406,7 +402,6 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { } } - @Ignore @Test public void testVerifySingleIndexRow_invalidIndexRowCount_extraCell() throws IOException { IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult(); @@ -428,33 +423,136 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { rebuildScanner.verifySingleIndexRow(indexRow, actualPR); } + + // Test the major compaction on index table only. + // There is at least one expected mutation within maxLookBack that has its matching one in the actual list. + // However there are some expected mutations outside of maxLookBack, which matching ones in actual list may be compacted away. + // We will report such row as a valid row. @Test - public void testVerifySingleIndexRow_actualMutations_null() throws IOException { - byte [] validRowKey = getValidRowKey(); - when(indexRow.getRow()).thenReturn(validRowKey); - when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(null); - exceptionRule.expect(DoNotRetryIOException.class); - exceptionRule.expectMessage(IndexRebuildRegionScanner.ACTUAL_MUTATION_IS_NULL_OR_EMPTY); - rebuildScanner.verifySingleIndexRow(indexRow, actualPR); + public void testVerifySingleIndexRow_compactionOnIndexTable_atLeastOneExpectedMutationWithinMaxLookBack() throws Exception { + String dataRowKey = "k1"; + byte[] indexRowKey1Bytes = generateIndexRowKey(dataRowKey, "val1"); + ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge(); + injectEdge.setValue(1); + EnvironmentEdgeManager.injectEdge(injectEdge); + + List<Mutation> expectedMutations = new ArrayList<>(); + List<Mutation> actualMutations = new ArrayList<>(); + // change the maxLookBack from infinite to some interval, which allows to simulate the mutation beyond the maxLookBack window. + long maxLookbackInMills = 10 * 1000; + rebuildScanner.setMaxLookBackInMills(maxLookbackInMills); + + Put put = new Put(indexRowKey1Bytes); + Cell cell = CellUtil.createCell(indexRowKey1Bytes, + QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, + QueryConstants.EMPTY_COLUMN_BYTES, + EnvironmentEdgeManager.currentTimeMillis(), + KeyValue.Type.Put.getCode(), + IndexRegionObserver.VERIFIED_BYTES); + put.add(cell); + // This mutation is beyond maxLookBack, so add it to expectedMutations only. + expectedMutations.add(put); + + // advance the time of maxLookBack, so last mutation will be outside of maxLookBack, + // next mutation will be within maxLookBack + injectEdge.incrementValue(maxLookbackInMills); + put = new Put(indexRowKey1Bytes); + cell = CellUtil.createCell(indexRowKey1Bytes, + QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, + QueryConstants.EMPTY_COLUMN_BYTES, + EnvironmentEdgeManager.currentTimeMillis(), + KeyValue.Type.Put.getCode(), + IndexRegionObserver.VERIFIED_BYTES); + put.add(cell); + // This mutation is in both expectedMutations and actualMutations, as it is within the maxLookBack, so it will not get chance to be compacted away + expectedMutations.add(put); + actualMutations.add(put); + Result actualMutationsScanResult = Result.create(Arrays.asList(cell)); + + Map<byte[], List<Mutation>> indexKeyToMutationMap = Maps.newTreeMap((Bytes.BYTES_COMPARATOR)); + indexKeyToMutationMap.put(indexRowKey1Bytes, expectedMutations); + rebuildScanner.setIndexKeyToMutationMap(indexKeyToMutationMap); + when(rebuildScanner.prepareActualIndexMutations(any(Result.class))).thenReturn(actualMutations); + + injectEdge.incrementValue(1); + IndexToolVerificationResult.PhaseResult actualPR = new IndexToolVerificationResult.PhaseResult(); + // Report this validation as a success + assertTrue(rebuildScanner.verifySingleIndexRow(actualMutationsScanResult, actualPR)); + // validIndexRowCount = 1 + IndexToolVerificationResult.PhaseResult expectedPR = new IndexToolVerificationResult.PhaseResult(1, 0, 0, 0, 0, 0); + assertTrue(actualPR.equals(expectedPR)); } + // Test the major compaction on index table only. + // All expected mutations are beyond the maxLookBack, and there are no matching ones in the actual list because of major compaction. + // We will report such row as an invalid beyond maxLookBack row. @Test - public void testVerifySingleIndexRow_actualMutations_empty() throws IOException { - byte [] validRowKey = getValidRowKey(); - when(indexRow.getRow()).thenReturn(validRowKey); - actualMutationList = new ArrayList<>(); - when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(actualMutationList); - exceptionRule.expect(DoNotRetryIOException.class); - exceptionRule.expectMessage(IndexRebuildRegionScanner.ACTUAL_MUTATION_IS_NULL_OR_EMPTY); - rebuildScanner.verifySingleIndexRow(indexRow, actualPR); + public void testVerifySingleIndexRow_compactionOnIndexTable_noExpectedMutationWithinMaxLookBack() throws Exception { + String dataRowKey = "k1"; + byte[] indexRowKey1Bytes = generateIndexRowKey(dataRowKey, "val1"); + List<Mutation> expectedMutations = new ArrayList<>(); + List<Mutation> actualMutations = new ArrayList<>(); + // change the maxLookBack from infinite to some interval, which allows to simulate the mutation beyond the maxLookBack window. + long maxLookbackInMills = 10 * 1000; + rebuildScanner.setMaxLookBackInMills(maxLookbackInMills); + + ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge(); + injectEdge.setValue(1); + EnvironmentEdgeManager.injectEdge(injectEdge); + + Put put = new Put(indexRowKey1Bytes); + Cell cell = CellUtil.createCell(indexRowKey1Bytes, + QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, + QueryConstants.EMPTY_COLUMN_BYTES, + EnvironmentEdgeManager.currentTimeMillis(), + KeyValue.Type.Put.getCode(), + VERIFIED_BYTES); + put.add(cell); + // This mutation is beyond maxLookBack, so add it to expectedMutations only. + expectedMutations.add(put); + + injectEdge.incrementValue(maxLookbackInMills); + put = new Put(indexRowKey1Bytes); + cell = CellUtil.createCell(indexRowKey1Bytes, + QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, + QueryConstants.EMPTY_COLUMN_BYTES, + EnvironmentEdgeManager.currentTimeMillis(), + KeyValue.Type.Put.getCode(), + UNVERIFIED_BYTES); + put.add(cell); + // This mutation is actualMutations only, as it is an unverified put + actualMutations.add(put); + Result actualMutationsScanResult = Result.create(Arrays.asList(cell)); + + Map<byte[], List<Mutation>> indexKeyToMutationMap = Maps.newTreeMap((Bytes.BYTES_COMPARATOR)); + indexKeyToMutationMap.put(indexRowKey1Bytes, expectedMutations); + rebuildScanner.setIndexKeyToMutationMap(indexKeyToMutationMap); + when(rebuildScanner.prepareActualIndexMutations(any(Result.class))).thenReturn(actualMutations); + + injectEdge.incrementValue(1); + IndexToolVerificationResult.PhaseResult actualPR = new IndexToolVerificationResult.PhaseResult(); + // Report this validation as a failure + assertFalse(rebuildScanner.verifySingleIndexRow(actualMutationsScanResult, actualPR)); + // beyondMaxLookBackInvalidIndexRowCount = 1 + IndexToolVerificationResult.PhaseResult expectedPR = new IndexToolVerificationResult.PhaseResult(0, 0, 0, 0, 0, 1); + assertTrue(actualPR.equals(expectedPR)); + } + + private static byte[] generateIndexRowKey(String dataRowKey, String dataVal){ + List<Byte> idxKey = new ArrayList<>(); + if (dataVal != null && !dataVal.isEmpty()) + idxKey.addAll(com.google.common.primitives.Bytes.asList(Bytes.toBytes(dataVal))); + idxKey.add(QueryConstants.SEPARATOR_BYTE); + idxKey.addAll(com.google.common.primitives.Bytes.asList(Bytes.toBytes(dataRowKey))); + return com.google.common.primitives.Bytes.toArray(idxKey); } private IndexToolVerificationResult.PhaseResult getValidPhaseResult() { - return new IndexToolVerificationResult.PhaseResult(1,0,0,0); + return new IndexToolVerificationResult.PhaseResult(1, 0, 0, 0, 0, 0); } private IndexToolVerificationResult.PhaseResult getInvalidPhaseResult() { - return new IndexToolVerificationResult.PhaseResult(0, 0, 0, 1); + return new IndexToolVerificationResult.PhaseResult(0, 0, 0, 1, 0, 0); } private void initializeLocalMockitoSetup(Map.Entry<byte[], List<Mutation>> entry, @@ -537,7 +635,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { newCell = CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c), Bytes.toBytes(UNEXPECTED_COLUMN), - EnvironmentEdgeManager.currentTimeMillis(), + c.getTimestamp(), KeyValue.Type.Put.getCode(), Bytes.toBytes("zxcv")); newCellList.add(newCell); newCellList.add(c); @@ -575,7 +673,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { private Cell getVerifiedEmptyCell(Cell c) { return CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c), indexMaintainer.getEmptyKeyValueQualifier(), - EnvironmentEdgeManager.currentTimeMillis(), + c.getTimestamp(), KeyValue.Type.Put.getCode(), VERIFIED_BYTES); }