This is an automated email from the ASF dual-hosted git repository. gjacoby pushed a commit to branch PHOENIX-5951 in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit 3435177fd1cd99fd4fb69ad29646d276cd157e6c Author: Geoffrey Jacoby <gjac...@apache.org> AuthorDate: Thu Jun 11 13:23:08 2020 -0700 PHOENIX-5951 - Index rebuild output logging for past-max-lookback rows should be configurable --- .../index/IndexVerificationOutputRepositoryIT.java | 65 +++++++++++++++++----- .../coprocessor/IndexRebuildRegionScanner.java | 51 +++++++++++++---- .../index/IndexVerificationOutputRepository.java | 52 ++++++++++++++++- .../index/IndexVerificationOutputRow.java | 21 ++++++- .../phoenix/index/VerifySingleIndexRowTest.java | 8 ++- 5 files changed, 164 insertions(+), 33 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java index f24f49d..5731c6a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java @@ -31,6 +31,7 @@ import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.index.IndexTool; import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository; +import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType; import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRow; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.util.EnvironmentEdgeManager; @@ -53,6 +54,9 @@ import java.util.List; import java.util.Map; import static org.apache.phoenix.coprocessor.MetaDataProtocol.DEFAULT_LOG_TTL; +import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_INVALID; +import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_MISSING; +import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.INVALID_ROW; import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES; import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.PHASE_AFTER_VALUE; import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.PHASE_BEFORE_VALUE; @@ -90,21 +94,23 @@ public class IndexVerificationOutputRepositoryIT extends ParallelStatsDisabledIT long scanMaxTs = EnvironmentEdgeManager.currentTimeMillis(); outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, expectedErrorMessage, expectedValue, actualValue, - scanMaxTs, tableNameBytes, true); + scanMaxTs, tableNameBytes, true, + INVALID_ROW); //now increment the scan time by 1 and do it again outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, expectedErrorMessage, expectedValue, actualValue, - scanMaxTs +1, tableNameBytes, false); + scanMaxTs +1, tableNameBytes, false, + INVALID_ROW); //make sure we only get the first row back IndexVerificationOutputRow expectedRow = buildVerificationRow(dataRowKey, indexRowKey, dataRowTs, indexRowTs, expectedErrorMessage, expectedValue, actualValue, - scanMaxTs, tableNameBytes, indexNameBytes, PHASE_BEFORE_VALUE); + scanMaxTs, tableNameBytes, indexNameBytes, PHASE_BEFORE_VALUE, INVALID_ROW); verifyOutputRow(outputRepository, scanMaxTs, indexNameBytes, expectedRow); //make sure we get the second row back IndexVerificationOutputRow secondExpectedRow = buildVerificationRow(dataRowKey, indexRowKey, dataRowTs, indexRowTs, expectedErrorMessage, expectedValue, actualValue, - scanMaxTs + 1, tableNameBytes, indexNameBytes, PHASE_AFTER_VALUE); + scanMaxTs + 1, tableNameBytes, indexNameBytes, PHASE_AFTER_VALUE, INVALID_ROW); verifyOutputRow(outputRepository, scanMaxTs+1, indexNameBytes, secondExpectedRow); } @@ -128,7 +134,8 @@ public class IndexVerificationOutputRepositoryIT extends ParallelStatsDisabledIT TestUtil.assertTableHasTtl(conn, TableName.valueOf(OUTPUT_TABLE_NAME_BYTES), DEFAULT_LOG_TTL); outputRepository.logToIndexToolOutputTable(mockStringBytes, mockStringBytes, 1, 2, mockString, mockStringBytes, mockStringBytes, - EnvironmentEdgeManager.currentTimeMillis(), mockStringBytes, true); + EnvironmentEdgeManager.currentTimeMillis(), mockStringBytes, true, + INVALID_ROW); Assert.assertEquals(1, TestUtil.getRowCount(hTable, false)); @@ -147,7 +154,7 @@ public class IndexVerificationOutputRepositoryIT extends ParallelStatsDisabledIT IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.BEFORE; boolean expectedBefore = false; boolean expectedAfter = true; - verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter); + verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, INVALID_ROW); } @Test @@ -155,7 +162,7 @@ public class IndexVerificationOutputRepositoryIT extends ParallelStatsDisabledIT IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.AFTER; boolean expectedBefore = true; boolean expectedAfter = false; - verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter); + verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, INVALID_ROW); } @Test @@ -163,7 +170,7 @@ public class IndexVerificationOutputRepositoryIT extends ParallelStatsDisabledIT IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.BOTH; boolean expectedBefore = false; boolean expectedAfter = false; - verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter); + verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, INVALID_ROW); } @Test @@ -171,16 +178,46 @@ public class IndexVerificationOutputRepositoryIT extends ParallelStatsDisabledIT IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.NONE; boolean expectedBefore = true; boolean expectedAfter = true; - verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter); + verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, INVALID_ROW); } - public void verifyDisableLogging(IndexTool.IndexDisableLoggingType disableLoggingVerifyType, boolean expectedBefore, boolean expectedAfter) throws SQLException, IOException { + @Test + public void testDisableLoggingBeyondMaxLookback() throws SQLException, IOException { + IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.NONE; + boolean expectedBefore = false; + boolean expectedAfter = false; + verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, + BEYOND_MAX_LOOKBACK_INVALID, false); + verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, + BEYOND_MAX_LOOKBACK_MISSING, false); + + expectedBefore = true; + expectedAfter = true; + verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, + BEYOND_MAX_LOOKBACK_INVALID, true); + verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, + BEYOND_MAX_LOOKBACK_MISSING, true); + } + + public void verifyDisableLogging(IndexTool.IndexDisableLoggingType disableLoggingVerifyType, + boolean expectedBefore, boolean expectedAfter, + IndexVerificationErrorType errorType) throws SQLException, IOException { + verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, errorType, + true); + } + + public void verifyDisableLogging(IndexTool.IndexDisableLoggingType disableLoggingVerifyType, + boolean expectedBefore, boolean expectedAfter, + IndexVerificationErrorType errorType, + boolean shouldLogBeyondMaxLookback) throws SQLException, + IOException { Table mockOutputTable = Mockito.mock(Table.class); Table mockIndexTable = Mockito.mock(Table.class); when(mockIndexTable.getName()).thenReturn(TableName.valueOf("testDisableLoggingIndexName")); IndexVerificationOutputRepository outputRepository = new IndexVerificationOutputRepository(mockOutputTable, mockIndexTable, disableLoggingVerifyType); + outputRepository.setShouldLogBeyondMaxLookback(shouldLogBeyondMaxLookback); byte[] dataRowKey = Bytes.toBytes("dataRowKey"); byte[] indexRowKey = Bytes.toBytes("indexRowKey"); long dataRowTs = EnvironmentEdgeManager.currentTimeMillis(); @@ -192,9 +229,9 @@ public class IndexVerificationOutputRepositoryIT extends ParallelStatsDisabledIT byte[] tableName = Bytes.toBytes("testDisableLoggingTableName"); outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs - , errorMsg, expectedValue, actualValue, scanMaxTs, tableName, true); + , errorMsg, expectedValue, actualValue, scanMaxTs, tableName, true, errorType); outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs - , errorMsg, expectedValue, actualValue, scanMaxTs, tableName, false); + , errorMsg, expectedValue, actualValue, scanMaxTs, tableName, false, errorType); int expectedRowsLogged = 0; if (expectedBefore && expectedAfter) { expectedRowsLogged = 2; @@ -222,7 +259,8 @@ public class IndexVerificationOutputRepositoryIT extends ParallelStatsDisabledIT long scanMaxTs, byte[] tableNameBytes, byte[] indexNameBytes, - byte[] phaseBeforeValue) { + byte[] phaseBeforeValue, + IndexVerificationErrorType errorType) { IndexVerificationOutputRow.IndexVerificationOutputRowBuilder builder = new IndexVerificationOutputRow.IndexVerificationOutputRowBuilder(); return builder.setDataTableRowKey(dataRowKey). @@ -239,6 +277,7 @@ public class IndexVerificationOutputRepositoryIT extends ParallelStatsDisabledIT setDataTableName(Bytes.toString(tableNameBytes)). setIndexTableName(Bytes.toString(indexNameBytes)). setPhaseValue(phaseBeforeValue). + setErrorType(errorType). build(); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java index 6f82db3..6f39837 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java @@ -20,6 +20,12 @@ package org.apache.phoenix.coprocessor; import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES; import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES; import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn; +import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY; +import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_INVALID; +import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_MISSING; +import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.EXTRA_CELLS; +import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.INVALID_ROW; +import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.MISSING_ROW; import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; @@ -40,6 +46,7 @@ import java.util.concurrent.Future; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -47,6 +54,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -92,6 +100,12 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class); + public static final String PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS = + "phoenix.index.mr.log.beyond.max.lookback.errors"; + public static final boolean DEFAULT_PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS = false; + private boolean useProto = true; + private byte[] indexRowKey; + private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE; private static boolean ignoreIndexRebuildForTesting = false; public static void setIgnoreIndexRebuildForTesting(boolean ignore) { ignoreIndexRebuildForTesting = ignore; } private byte[] indexRowKeyforReadRepair; @@ -123,6 +137,9 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { } } if (verify) { + boolean shouldLogBeyondMaxLookbackInvalidRows = + env.getConfiguration().getBoolean(PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS, + DEFAULT_PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS); viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); byte[] disableLoggingValueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_DISABLE_LOGGING_VERIFY_TYPE); @@ -133,6 +150,7 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { verificationOutputRepository = new IndexVerificationOutputRepository(indexMaintainer.getIndexTableName() , hTableFactory, disableLoggingVerifyType); + verificationOutputRepository.setShouldLogBeyondMaxLookback(shouldLogBeyondMaxLookbackInvalidRows); verificationResult = new IndexToolVerificationResult(scan); verificationResultRepository = new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory); @@ -257,17 +275,19 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { } public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs, - String errorMsg, boolean isBeforeRebuild) throws IOException { - logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, errorMsg, null, null, isBeforeRebuild); + String errorMsg, boolean isBeforeRebuild, + IndexVerificationOutputRepository.IndexVerificationErrorType errorType) throws IOException { + logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, errorMsg, null, + null, isBeforeRebuild, errorType); } @VisibleForTesting public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs, - String errorMsg, byte[] expectedVaue, byte[] actualValue, boolean isBeforeRebuild) - throws IOException { + String errorMsg, byte[] expectedVaue, byte[] actualValue, boolean isBeforeRebuild, + IndexVerificationOutputRepository.IndexVerificationErrorType errorType) throws IOException { verificationOutputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, errorMsg, expectedVaue, actualValue, scan.getTimeRange().getMax(), - region.getRegionInfo().getTable().getName(), isBeforeRebuild); + region.getRegionInfo().getTable().getName(), isBeforeRebuild, errorType); } private static Cell getCell(Mutation m, byte[] family, byte[] qualifier) { @@ -288,7 +308,7 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { String errorMsg = "Not matching timestamp"; byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants); logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), - errorMsg, null, null, isBeforeRebuild); + errorMsg, null, null, isBeforeRebuild, INVALID_ROW); return; } int expectedCellCount = 0; @@ -305,7 +325,8 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { !CellUtil.matchingType(expectedCell, actualCell)) { byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants); String errorMsg = "Missing cell (in iteration " + iteration + ") " + Bytes.toString(family) + ":" + Bytes.toString(qualifier); - logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg,isBeforeRebuild); + logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), + getTimestamp(actual), errorMsg, isBeforeRebuild, INVALID_ROW); verificationPhaseResult.setIndexHasMissingCellsCount(verificationPhaseResult.getIndexHasMissingCellsCount() + 1); return; } @@ -313,7 +334,8 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { String errorMsg = "Not matching value (in iteration " + iteration + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier); byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants); logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), - errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell), isBeforeRebuild); + errorMsg, CellUtil.cloneValue(expectedCell), + CellUtil.cloneValue(actualCell), isBeforeRebuild, INVALID_ROW); return; } } @@ -329,7 +351,7 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { String errorMsg = "Index has extra cells (in iteration " + iteration + ")"; byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants); logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), - errorMsg, isBeforeRebuild); + errorMsg, isBeforeRebuild, EXTRA_CELLS); verificationPhaseResult.setIndexHasExtraCellsCount(verificationPhaseResult.getIndexHasExtraCellsCount() + 1); } } @@ -719,7 +741,7 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { actualSize); logToIndexToolOutputTable(dataKey, indexRow.getRow(), getTimestamp(expectedMutationList.get(expectedIndex)), - 0, errorMsg, isBeforeRebuild); + 0, errorMsg, isBeforeRebuild, BEYOND_MAX_LOOKBACK_INVALID); return false; } else { @@ -730,7 +752,8 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants); String errorMsg = "Not matching index row"; logToIndexToolOutputTable(dataKey, indexRow.getRow(), - getTimestamp(expectedMutationList.get(0)), 0L, errorMsg, isBeforeRebuild); + getTimestamp(expectedMutationList.get(0)), 0L, errorMsg, isBeforeRebuild, + INVALID_ROW); } verificationPhaseResult.setInvalidIndexRowCount(verificationPhaseResult.getInvalidIndexRowCount() + 1); return false; @@ -791,17 +814,21 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { } currentTime = EnvironmentEdgeManager.currentTimeMillis(); String errorMsg; + IndexVerificationOutputRepository.IndexVerificationErrorType errorType; if (isTimestampBeyondMaxLookBack(maxLookBackInMills, currentTime, getTimestamp(mutation))){ errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK; + errorType = BEYOND_MAX_LOOKBACK_MISSING; verificationPhaseResult. setBeyondMaxLookBackMissingIndexRowCount(verificationPhaseResult.getBeyondMaxLookBackMissingIndexRowCount() + 1); } else { errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW; + errorType = MISSING_ROW; verificationPhaseResult.setMissingIndexRowCount(verificationPhaseResult.getMissingIndexRowCount() + 1); } byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexKey), viewConstants); - logToIndexToolOutputTable(dataKey, indexKey, getTimestamp(mutation), 0, errorMsg, isBeforeRebuild); + logToIndexToolOutputTable(dataKey, indexKey, getTimestamp(mutation), 0, errorMsg, + isBeforeRebuild, errorType); } // Leave the invalid and missing rows in indexKeyToMutationMap indexKeyToMutationMap.putAll(invalidIndexRows); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java index 3cf5446..549f876 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java @@ -52,6 +52,7 @@ public class IndexVerificationOutputRepository implements AutoCloseable { private Table outputTable; private IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.NONE; + private boolean shouldLogBeyondMaxLookback = true; public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL"; public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME); @@ -71,6 +72,8 @@ public class IndexVerificationOutputRepository implements AutoCloseable { public final static byte[] INDEX_TABLE_TS_BYTES = Bytes.toBytes(INDEX_TABLE_TS); public final static String ERROR_MESSAGE = "Error"; public final static byte[] ERROR_MESSAGE_BYTES = Bytes.toBytes(ERROR_MESSAGE); + public static final String ERROR_TYPE = "ErrorType"; + public static final byte[] ERROR_TYPE_BYTES = Bytes.toBytes(ERROR_TYPE); public static String VERIFICATION_PHASE = "Phase"; public final static byte[] VERIFICATION_PHASE_BYTES = Bytes.toBytes(VERIFICATION_PHASE); @@ -85,6 +88,15 @@ public class IndexVerificationOutputRepository implements AutoCloseable { public static final byte[] PHASE_BEFORE_VALUE = Bytes.toBytes("BEFORE"); public static final byte[] PHASE_AFTER_VALUE = Bytes.toBytes("AFTER"); + + public enum IndexVerificationErrorType { + INVALID_ROW, + MISSING_ROW, + EXTRA_CELLS, + BEYOND_MAX_LOOKBACK_INVALID, + BEYOND_MAX_LOOKBACK_MISSING, + UNKNOWN + } /** * Only usable for the create table / read path or for testing. Use setOutputTable and * setIndexTable first to write. @@ -117,6 +129,10 @@ public class IndexVerificationOutputRepository implements AutoCloseable { this.disableLoggingVerifyType = disableLoggingVerifyType; } + public void setShouldLogBeyondMaxLookback(boolean shouldLogBeyondMaxLookback) { + this.shouldLogBeyondMaxLookback = shouldLogBeyondMaxLookback; + } + public static byte[] generateOutputTableRowKey(long ts, byte[] indexTableName, byte[] dataRowKey ) { byte[] keyPrefix = Bytes.toBytes(Long.toString(ts)); byte[] rowKey; @@ -173,9 +189,11 @@ public class IndexVerificationOutputRepository implements AutoCloseable { public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs, String errorMsg, byte[] expectedValue, byte[] actualValue, - long scanMaxTs, byte[] tableName, boolean isBeforeRebuild) + long scanMaxTs, byte[] tableName, + boolean isBeforeRebuild, + IndexVerificationErrorType errorType) throws IOException { - if (shouldLogOutput(isBeforeRebuild)) { + if (shouldLogOutput(isBeforeRebuild, errorType)) { byte[] rowKey = generateOutputTableRowKey(scanMaxTs, indexTable.getName().toBytes(), dataRowKey); Put put = new Put(rowKey); put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_NAME_BYTES, tableName); @@ -193,6 +211,8 @@ public class IndexVerificationOutputRepository implements AutoCloseable { errorMessageBytes = Bytes.toBytes(errorMsg); } put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ERROR_MESSAGE_BYTES, errorMessageBytes); + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ERROR_TYPE_BYTES, + Bytes.toBytes(errorType.toString())); if (isBeforeRebuild) { put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, PHASE_BEFORE_VALUE); } else { @@ -202,7 +222,12 @@ public class IndexVerificationOutputRepository implements AutoCloseable { } } - public boolean shouldLogOutput(boolean isBeforeRebuild) { + public boolean shouldLogOutput(boolean isBeforeRebuild, IndexVerificationErrorType errorType) { + return shouldLogOutputForVerifyType(isBeforeRebuild) && + shouldLogOutputForErrorType(errorType); + } + + private boolean shouldLogOutputForVerifyType(boolean isBeforeRebuild) { if (disableLoggingVerifyType.equals(IndexTool.IndexDisableLoggingType.BOTH)) { return false; } @@ -219,6 +244,15 @@ public class IndexVerificationOutputRepository implements AutoCloseable { return false; } + private boolean shouldLogOutputForErrorType(IndexVerificationErrorType errorType) { + if (errorType != null && + (errorType.equals(IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_INVALID) || + errorType.equals(IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_MISSING))){ + return shouldLogBeyondMaxLookback; + } + return true; + } + public static byte[] getErrorMessageBytes(String errorMsg, byte[] expectedValue, byte[] actualValue) { byte[] errorMessageBytes; errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length + @@ -297,6 +331,18 @@ public class IndexVerificationOutputRepository implements AutoCloseable { builder.setExpectedValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, EXPECTED_VALUE_BYTES)); builder.setActualValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, ACTUAL_VALUE_BYTES)); builder.setPhaseValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES)); + IndexVerificationErrorType errorType; + try { + errorType = + IndexVerificationErrorType.valueOf( + Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, ERROR_TYPE_BYTES))); + } catch (Throwable e) { + //in case we have a cast exception because an incompatible version of the enum produced + //the row, or an earlier version that didn't record error types, it's better to mark + // the error type unknown and move on rather than fail + errorType = IndexVerificationErrorType.UNKNOWN; + } + builder.setErrorType(errorType); return builder.build(); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java index 8c54796..4dad9b6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java @@ -18,6 +18,7 @@ package org.apache.phoenix.mapreduce.index; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType; import java.util.Arrays; import java.util.Objects; @@ -35,13 +36,14 @@ public class IndexVerificationOutputRow { private byte[] expectedValue; private byte[] actualValue; private byte[] phaseValue; + private IndexVerificationErrorType errorType; private IndexVerificationOutputRow(String dataTableName, String indexTableName, byte[] dataTableRowKey, Long scanMaxTimestamp, byte[] indexTableRowKey, long dataTableRowTimestamp, long indexTableRowTimestamp, String errorMessage, byte[] expectedValue, byte[] actualValue, - byte[] phaseValue) { + byte[] phaseValue, IndexVerificationErrorType errorType) { this.dataTableName = dataTableName; this.indexTableName = indexTableName; this.scanMaxTimestamp = scanMaxTimestamp; @@ -53,6 +55,7 @@ public class IndexVerificationOutputRow { this.expectedValue = expectedValue; this.actualValue = actualValue; this.phaseValue = phaseValue; + this.errorType = errorType; } public String getDataTableName() { @@ -119,7 +122,8 @@ public class IndexVerificationOutputRow { Objects.equals(errorMessage, otherRow.getErrorMessage()) && Arrays.equals(expectedValue, otherRow.getExpectedValue()) && Arrays.equals(actualValue, otherRow.getActualValue()) && - Arrays.equals(phaseValue, otherRow.getPhaseValue()); + Arrays.equals(phaseValue, otherRow.getPhaseValue()) && + Objects.equals(errorType, otherRow.getErrorType()); } @Override @@ -143,9 +147,14 @@ public class IndexVerificationOutputRow { sb.append(IndexVerificationOutputRepository.ACTUAL_VALUE + ": ").append(Bytes.toString(actualValue)).append( ","); sb.append(IndexVerificationOutputRepository.VERIFICATION_PHASE + ": ").append(Bytes.toString(phaseValue)); + sb.append(IndexVerificationOutputRepository.ERROR_TYPE + ": " ).append(Objects.toString(errorType)); return sb.toString(); } + public IndexVerificationErrorType getErrorType() { + return errorType; + } + public static class IndexVerificationOutputRowBuilder { private String dataTableName; private String indexTableName; @@ -158,6 +167,7 @@ public class IndexVerificationOutputRow { private byte[] expectedValue; private byte[] actualValue; private byte[] phaseValue; + private IndexVerificationErrorType errorType; public IndexVerificationOutputRowBuilder setDataTableName(String dataTableName) { this.dataTableName = dataTableName; @@ -214,10 +224,15 @@ public class IndexVerificationOutputRow { return this; } + public IndexVerificationOutputRowBuilder setErrorType(IndexVerificationErrorType errorType) { + this.errorType = errorType; + return this; + } + public IndexVerificationOutputRow build() { return new IndexVerificationOutputRow(dataTableName, indexTableName, dataTableRowKey, scanMaxTimestamp, indexTableRowKey, dataTableRowTimestamp, indexTableRowTimestamp, - errorMessage, expectedValue, actualValue, phaseValue); + errorMessage, expectedValue, actualValue, phaseValue, errorType); } } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java index 1b8ed55..86f39b9 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java @@ -35,6 +35,7 @@ import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner; import org.apache.phoenix.coprocessor.IndexToolVerificationResult; import org.apache.phoenix.hbase.index.IndexRegionObserver; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository; import org.apache.phoenix.query.BaseConnectionlessQueryTest; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PTable; @@ -279,10 +280,13 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { doNothing().when(rebuildScanner) .logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(), Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString(), - Matchers.<byte[]>any(), Matchers.<byte[]>any(), Matchers.anyBoolean()); + Matchers.<byte[]>any(), Matchers.<byte[]>any(), Matchers.anyBoolean(), + Mockito.<IndexVerificationOutputRepository.IndexVerificationErrorType>any()); doNothing().when(rebuildScanner) .logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(), - Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString(), Matchers.anyBoolean()); + Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString(), + Matchers.anyBoolean(), + Mockito.<IndexVerificationOutputRepository.IndexVerificationErrorType>any()); //populate the local map to use to create actual mutations indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);