This is an automated email from the ASF dual-hosted git repository. gjacoby pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new 78444fd PHOENIX-5799 - Inline Index Verification Output API 78444fd is described below commit 78444fd36425e893aef84bec0228902a0b8b53ad Author: Geoffrey Jacoby <gjac...@apache.org> AuthorDate: Fri Mar 27 10:18:16 2020 -0700 PHOENIX-5799 - Inline Index Verification Output API --- .../org/apache/phoenix/end2end/IndexToolIT.java | 32 +- .../index/IndexVerificationOutputRepositoryIT.java | 172 ++++++++++ .../index/IndexVerificationResultRepositoryIT.java | 142 ++++++++ .../coprocessor/IndexRebuildRegionScanner.java | 200 +++--------- .../coprocessor/IndexToolVerificationResult.java | 362 +++++++++++++-------- .../apache/phoenix/mapreduce/index/IndexTool.java | 75 +---- .../index/IndexVerificationOutputRepository.java | 300 +++++++++++++++++ .../index/IndexVerificationOutputRow.java | 223 +++++++++++++ .../index/IndexVerificationResultRepository.java | 258 +++++++++++++++ .../index/PhoenixIndexImportDirectReducer.java | 13 +- .../java/org/apache/phoenix/util/ByteUtil.java | 46 +++ 11 files changed, 1435 insertions(+), 388 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index 71cb530..4840047 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -44,6 +44,8 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.mapreduce.Job; import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner; import org.apache.phoenix.hbase.index.IndexRegionObserver; +import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository; +import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.index.IndexTool; import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper; @@ -370,10 +372,11 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { private void dropIndexToolTables(Connection conn) throws Exception { Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); - TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES); + TableName indexToolOutputTable = + TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES); admin.disableTable(indexToolOutputTable); admin.deleteTable(indexToolOutputTable); - TableName indexToolResultTable = TableName.valueOf(IndexTool.RESULT_TABLE_NAME_BYTES); + TableName indexToolResultTable = TableName.valueOf(IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES); admin.disableTable(indexToolResultTable); admin.deleteTable(indexToolResultTable); } @@ -477,12 +480,13 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { // This method verifies the common prefix, i.e., "timestamp | index table name | ", since the rest of the // fields may include the separator key - int offset = Bytes.indexOf(rowKey, IndexRebuildRegionScanner.ROW_KEY_SEPARATOR_BYTE); + int offset = Bytes.indexOf(rowKey, IndexVerificationResultRepository.ROW_KEY_SEPARATOR_BYTE); offset++; byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName); assertEquals(Bytes.compareTo(rowKey, offset, indexTableFullNameBytes.length, indexTableFullNameBytes, 0, indexTableFullNameBytes.length), 0); - assertEquals(rowKey[offset + indexTableFullNameBytes.length], IndexRebuildRegionScanner.ROW_KEY_SEPARATOR_BYTE[0]); + assertEquals(rowKey[offset + indexTableFullNameBytes.length], + IndexVerificationResultRepository.ROW_KEY_SEPARATOR_BYTE[0]); } private Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName) @@ -490,7 +494,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName); byte[] dataTableFullNameBytes = Bytes.toBytes(dataTableFullName); Table hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices() - .getTable(IndexTool.OUTPUT_TABLE_NAME_BYTES); + .getTable(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES); Scan scan = new Scan(); ResultScanner scanner = hIndexTable.getScanner(scan); boolean dataTableNameCheck = false; @@ -499,28 +503,30 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { for (Result result = scanner.next(); result != null; result = scanner.next()) { for (Cell cell : result.rawCells()) { assertTrue(Bytes.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), - IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, 0, - IndexTool.OUTPUT_TABLE_COLUMN_FAMILY.length) == 0); + IndexVerificationOutputRepository.OUTPUT_TABLE_COLUMN_FAMILY, 0, + IndexVerificationOutputRepository.OUTPUT_TABLE_COLUMN_FAMILY.length) == 0); if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), - IndexTool.DATA_TABLE_NAME_BYTES, 0, IndexTool.DATA_TABLE_NAME_BYTES.length) == 0) { + IndexVerificationOutputRepository.DATA_TABLE_NAME_BYTES, 0, IndexVerificationOutputRepository.DATA_TABLE_NAME_BYTES.length) == 0) { dataTableNameCheck = true; assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), dataTableFullNameBytes, 0, dataTableFullNameBytes.length) == 0); } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), - IndexTool.INDEX_TABLE_NAME_BYTES, 0, IndexTool.INDEX_TABLE_NAME_BYTES.length) == 0) { + IndexVerificationOutputRepository.INDEX_TABLE_NAME_BYTES, 0, IndexVerificationOutputRepository.INDEX_TABLE_NAME_BYTES.length) == 0) { indexTableNameCheck = true; assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), indexTableFullNameBytes, 0, indexTableFullNameBytes.length) == 0); } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), - IndexTool.ERROR_MESSAGE_BYTES, 0, IndexTool.ERROR_MESSAGE_BYTES.length) == 0) { + IndexVerificationOutputRepository.ERROR_MESSAGE_BYTES, 0, IndexVerificationOutputRepository.ERROR_MESSAGE_BYTES.length) == 0) { errorMessageCell = cell; } } } - assertTrue(dataTableNameCheck && indexTableNameCheck && errorMessageCell != null); + assertTrue( "DataTableNameCheck was false", dataTableNameCheck); + assertTrue("IndexTableNameCheck was false", indexTableNameCheck); + assertTrue("Error message cell was null", errorMessageCell != null); verifyIndexTableRowKey(CellUtil.cloneRow(errorMessageCell), indexTableFullName); hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices() - .getTable(IndexTool.RESULT_TABLE_NAME_BYTES); + .getTable(IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES); scan = new Scan(); scanner = hIndexTable.getScanner(scan); Result result = scanner.next(); @@ -716,7 +722,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { } assertTrue(status.getFirst() == 0); - TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES); + TableName indexToolOutputTable = TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES); admin.disableTable(indexToolOutputTable); admin.deleteTable(indexToolOutputTable); // Run the index tool using the only-verify option, verify it gives no mismatch diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java new file mode 100644 index 0000000..0b67044 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository; +import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRow; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.PHASE_AFTER_VALUE; +import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.PHASE_BEFORE_VALUE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class IndexVerificationOutputRepositoryIT extends ParallelStatsDisabledIT { + + @BeforeClass + public static void setupClass() throws Exception { + Map<String, String> props = Collections.emptyMap(); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testReadIndexVerificationOutputRow() throws Exception { + String expectedErrorMessage = "I am an error message"; + byte[] expectedValue = Bytes.toBytes("ab"); + byte[] actualValue = Bytes.toBytes("ac"); + try (Connection conn = DriverManager.getConnection(getUrl())) { + String tableName = "T" + generateUniqueName(); + byte[] tableNameBytes = Bytes.toBytes(tableName); + String indexName = "I" + generateUniqueName(); + createTableAndIndexes(conn, tableName, indexName); + byte[] indexNameBytes = Bytes.toBytes(indexName); + IndexVerificationOutputRepository outputRepository = + new IndexVerificationOutputRepository(indexNameBytes, conn); + outputRepository.createOutputTable(conn); + populateTable(conn, tableName); + byte[] dataRowKey = getRowKey(conn, tableNameBytes); + byte[] indexRowKey = getRowKey(conn, indexNameBytes); + long dataRowTs = getTimestamp(conn, tableNameBytes); + long indexRowTs = getTimestamp(conn, indexNameBytes); + long scanMaxTs = EnvironmentEdgeManager.currentTimeMillis(); + outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, + indexRowTs, expectedErrorMessage, expectedValue, actualValue, + scanMaxTs, tableNameBytes, true); + //now increment the scan time by 1 and do it again + outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, + indexRowTs, expectedErrorMessage, expectedValue, actualValue, + scanMaxTs +1, tableNameBytes, false); + //make sure we only get the first row back + IndexVerificationOutputRow expectedRow = buildVerificationRow(dataRowKey, indexRowKey, dataRowTs, + indexRowTs, expectedErrorMessage, expectedValue, actualValue, + scanMaxTs, tableNameBytes, indexNameBytes, PHASE_BEFORE_VALUE); + verifyOutputRow(outputRepository, scanMaxTs, indexNameBytes, expectedRow); + //make sure we get the second row back + IndexVerificationOutputRow secondExpectedRow = buildVerificationRow(dataRowKey, + indexRowKey, dataRowTs, + indexRowTs, expectedErrorMessage, expectedValue, actualValue, + scanMaxTs + 1, tableNameBytes, indexNameBytes, PHASE_AFTER_VALUE); + verifyOutputRow(outputRepository, scanMaxTs+1, indexNameBytes, secondExpectedRow); + } + + } + + public void verifyOutputRow(IndexVerificationOutputRepository outputRepository, long scanMaxTs, + byte[] indexNameBytes, IndexVerificationOutputRow expectedRow) + throws IOException { + List<IndexVerificationOutputRow> actualRows = + outputRepository.getOutputRows(scanMaxTs, indexNameBytes); + assertNotNull(actualRows); + assertEquals(1, actualRows.size()); + assertEquals(expectedRow, actualRows.get(0)); + } + + private IndexVerificationOutputRow buildVerificationRow(byte[] dataRowKey, byte[] indexRowKey, + long dataRowTs, long indexRowTs, + String expectedErrorMessage, + byte[] expectedValue, byte[] actualValue, + long scanMaxTs, + byte[] tableNameBytes, + byte[] indexNameBytes, + byte[] phaseBeforeValue) { + IndexVerificationOutputRow.IndexVerificationOutputRowBuilder builder = + new IndexVerificationOutputRow.IndexVerificationOutputRowBuilder(); + return builder.setDataTableRowKey(dataRowKey). + setIndexTableRowKey(indexRowKey). + setScanMaxTimestamp(dataRowTs). + setDataTableRowTimestamp(dataRowTs). + setIndexTableRowTimestamp(indexRowTs). + setErrorMessage(Bytes.toString( + IndexVerificationOutputRepository. + getErrorMessageBytes(expectedErrorMessage, expectedValue, actualValue))). + setExpectedValue(expectedValue). + setActualValue(actualValue). + setScanMaxTimestamp(scanMaxTs). + setDataTableName(Bytes.toString(tableNameBytes)). + setIndexTableName(Bytes.toString(indexNameBytes)). + setPhaseValue(phaseBeforeValue). + build(); + } + + private byte[] getRowKey(Connection conn, byte[] tableNameBytes) + throws SQLException, IOException { + Scan scan = new Scan(); + Table table = + conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableNameBytes); + ResultScanner scanner = table.getScanner(scan); + Result r = scanner.next(); + return r.getRow(); + } + + private long getTimestamp(Connection conn, byte[] tableNameBytes) throws SQLException, + IOException { + Scan scan = new Scan(); + Table table = + conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableNameBytes); + ResultScanner scanner = table.getScanner(scan); + Result r = scanner.next(); + return r.listCells().get(0).getTimestamp(); + } + + private void createTable(Connection conn, String tableName) throws Exception { + conn.createStatement().execute("create table " + tableName + + " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), " + + "val3 varchar(10))"); + } + + private void populateTable(Connection conn, String tableName) throws Exception { + conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + } + + private void createTableAndIndexes(Connection conn, String dataTableName, + String indexTableName) throws Exception { + createTable(conn, dataTableName); + conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + + dataTableName + " (val1) include (val2, val3)"); + conn.commit(); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java new file mode 100644 index 0000000..0ffd13a --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.IndexToolVerificationResult; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Collections; +import java.util.Map; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class IndexVerificationResultRepositoryIT extends ParallelStatsDisabledIT { + + @BeforeClass + public static void setupClass() throws Exception { + Map<String, String> props = Collections.emptyMap(); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testReadResultRow() throws Exception { + String tableName = "T" + generateUniqueName(); + String indexName = "I" + generateUniqueName(); + byte[] indexNameBytes = Bytes.toBytes(indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + createTableAndIndex(conn, tableName, indexName); + IndexVerificationResultRepository resultRepository = + new IndexVerificationResultRepository(conn, indexNameBytes); + resultRepository.createResultTable(conn); + byte[] regionOne = Bytes.toBytes("a.1.00000000000000000000"); + byte[] regionTwo = Bytes.toBytes("a.2.00000000000000000000"); + long scanMaxTs = EnvironmentEdgeManager.currentTimeMillis(); + IndexToolVerificationResult expectedResult = getExpectedResult(scanMaxTs); + resultRepository.logToIndexToolResultTable(expectedResult, IndexTool.IndexVerifyType.BOTH, + regionOne); + resultRepository.logToIndexToolResultTable(expectedResult, IndexTool.IndexVerifyType.BOTH, + regionTwo); + IndexToolVerificationResult actualResult = + resultRepository.getVerificationResult(conn, scanMaxTs); + assertVerificationResult(expectedResult, actualResult); + + } + } + + private void assertVerificationResult(IndexToolVerificationResult expectedResult, IndexToolVerificationResult actualResult) { + assertEquals(expectedResult.getScanMaxTs(), actualResult.getScanMaxTs()); + assertArrayEquals(expectedResult.getStartRow(), actualResult.getStartRow()); + assertArrayEquals(actualResult.getStopRow(), actualResult.getStopRow()); + + //because we're combining two near-identical rows (same values, different region) + //we assert on 2x the expected value + assertEquals(2 * expectedResult.getBeforeRebuildExpiredIndexRowCount(), + actualResult.getBeforeRebuildExpiredIndexRowCount()); + assertEquals(2 * expectedResult.getBeforeRebuildInvalidIndexRowCount(), + actualResult.getBeforeRebuildInvalidIndexRowCount()); + assertEquals(2 * expectedResult.getBeforeRebuildMissingIndexRowCount(), + actualResult.getBeforeRebuildMissingIndexRowCount()); + assertEquals(2 * expectedResult.getBeforeRebuildValidIndexRowCount(), + actualResult.getBeforeRebuildValidIndexRowCount()); + + assertEquals(2 * expectedResult.getAfterRebuildExpiredIndexRowCount(), + actualResult.getAfterRebuildExpiredIndexRowCount()); + assertEquals(2 * expectedResult.getAfterRebuildInvalidIndexRowCount(), + actualResult.getAfterRebuildInvalidIndexRowCount()); + assertEquals(2 * expectedResult.getAfterRebuildMissingIndexRowCount(), + actualResult.getAfterRebuildMissingIndexRowCount()); + assertEquals(2 * expectedResult.getAfterRebuildValidIndexRowCount(), + actualResult.getAfterRebuildValidIndexRowCount()); + + assertEquals(2 * expectedResult.getScannedDataRowCount(), + actualResult.getScannedDataRowCount()); + assertEquals(2 * expectedResult.getRebuiltIndexRowCount(), + actualResult.getRebuiltIndexRowCount()); + + } + + private IndexToolVerificationResult getExpectedResult(long scanMaxTs) { + byte[] startRow = Bytes.toBytes("a"); + byte[] stopRow = Bytes.toBytes("b"); + IndexToolVerificationResult result = new IndexToolVerificationResult(startRow, stopRow, + scanMaxTs); + result.setScannedDataRowCount(1); + result.setRebuiltIndexRowCount(1); + IndexToolVerificationResult.PhaseResult before = + new IndexToolVerificationResult.PhaseResult(); + populatePhaseResult(before); + IndexToolVerificationResult.PhaseResult after = + new IndexToolVerificationResult.PhaseResult(); + populatePhaseResult(after); + result.setBefore(before); + result.setAfter(after); + return result; + } + + private void populatePhaseResult(IndexToolVerificationResult.PhaseResult result){ + result.setValidIndexRowCount(1); + result.setBeyondMaxLookBackInvalidIndexRowCount(1); + result.setBeyondMaxLookBackMissingIndexRowCount(1); + result.setExpiredIndexRowCount(1); + result.setInvalidIndexRowCount(1); + result.setMissingIndexRowCount(1); + } + private void createTable(Connection conn, String tableName) throws Exception { + conn.createStatement().execute("create table " + tableName + + " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), " + + "val3 varchar(10))"); + } + + private void createTableAndIndex(Connection conn, String dataTableName, + String indexTableName) throws Exception { + createTable(conn, dataTableName); + conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + + dataTableName + " (val1) include (val2, val3)"); + conn.commit(); + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java index 2bdaf1d..8b7e3f2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java @@ -20,7 +20,6 @@ package org.apache.phoenix.coprocessor; import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES; import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn; import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY; -import static org.apache.phoenix.mapreduce.index.IndexTool.*; import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; @@ -82,6 +81,8 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.index.GlobalIndexChecker; import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository; +import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.mapreduce.index.IndexTool; import org.apache.phoenix.query.KeyRange; @@ -108,7 +109,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { public static final String NO_EXPECTED_MUTATION = "No expected mutation"; public static final String ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty"; - public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|"); private long pageSizeInRows = Long.MAX_VALUE; private int rowCountPerTask; private boolean hasMore; @@ -125,8 +125,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { private IndexMaintainer indexMaintainer; private byte[] indexRowKey = null; private Table indexHTable = null; - private Table outputHTable = null; - private Table resultHTable = null; private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE; private boolean verify = false; private Map<byte[], List<Mutation>> indexKeyToMutationMap; @@ -135,7 +133,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { private TaskBatch<Boolean> tasks; private String exceptionMessage; private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver; - private RegionCoprocessorEnvironment env; private HTableFactory hTableFactory; private int indexTableTTL = 0; private IndexToolVerificationResult verificationResult; @@ -144,6 +141,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { private int singleRowRebuildReturnCode; private Map<byte[], NavigableSet<byte[]>> familyMap; private byte[][] viewConstants; + private IndexVerificationResultRepository verificationResultRepository; + private IndexVerificationOutputRepository verificationOutputRepository; private long maxLookBackInMills; @VisibleForTesting @@ -179,7 +178,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { this.innerScanner = innerScanner; this.region = region; - this.env = env; this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver; indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY); if (indexRowKey != null) { @@ -188,7 +186,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE); if (valueBytes != null) { - verificationResult = new IndexToolVerificationResult(); + verificationResult = new IndexToolVerificationResult(scan); verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes); if (verifyType != IndexTool.IndexVerifyType.NONE) { verify = true; @@ -197,8 +195,10 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION); indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive(); - outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES)); - resultHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.RESULT_TABLE_NAME_BYTES)); + verificationResultRepository = + new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory); + verificationOutputRepository = + new IndexVerificationOutputRepository(indexMaintainer.getIndexTableName(), hTableFactory); indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); dataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor( @@ -247,88 +247,21 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return false; } - private static byte[] generateResultTableRowKey(long ts, byte[] indexTableName, byte [] regionName, - byte[] startRow, byte[] stopRow) { - byte[] keyPrefix = Bytes.toBytes(Long.toString(ts)); - int targetOffset = 0; - // The row key for the result table : timestamp | index table name | datable table region name | - // scan start row | scan stop row - byte[] rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length + - ROW_KEY_SEPARATOR_BYTE.length + regionName.length + ROW_KEY_SEPARATOR_BYTE.length + - startRow.length + ROW_KEY_SEPARATOR_BYTE.length + stopRow.length]; - Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length); - targetOffset += keyPrefix.length; - Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); - targetOffset += ROW_KEY_SEPARATOR_BYTE.length; - Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length); - targetOffset += indexTableName.length; - Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); - targetOffset += ROW_KEY_SEPARATOR_BYTE.length; - Bytes.putBytes(rowKey, targetOffset, regionName, 0, regionName.length); - targetOffset += regionName.length; - Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); - targetOffset += ROW_KEY_SEPARATOR_BYTE.length; - Bytes.putBytes(rowKey, targetOffset, startRow, 0, startRow.length); - targetOffset += startRow.length; - Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); - targetOffset += ROW_KEY_SEPARATOR_BYTE.length; - Bytes.putBytes(rowKey, targetOffset, stopRow, 0, stopRow.length); - return rowKey; - } - private void logToIndexToolResultTable() throws IOException { - long scanMaxTs = scan.getTimeRange().getMax(); - byte[] rowKey = generateResultTableRowKey(scanMaxTs, indexHTable.getName().toBytes(), - Bytes.toBytes(region.getRegionInfo().getRegionNameAsString()), scan.getStartRow(), scan.getStopRow()); - Put put = new Put(rowKey); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.scannedDataRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.rebuiltIndexRowCount))); - if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH || - verifyType == IndexTool.IndexVerifyType.ONLY) { - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.validIndexRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.expiredIndexRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.missingIndexRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.invalidIndexRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.beyondMaxLookBackMissingIndexRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.beyondMaxLookBackInvalidIndexRowCount))); - } - if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) { - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.validIndexRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.expiredIndexRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.missingIndexRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.invalidIndexRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.beyondMaxLookBackMissingIndexRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.beyondMaxLookBackInvalidIndexRowCount))); - } - resultHTable.put(put); - } @Override public void close() throws IOException { innerScanner.close(); if (verify) { try { - logToIndexToolResultTable(); + verificationResultRepository.logToIndexToolResultTable(verificationResult, + verifyType, region.getRegionInfo().getRegionName()); } finally { this.pool.stop("IndexRebuildRegionScanner is closing"); hTableFactory.shutdown(); indexHTable.close(); - outputHTable.close(); - resultHTable.close(); + verificationResultRepository.close(); + verificationOutputRepository.close(); } } } @@ -419,80 +352,18 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return true; } - @VisibleForTesting public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs, - String errorMsg) throws IOException { - logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, - errorMsg, null, null); - - } - - private static byte[] generateOutputTableRowKey(long ts, byte[] indexTableName, byte[] dataRowKey ) { - byte[] keyPrefix = Bytes.toBytes(Long.toString(ts)); - byte[] rowKey; - int targetOffset = 0; - // The row key for the output table : timestamp | index table name | data row key - rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length + - ROW_KEY_SEPARATOR_BYTE.length + dataRowKey.length]; - Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length); - targetOffset += keyPrefix.length; - Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); - targetOffset += ROW_KEY_SEPARATOR_BYTE.length; - Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length); - targetOffset += indexTableName.length; - Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); - targetOffset += ROW_KEY_SEPARATOR_BYTE.length; - Bytes.putBytes(rowKey, targetOffset, dataRowKey, 0, dataRowKey.length); - return rowKey; + String errorMsg) throws IOException { + logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, errorMsg, null, null); } @VisibleForTesting public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs, - String errorMsg, byte[] expectedValue, byte[] actualValue) throws IOException { - final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:"); - final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:"); - final int PREFIX_LENGTH = 3; - final int TOTAL_PREFIX_LENGTH = 6; - final byte[] PHASE_BEFORE_VALUE = Bytes.toBytes("BEFORE"); - final byte[] PHASE_AFTER_VALUE = Bytes.toBytes("AFTER"); - long scanMaxTs = scan.getTimeRange().getMax(); - byte[] rowKey = generateOutputTableRowKey(scanMaxTs, indexHTable.getName().toBytes(), dataRowKey); - Put put = new Put(rowKey); - put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_NAME_BYTES, - scanMaxTs, region.getRegionInfo().getTable().getName()); - put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_NAME_BYTES, - scanMaxTs, indexMaintainer.getIndexTableName()); - put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_TS_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(dataRowTs))); - - put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_ROW_KEY_BYTES, - scanMaxTs, indexRowKey); - put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(indexRowTs))); - byte[] errorMessageBytes; - if (expectedValue != null) { - errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length + - TOTAL_PREFIX_LENGTH]; - Bytes.putBytes(errorMessageBytes, 0, Bytes.toBytes(errorMsg), 0, errorMsg.length()); - int length = errorMsg.length(); - Bytes.putBytes(errorMessageBytes, length, E_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH); - length += PREFIX_LENGTH; - Bytes.putBytes(errorMessageBytes, length, expectedValue, 0, expectedValue.length); - length += expectedValue.length; - Bytes.putBytes(errorMessageBytes, length, A_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH); - length += PREFIX_LENGTH; - Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length); - - } else { - errorMessageBytes = Bytes.toBytes(errorMsg); - } - put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes); - if (isBeforeRebuilt) { - put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_BEFORE_VALUE); - } else { - put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_AFTER_VALUE); - } - outputHTable.put(put); + String errorMsg, byte[] expectedVaue, byte[] actualValue) + throws IOException { + verificationOutputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, + errorMsg, expectedVaue, actualValue, scan.getTimeRange().getMax(), + region.getRegionInfo().getTable().getName(), isBeforeRebuilt); } private static long getMaxTimestamp(Mutation m) { @@ -854,7 +725,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { // verify // TODO: have a metric to update for these cases if (isTimestampBeforeTTL(currentTime, getTimestamp(expected))) { - verificationPhaseResult.expiredIndexRowCount++; + verificationPhaseResult.setExpiredIndexRowCount(verificationPhaseResult.getExpiredIndexRowCount() + 1); return true; } actual = actualMutationList.get(actualIndex); @@ -907,7 +778,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { if (expectedIndex == expectedSize ){ // every expected mutation has its matching one in the actual list. - verificationPhaseResult.validIndexRowCount++; + verificationPhaseResult.setValidIndexRowCount(verificationPhaseResult.getValidIndexRowCount() + 1); return true; } @@ -915,7 +786,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { if (expectedIndex > 0) { // if current expected index mutation is beyond max look back window, we only need to make sure its latest // mutation is a matching one, as an SCN query is required. - verificationPhaseResult.validIndexRowCount++; + verificationPhaseResult. + setValidIndexRowCount(verificationPhaseResult.getValidIndexRowCount() + 1); return true; } @@ -924,7 +796,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { // We report it as a failure, so "before" option can trigger the index rebuild for this row. // This repair is required, when there is only one index row for a given data table row and the timestamp of that row // can be beyond maxLookBack. - verificationPhaseResult.beyondMaxLookBackInvalidIndexRowCount++; + verificationPhaseResult. + setBeyondMaxLookBackInvalidIndexRowCount(verificationPhaseResult.getBeyondMaxLookBackInvalidIndexRowCount() + 1); byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants); String errorMsg = String.format("Expect %1$s mutations but got %2$s (beyond maxLookBack)", expectedSize, @@ -944,7 +817,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { logToIndexToolOutputTable(dataKey, indexRow.getRow(), getTimestamp(expectedMutationList.get(0)), 0L, errorMsg); } - verificationPhaseResult.invalidIndexRowCount++; + verificationPhaseResult.setInvalidIndexRowCount(verificationPhaseResult.getInvalidIndexRowCount() + 1); return false; } } @@ -999,7 +872,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { List<Mutation> mutationList = indexKeyToMutationMap.get(key); if (isTimestampBeforeTTL(currentTime, getTimestamp(mutationList.get(mutationList.size() - 1)))) { itr.remove(); - verificationPhaseResult.expiredIndexRowCount++; + verificationPhaseResult.setExpiredIndexRowCount(verificationPhaseResult.getExpiredIndexRowCount() + 1); } } } @@ -1015,11 +888,12 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { String errorMsg; if (isTimestampBeyondMaxLookBack(currentTime, getTimestamp(mutation))){ errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK; - verificationPhaseResult.beyondMaxLookBackMissingIndexRowCount++; + verificationPhaseResult. + setBeyondMaxLookBackMissingIndexRowCount(verificationPhaseResult.getBeyondMaxLookBackMissingIndexRowCount() + 1); } else { errorMsg = "Missing index row"; - verificationPhaseResult.missingIndexRowCount++; + verificationPhaseResult.setMissingIndexRowCount(verificationPhaseResult.getMissingIndexRowCount() + 1); } byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants); logToIndexToolOutputTable(dataKey, @@ -1145,12 +1019,12 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } private void verifyAndOrRebuildIndex() throws IOException { - IndexToolVerificationResult nextVerificationResult = new IndexToolVerificationResult(); - nextVerificationResult.scannedDataRowCount = dataKeyToMutationMap.size(); + IndexToolVerificationResult nextVerificationResult = new IndexToolVerificationResult(scan); + nextVerificationResult.setScannedDataRowCount(dataKeyToMutationMap.size()); if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) { // For these options we start with rebuilding index rows rebuildIndexRows(mutations); - nextVerificationResult.rebuiltIndexRowCount = dataKeyToMutationMap.size(); + nextVerificationResult.setRebuiltIndexRowCount(dataKeyToMutationMap.size()); isBeforeRebuilt = false; } if (verifyType == IndexTool.IndexVerifyType.NONE) { @@ -1161,7 +1035,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult(); // For these options we start with verifying index rows parallelizeIndexVerify(verificationPhaseResult); - nextVerificationResult.before.add(verificationPhaseResult); + nextVerificationResult.getBefore().add(verificationPhaseResult); } if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) { // For these options, we have identified the rows to be rebuilt and now need to rebuild them @@ -1177,7 +1051,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } } rebuildIndexRows(mutations); - nextVerificationResult.rebuiltIndexRowCount += dataKeyToMutationMap.size(); + nextVerificationResult.setRebuiltIndexRowCount(nextVerificationResult.getRebuiltIndexRowCount() + dataKeyToMutationMap.size()); isBeforeRebuilt = false; } @@ -1189,7 +1063,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { prepareIndexMutations(entry.getValue().getFirst(), entry.getValue().getSecond()); } parallelizeIndexVerify(verificationPhaseResult); - nextVerificationResult.after.add(verificationPhaseResult); + nextVerificationResult.getAfter().add(verificationPhaseResult); } verificationResult.add(nextVerificationResult); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java index 1fbb866..989e03f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java @@ -19,63 +19,141 @@ package org.apache.phoenix.coprocessor; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.mapreduce.index.IndexTool; -import java.io.IOException; -import java.util.Arrays; - -import static org.apache.phoenix.mapreduce.index.IndexTool.*; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.REBUILT_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_COLUMN_FAMILY; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.SCANNED_DATA_ROW_COUNT_BYTES; public class IndexToolVerificationResult { + + public void setScannedDataRowCount(long scannedDataRowCount) { + this.scannedDataRowCount = scannedDataRowCount; + } + + public void setRebuiltIndexRowCount(long rebuiltIndexRowCount) { + this.rebuiltIndexRowCount = rebuiltIndexRowCount; + } + + public PhaseResult getBefore() { + return before; + } + + public void setBefore(PhaseResult before) { + this.before = before; + } + + public PhaseResult getAfter() { + return after; + } + + public void setAfter(PhaseResult after) { + this.after = after; + } + + public byte[] getStartRow() { + return startRow; + } + + public byte[] getStopRow() { + return stopRow; + } + + public long getScanMaxTs() { + return scanMaxTs; + } + + public IndexToolVerificationResult(long scanMaxTs) { + this.scanMaxTs = scanMaxTs; + } + + public IndexToolVerificationResult(byte[] startRow, byte[] stopRow, long scanMaxTs) { + this.setStartRow(startRow); + this.setStopRow(stopRow); + this.scanMaxTs = scanMaxTs; + } + + public IndexToolVerificationResult(Scan scan) { + this.setStartRow(scan.getStartRow()); + this.setStopRow(scan.getStopRow()); + this.scanMaxTs = scan.getTimeRange().getMax(); + } + + public byte[] getRegion() { + return region; + } + + public void setStartRow(byte[] startRow) { + this.startRow = startRow; + } + + public void setStopRow(byte[] stopRow) { + this.stopRow = stopRow; + } + public static class PhaseResult { - long validIndexRowCount = 0; - long expiredIndexRowCount = 0; - long missingIndexRowCount = 0; - long invalidIndexRowCount = 0; - long beyondMaxLookBackMissingIndexRowCount = 0; - long beyondMaxLookBackInvalidIndexRowCount = 0; + private long validIndexRowCount = 0; + private long expiredIndexRowCount = 0; + private long missingIndexRowCount = 0; + private long invalidIndexRowCount = 0; + private long beyondMaxLookBackMissingIndexRowCount = 0; + private long beyondMaxLookBackInvalidIndexRowCount = 0; public void add(PhaseResult phaseResult) { - validIndexRowCount += phaseResult.validIndexRowCount; - expiredIndexRowCount += phaseResult.expiredIndexRowCount; - missingIndexRowCount += phaseResult.missingIndexRowCount; - invalidIndexRowCount += phaseResult.invalidIndexRowCount; - beyondMaxLookBackMissingIndexRowCount += phaseResult.beyondMaxLookBackMissingIndexRowCount; - beyondMaxLookBackInvalidIndexRowCount += phaseResult.beyondMaxLookBackInvalidIndexRowCount; + setBeyondMaxLookBackMissingIndexRowCount(getBeyondMaxLookBackMissingIndexRowCount() + + phaseResult.getBeyondMaxLookBackMissingIndexRowCount()); + setBeyondMaxLookBackInvalidIndexRowCount(getBeyondMaxLookBackInvalidIndexRowCount() + + phaseResult.getBeyondMaxLookBackInvalidIndexRowCount()); + setValidIndexRowCount(getValidIndexRowCount() + phaseResult.getValidIndexRowCount()); + setExpiredIndexRowCount(getExpiredIndexRowCount() + phaseResult.getExpiredIndexRowCount()); + setMissingIndexRowCount(getMissingIndexRowCount() + phaseResult.getMissingIndexRowCount()); + setInvalidIndexRowCount(getInvalidIndexRowCount() + phaseResult.getInvalidIndexRowCount()); } - public PhaseResult(){} + public PhaseResult() { + } public PhaseResult(long validIndexRowCount, long expiredIndexRowCount, - long missingIndexRowCount, long invalidIndexRowCount, - long beyondMaxLookBackMissingIndexRowCount, long beyondMaxLookBackInvalidIndexRowCount) { - this.validIndexRowCount = validIndexRowCount; - this.expiredIndexRowCount = expiredIndexRowCount; - this.missingIndexRowCount = missingIndexRowCount; - this.invalidIndexRowCount = invalidIndexRowCount; - this.beyondMaxLookBackInvalidIndexRowCount = beyondMaxLookBackInvalidIndexRowCount; - this.beyondMaxLookBackMissingIndexRowCount = beyondMaxLookBackMissingIndexRowCount; + long missingIndexRowCount, long invalidIndexRowCount, + long beyondMaxLookBackMissingIndexRowCount, + long beyondMaxLookBackInvalidIndexRowCount) { + this.setValidIndexRowCount(validIndexRowCount); + this.setExpiredIndexRowCount(expiredIndexRowCount); + this.setMissingIndexRowCount(missingIndexRowCount); + this.setInvalidIndexRowCount(invalidIndexRowCount); + this.setBeyondMaxLookBackInvalidIndexRowCount(beyondMaxLookBackInvalidIndexRowCount); + this.setBeyondMaxLookBackMissingIndexRowCount(beyondMaxLookBackMissingIndexRowCount); } + public long getTotalCount() { - return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount + beyondMaxLookBackMissingIndexRowCount + beyondMaxLookBackInvalidIndexRowCount; + return getValidIndexRowCount() + getExpiredIndexRowCount() + getMissingIndexRowCount() + getInvalidIndexRowCount() + + getBeyondMaxLookBackMissingIndexRowCount() + getBeyondMaxLookBackInvalidIndexRowCount(); } @Override public String toString() { return "PhaseResult{" + - "validIndexRowCount=" + validIndexRowCount + - ", expiredIndexRowCount=" + expiredIndexRowCount + - ", missingIndexRowCount=" + missingIndexRowCount + - ", invalidIndexRowCount=" + invalidIndexRowCount + - ", beyondMaxLookBackMissingIndexRowCount=" + beyondMaxLookBackMissingIndexRowCount + - ", beyondMaxLookBackInvalidIndexRowCount=" + beyondMaxLookBackInvalidIndexRowCount; + "validIndexRowCount=" + getValidIndexRowCount() + + ", expiredIndexRowCount=" + getExpiredIndexRowCount() + + ", missingIndexRowCount=" + getMissingIndexRowCount() + + ", invalidIndexRowCount=" + getInvalidIndexRowCount() + + ", beyondMaxLookBackMissingIndexRowCount=" + getBeyondMaxLookBackMissingIndexRowCount() + + ", beyondMaxLookBackInvalidIndexRowCount=" + getBeyondMaxLookBackInvalidIndexRowCount(); } @Override @@ -87,39 +165,91 @@ public class IndexToolVerificationResult { return false; } PhaseResult pr = (PhaseResult) o; - return this.expiredIndexRowCount == pr.expiredIndexRowCount - && this.validIndexRowCount == pr.validIndexRowCount - && this.invalidIndexRowCount == pr.invalidIndexRowCount - && this.missingIndexRowCount == pr.missingIndexRowCount - && this.beyondMaxLookBackInvalidIndexRowCount == pr.beyondMaxLookBackInvalidIndexRowCount - && this.beyondMaxLookBackMissingIndexRowCount == pr.beyondMaxLookBackMissingIndexRowCount; + return this.getExpiredIndexRowCount() == pr.getExpiredIndexRowCount() + && this.getValidIndexRowCount() == pr.getValidIndexRowCount() + && this.getInvalidIndexRowCount() == pr.getInvalidIndexRowCount() + && this.getMissingIndexRowCount() == pr.getMissingIndexRowCount() + && this.getBeyondMaxLookBackInvalidIndexRowCount() == pr.getBeyondMaxLookBackInvalidIndexRowCount() + && this.getBeyondMaxLookBackMissingIndexRowCount() == pr.getBeyondMaxLookBackMissingIndexRowCount(); } @Override public int hashCode() { long result = 17; - result = 31 * result + expiredIndexRowCount; - result = 31 * result + validIndexRowCount; - result = 31 * result + missingIndexRowCount; - result = 31 * result + invalidIndexRowCount; - result = 31 * result + beyondMaxLookBackMissingIndexRowCount; - result = 31 * result + beyondMaxLookBackInvalidIndexRowCount; - return (int)result; + result = 31 * result + getExpiredIndexRowCount(); + result = 31 * result + getValidIndexRowCount(); + result = 31 * result + getMissingIndexRowCount(); + result = 31 * result + getInvalidIndexRowCount(); + result = 31 * result + getBeyondMaxLookBackMissingIndexRowCount(); + result = 31 * result + getBeyondMaxLookBackInvalidIndexRowCount(); + return (int) result; + } + + public long getValidIndexRowCount() { + return validIndexRowCount; + } + + public void setValidIndexRowCount(long validIndexRowCount) { + this.validIndexRowCount = validIndexRowCount; + } + + public long getExpiredIndexRowCount() { + return expiredIndexRowCount; + } + + public void setExpiredIndexRowCount(long expiredIndexRowCount) { + this.expiredIndexRowCount = expiredIndexRowCount; + } + + public long getMissingIndexRowCount() { + return missingIndexRowCount; + } + + public void setMissingIndexRowCount(long missingIndexRowCount) { + this.missingIndexRowCount = missingIndexRowCount; + } + + public long getInvalidIndexRowCount() { + return invalidIndexRowCount; + } + + public void setInvalidIndexRowCount(long invalidIndexRowCount) { + this.invalidIndexRowCount = invalidIndexRowCount; + } + + public long getBeyondMaxLookBackMissingIndexRowCount() { + return beyondMaxLookBackMissingIndexRowCount; + } + + public void setBeyondMaxLookBackMissingIndexRowCount(long beyondMaxLookBackMissingIndexRowCount) { + this.beyondMaxLookBackMissingIndexRowCount = beyondMaxLookBackMissingIndexRowCount; + } + + public long getBeyondMaxLookBackInvalidIndexRowCount() { + return beyondMaxLookBackInvalidIndexRowCount; + } + + public void setBeyondMaxLookBackInvalidIndexRowCount(long beyondMaxLookBackInvalidIndexRowCount) { + this.beyondMaxLookBackInvalidIndexRowCount = beyondMaxLookBackInvalidIndexRowCount; } } - long scannedDataRowCount = 0; - long rebuiltIndexRowCount = 0; - PhaseResult before = new PhaseResult(); - PhaseResult after = new PhaseResult(); + private long scannedDataRowCount = 0; + private long rebuiltIndexRowCount = 0; + private byte[] startRow; + private byte[] stopRow; + private long scanMaxTs; + private byte[] region; + private PhaseResult before = new PhaseResult(); + private PhaseResult after = new PhaseResult(); @Override public String toString() { return "VerificationResult{" + - "scannedDataRowCount=" + scannedDataRowCount + - ", rebuiltIndexRowCount=" + rebuiltIndexRowCount + - ", before=" + before + - ", after=" + after + + "scannedDataRowCount=" + getScannedDataRowCount() + + ", rebuiltIndexRowCount=" + getRebuiltIndexRowCount() + + ", before=" + getBefore() + + ", after=" + getAfter() + '}'; } @@ -132,107 +262,107 @@ public class IndexToolVerificationResult { } public long getBeforeRebuildValidIndexRowCount() { - return before.validIndexRowCount; + return getBefore().getValidIndexRowCount(); } public long getBeforeRebuildExpiredIndexRowCount() { - return before.expiredIndexRowCount; + return getBefore().getExpiredIndexRowCount(); } public long getBeforeRebuildInvalidIndexRowCount() { - return before.invalidIndexRowCount; + return getBefore().getInvalidIndexRowCount(); } public long getBeforeRebuildBeyondMaxLookBackMissingIndexRowCount() { - return before.beyondMaxLookBackMissingIndexRowCount; - }; + return before.getBeyondMaxLookBackMissingIndexRowCount(); + } public long getBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount() { - return before.beyondMaxLookBackInvalidIndexRowCount; - }; + return before.getBeyondMaxLookBackInvalidIndexRowCount(); + } public long getBeforeRebuildMissingIndexRowCount() { - return before.missingIndexRowCount; + return getBefore().getMissingIndexRowCount(); } public long getAfterRebuildValidIndexRowCount() { - return after.validIndexRowCount; + return getAfter().getValidIndexRowCount(); } public long getAfterRebuildExpiredIndexRowCount() { - return after.expiredIndexRowCount; + return getAfter().getExpiredIndexRowCount(); } public long getAfterRebuildInvalidIndexRowCount() { - return after.invalidIndexRowCount; + return getAfter().getInvalidIndexRowCount(); } public long getAfterRebuildMissingIndexRowCount() { - return after.missingIndexRowCount; + return getAfter().getMissingIndexRowCount(); } public long getAfterRebuildBeyondMaxLookBackMissingIndexRowCount() { - return after.beyondMaxLookBackMissingIndexRowCount; - }; + return after.getBeyondMaxLookBackMissingIndexRowCount(); + } public long getAfterRebuildBeyondMaxLookBackInvalidIndexRowCount() { - return after.beyondMaxLookBackInvalidIndexRowCount; - }; + return after.getBeyondMaxLookBackInvalidIndexRowCount(); + } private void addScannedDataRowCount(long count) { - this.scannedDataRowCount += count; + this.setScannedDataRowCount(this.getScannedDataRowCount() + count); } private void addRebuiltIndexRowCount(long count) { - this.rebuiltIndexRowCount += count; + this.setRebuiltIndexRowCount(this.getRebuiltIndexRowCount() + count); } private void addBeforeRebuildValidIndexRowCount(long count) { - before.validIndexRowCount += count; + getBefore().setValidIndexRowCount(getBefore().getValidIndexRowCount() + count); } private void addBeforeRebuildExpiredIndexRowCount(long count) { - before.expiredIndexRowCount += count; + getBefore().setExpiredIndexRowCount(getBefore().getExpiredIndexRowCount() + count); } private void addBeforeRebuildMissingIndexRowCount(long count) { - before.missingIndexRowCount += count; + getBefore().setMissingIndexRowCount(getBefore().getMissingIndexRowCount() + count); } private void addBeforeRebuildInvalidIndexRowCount(long count) { - before.invalidIndexRowCount += count; + getBefore().setInvalidIndexRowCount(getBefore().getInvalidIndexRowCount() + count); } private void addBeforeRebuildBeyondMaxLookBackMissingIndexRowCount(long count) { - before.beyondMaxLookBackMissingIndexRowCount += count; + before.setBeyondMaxLookBackMissingIndexRowCount(before.getBeyondMaxLookBackMissingIndexRowCount() + count); } private void addBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount(long count) { - before.beyondMaxLookBackInvalidIndexRowCount += count; + before.setBeyondMaxLookBackInvalidIndexRowCount(before.getBeyondMaxLookBackInvalidIndexRowCount() + count); } private void addAfterRebuildValidIndexRowCount(long count) { - after.validIndexRowCount += count; + getAfter().setValidIndexRowCount(getAfter().getValidIndexRowCount() + count); } private void addAfterRebuildExpiredIndexRowCount(long count) { - after.expiredIndexRowCount += count; + getAfter().setExpiredIndexRowCount(getAfter().getExpiredIndexRowCount() + count); } private void addAfterRebuildMissingIndexRowCount(long count) { - after.missingIndexRowCount += count; + getAfter().setMissingIndexRowCount(getAfter().getMissingIndexRowCount() + count); } private void addAfterRebuildInvalidIndexRowCount(long count) { - after.invalidIndexRowCount += count; + getAfter().setInvalidIndexRowCount(getAfter().getInvalidIndexRowCount() + count); } private void addAfterRebuildBeyondMaxLookBackMissingIndexRowCount(long count) { - after.beyondMaxLookBackMissingIndexRowCount += count; + after.setBeyondMaxLookBackMissingIndexRowCount(after.getBeyondMaxLookBackMissingIndexRowCount() + count); } private void addAfterRebuildBeyondMaxLookBackInvalidIndexRowCount(long count) { - after.beyondMaxLookBackInvalidIndexRowCount += count; + after.setBeyondMaxLookBackInvalidIndexRowCount(after.getBeyondMaxLookBackInvalidIndexRowCount() + count); } private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) { @@ -249,7 +379,7 @@ public class IndexToolVerificationResult { cell.getValueOffset(), cell.getValueLength())); } - private void update(Cell cell) { + public void update(Cell cell) { if (CellUtil .matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES)) { addScannedDataRowCount(getValue(cell)); @@ -282,57 +412,17 @@ public class IndexToolVerificationResult { } } - public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) { - // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually. - // Search for the place where the trailing 0xFFs start - int offset = rowKeyPrefix.length; - while (offset > 0) { - if (rowKeyPrefix[offset - 1] != (byte) 0xFF) { - break; - } - offset--; - } - if (offset == 0) { - // We got an 0xFFFF... (only FFs) stopRow value which is - // the last possible prefix before the end of the table. - // So set it to stop at the 'end of the table' - return HConstants.EMPTY_END_ROW; - } - // Copy the right length of the original - byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset); - // And increment the last one - newStopRow[newStopRow.length - 1]++; - return newStopRow; - } - - public static IndexToolVerificationResult getVerificationResult(Table hTable, long ts) - throws IOException { - IndexToolVerificationResult verificationResult = new IndexToolVerificationResult(); - byte[] startRowKey = Bytes.toBytes(Long.toString(ts)); - byte[] stopRowKey = calculateTheClosestNextRowKeyForPrefix(startRowKey); - Scan scan = new Scan(); - scan.setStartRow(startRowKey); - scan.setStopRow(stopRowKey); - ResultScanner scanner = hTable.getScanner(scan); - for (Result result = scanner.next(); result != null; result = scanner.next()) { - for (Cell cell : result.rawCells()) { - verificationResult.update(cell); - } - } - return verificationResult; - } - public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) { if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) { return false; } else if (verifyType == IndexTool.IndexVerifyType.ONLY) { - if (before.invalidIndexRowCount + before.missingIndexRowCount - + before.beyondMaxLookBackInvalidIndexRowCount + before.beyondMaxLookBackMissingIndexRowCount > 0) { + if (getBefore().getInvalidIndexRowCount() + getBefore().getMissingIndexRowCount() + + before.getBeyondMaxLookBackInvalidIndexRowCount() + before.getBeyondMaxLookBackMissingIndexRowCount() > 0) { return true; } } else if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) { - if (after.invalidIndexRowCount + after.missingIndexRowCount - + after.beyondMaxLookBackInvalidIndexRowCount + after.beyondMaxLookBackMissingIndexRowCount > 0) { + if (getAfter().getInvalidIndexRowCount() + getAfter().getMissingIndexRowCount() + + after.getBeyondMaxLookBackInvalidIndexRowCount() + after.getBeyondMaxLookBackMissingIndexRowCount() > 0) { return true; } } @@ -340,9 +430,9 @@ public class IndexToolVerificationResult { } public void add(IndexToolVerificationResult verificationResult) { - scannedDataRowCount += verificationResult.scannedDataRowCount; - rebuiltIndexRowCount += verificationResult.rebuiltIndexRowCount; - before.add(verificationResult.before); - after.add(verificationResult.after); + setScannedDataRowCount(getScannedDataRowCount() + verificationResult.getScannedDataRowCount()); + setRebuiltIndexRowCount(getRebuiltIndexRowCount() + verificationResult.getRebuiltIndexRowCount()); + getBefore().add(verificationResult.getBefore()); + getAfter().add(verificationResult.getAfter()); } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index 95703a4..e30da37 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -50,7 +50,6 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; @@ -73,7 +72,6 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.phoenix.compile.PostIndexDDLCompiler; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; -import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; @@ -89,7 +87,6 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.query.ConnectionQueryServices; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PIndexState; @@ -155,57 +152,6 @@ public class IndexTool extends Configured implements Tool { } } - public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL"; - public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME); - public final static byte[] OUTPUT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES; - public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT"; - public final static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(RESULT_TABLE_NAME); - public final static byte[] RESULT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES; - public final static String DATA_TABLE_NAME = "DTName"; - public final static byte[] DATA_TABLE_NAME_BYTES = Bytes.toBytes(DATA_TABLE_NAME); - public static String INDEX_TABLE_NAME = "ITName"; - public final static byte[] INDEX_TABLE_NAME_BYTES = Bytes.toBytes(INDEX_TABLE_NAME); - public static String DATA_TABLE_ROW_KEY = "DTRowKey"; - public final static byte[] DATA_TABLE_ROW_KEY_BYTES = Bytes.toBytes(DATA_TABLE_ROW_KEY); - public static String INDEX_TABLE_ROW_KEY = "ITRowKey"; - public final static byte[] INDEX_TABLE_ROW_KEY_BYTES = Bytes.toBytes(INDEX_TABLE_ROW_KEY); - public static String DATA_TABLE_TS = "DTTS"; - public final static byte[] DATA_TABLE_TS_BYTES = Bytes.toBytes(DATA_TABLE_TS); - public static String INDEX_TABLE_TS = "ITTS"; - public final static byte[] INDEX_TABLE_TS_BYTES = Bytes.toBytes(INDEX_TABLE_TS); - public static String ERROR_MESSAGE = "Error"; - public final static byte[] ERROR_MESSAGE_BYTES = Bytes.toBytes(ERROR_MESSAGE); - public static String SCANNED_DATA_ROW_COUNT = "ScannedDataRowCount"; - public final static byte[] SCANNED_DATA_ROW_COUNT_BYTES = Bytes.toBytes(SCANNED_DATA_ROW_COUNT); - public static String REBUILT_INDEX_ROW_COUNT = "RebuiltIndexRowCount"; - public final static byte[] REBUILT_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(REBUILT_INDEX_ROW_COUNT); - public static String BEFORE_REBUILD_VALID_INDEX_ROW_COUNT = "BeforeRebuildValidIndexRowCount"; - public final static byte[] BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT); - public static String BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT = "BeforeRebuildExpiredIndexRowCount"; - public final static byte[] BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT); - public static String BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT = "BeforeRebuildMissingIndexRowCount"; - public final static byte[] BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT); - public static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT = "BeforeRebuildInvalidIndexRowCount"; - public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT); - public static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT = "BeforeRebuildBeyondMaxLookBackMissingIndexRowCount"; - public static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT); - public static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT = "BeforeRebuildBeyondMaxLookBackInvalidIndexRowCount"; - public static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT); - public static String AFTER_REBUILD_VALID_INDEX_ROW_COUNT = "AfterValidExpiredIndexRowCount"; - public final static byte[] AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_VALID_INDEX_ROW_COUNT); - public static String AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT = "AfterRebuildExpiredIndexRowCount"; - public final static byte[] AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT); - public static String AFTER_REBUILD_MISSING_INDEX_ROW_COUNT = "AfterRebuildMissingIndexRowCount"; - public final static byte[] AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT); - public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT = "AfterRebuildInvalidIndexRowCount"; - public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT); - public static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT = "AfterRebuildBeyondMaxLookBackMissingIndexRowCount"; - public static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT); - public static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT = "AfterRebuildBeyondMaxLookBackInvalidIndexRowCount"; - public static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT); - public static String VERIFICATION_PHASE = "Phase"; - public final static byte[] VERIFICATION_PHASE_BYTES = Bytes.toBytes(VERIFICATION_PHASE); - private static final Logger LOGGER = LoggerFactory.getLogger(IndexTool.class); private String schemaName; @@ -703,23 +649,10 @@ public class IndexTool extends Configured implements Tool { } private void createIndexToolTables(Connection connection) throws Exception { - ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices(); - Admin admin = queryServices.getAdmin(); - if (!admin.tableExists(TableName.valueOf(OUTPUT_TABLE_NAME))) { - HTableDescriptor tableDescriptor = new - HTableDescriptor(TableName.valueOf(OUTPUT_TABLE_NAME)); - tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL)); - HColumnDescriptor columnDescriptor = new HColumnDescriptor(OUTPUT_TABLE_COLUMN_FAMILY); - tableDescriptor.addFamily(columnDescriptor); - admin.createTable(tableDescriptor); - } - if (!admin.tableExists(TableName.valueOf(RESULT_TABLE_NAME))) { - HTableDescriptor tableDescriptor = new - HTableDescriptor(TableName.valueOf(RESULT_TABLE_NAME)); - tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL)); - HColumnDescriptor columnDescriptor = new HColumnDescriptor(RESULT_TABLE_COLUMN_FAMILY); - tableDescriptor.addFamily(columnDescriptor); - admin.createTable(tableDescriptor); + try (IndexVerificationResultRepository resultRepo = new IndexVerificationResultRepository(); + IndexVerificationOutputRepository outputRepo = new IndexVerificationOutputRepository()){ + resultRepo.createResultTable(connection); + outputRepo.createOutputTable(connection); } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java new file mode 100644 index 0000000..bcc2e73 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce.index; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.hbase.index.table.HTableFactory; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.util.ByteUtil; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class IndexVerificationOutputRepository implements AutoCloseable { + public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|"); + + private Table indexTable; + private byte[] indexName; + private Table outputTable; + public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL"; + public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME); + public final static byte[] OUTPUT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES; + + public final static String DATA_TABLE_NAME = "DTName"; + public final static byte[] DATA_TABLE_NAME_BYTES = Bytes.toBytes(DATA_TABLE_NAME); + public final static String INDEX_TABLE_NAME = "ITName"; + public final static byte[] INDEX_TABLE_NAME_BYTES = Bytes.toBytes(INDEX_TABLE_NAME); + public final static String DATA_TABLE_ROW_KEY = "DTRowKey"; + public final static byte[] DATA_TABLE_ROW_KEY_BYTES = Bytes.toBytes(DATA_TABLE_ROW_KEY); + public final static String INDEX_TABLE_ROW_KEY = "ITRowKey"; + public final static byte[] INDEX_TABLE_ROW_KEY_BYTES = Bytes.toBytes(INDEX_TABLE_ROW_KEY); + public final static String DATA_TABLE_TS = "DTTS"; + public final static byte[] DATA_TABLE_TS_BYTES = Bytes.toBytes(DATA_TABLE_TS); + public final static String INDEX_TABLE_TS = "ITTS"; + public final static byte[] INDEX_TABLE_TS_BYTES = Bytes.toBytes(INDEX_TABLE_TS); + public final static String ERROR_MESSAGE = "Error"; + public final static byte[] ERROR_MESSAGE_BYTES = Bytes.toBytes(ERROR_MESSAGE); + + public static String VERIFICATION_PHASE = "Phase"; + public final static byte[] VERIFICATION_PHASE_BYTES = Bytes.toBytes(VERIFICATION_PHASE); + public final static String EXPECTED_VALUE = "ExpectedValue"; + public final static byte[] EXPECTED_VALUE_BYTES = Bytes.toBytes(EXPECTED_VALUE); + public final static String ACTUAL_VALUE = "ActualValue"; + public final static byte[] ACTUAL_VALUE_BYTES = Bytes.toBytes(ACTUAL_VALUE); + public static final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:"); + public static final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:"); + public static final int PREFIX_LENGTH = 3; + public static final int TOTAL_PREFIX_LENGTH = 6; + public static final byte[] PHASE_BEFORE_VALUE = Bytes.toBytes("BEFORE"); + public static final byte[] PHASE_AFTER_VALUE = Bytes.toBytes("AFTER"); + + /** + * Only usable for the create table / read path or for testing. Use setOutputTable and + * setIndexTable first to write. + */ + public IndexVerificationOutputRepository() { + + } + + @VisibleForTesting + public IndexVerificationOutputRepository(byte[] indexName, Connection conn) throws SQLException { + ConnectionQueryServices queryServices = + conn.unwrap(PhoenixConnection.class).getQueryServices(); + outputTable = queryServices.getTable(OUTPUT_TABLE_NAME_BYTES); + indexTable = queryServices.getTable(indexName); + } + + public IndexVerificationOutputRepository(byte[] indexName, HTableFactory hTableFactory) throws IOException { + this.indexName = indexName; + outputTable = hTableFactory.getTable(new ImmutableBytesPtr(OUTPUT_TABLE_NAME_BYTES)); + indexTable = hTableFactory.getTable(new ImmutableBytesPtr(indexName)); + } + + public static byte[] generateOutputTableRowKey(long ts, byte[] indexTableName, byte[] dataRowKey ) { + byte[] keyPrefix = Bytes.toBytes(Long.toString(ts)); + byte[] rowKey; + int targetOffset = 0; + // The row key for the output table : timestamp | index table name | data row key + rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length + + ROW_KEY_SEPARATOR_BYTE.length + dataRowKey.length]; + Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length); + targetOffset += keyPrefix.length; + Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); + targetOffset += ROW_KEY_SEPARATOR_BYTE.length; + Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length); + targetOffset += indexTableName.length; + Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); + targetOffset += ROW_KEY_SEPARATOR_BYTE.length; + Bytes.putBytes(rowKey, targetOffset, dataRowKey, 0, dataRowKey.length); + return rowKey; + } + + /** + * Generates partial row key for use in a Scan to get all rows for an index verification + */ + private static byte[] generatePartialOutputTableRowKey(long ts, byte[] indexTableName){ + byte[] keyPrefix = Bytes.toBytes(Long.toString(ts)); + byte[] partialRowKey; + int targetOffset = 0; + // The row key for the output table : timestamp | index table name | data row key + partialRowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length]; + Bytes.putBytes(partialRowKey, targetOffset, keyPrefix, 0, keyPrefix.length); + targetOffset += keyPrefix.length; + Bytes.putBytes(partialRowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); + targetOffset += ROW_KEY_SEPARATOR_BYTE.length; + Bytes.putBytes(partialRowKey, targetOffset, indexTableName, 0, indexTableName.length); + return partialRowKey; + } + + public void createOutputTable(Connection connection) throws IOException, SQLException { + ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices(); + Admin admin = queryServices.getAdmin(); + TableName outputTableName = TableName.valueOf(OUTPUT_TABLE_NAME); + if (!admin.tableExists(outputTableName)) { + HTableDescriptor tableDescriptor = new + HTableDescriptor(TableName.valueOf(OUTPUT_TABLE_NAME)); + tableDescriptor.setValue(HColumnDescriptor.TTL, + String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL)); + HColumnDescriptor columnDescriptor = new HColumnDescriptor(OUTPUT_TABLE_COLUMN_FAMILY); + tableDescriptor.addFamily(columnDescriptor); + admin.createTable(tableDescriptor); + outputTable = admin.getConnection().getTable(outputTableName); + } + } + + @VisibleForTesting + public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, + long indexRowTs, + String errorMsg, byte[] expectedValue, byte[] actualValue, + long scanMaxTs, byte[] tableName, boolean isBeforeRebuild) + throws IOException { + byte[] rowKey = generateOutputTableRowKey(scanMaxTs, indexTable.getName().toBytes(), dataRowKey); + Put put = new Put(rowKey); + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_NAME_BYTES, + scanMaxTs, tableName); + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_NAME_BYTES, + scanMaxTs, indexName); + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_TS_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(dataRowTs))); + + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_ROW_KEY_BYTES, + scanMaxTs, indexRowKey); + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_TS_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(indexRowTs))); + byte[] errorMessageBytes; + if (expectedValue != null) { + errorMessageBytes = getErrorMessageBytes(errorMsg, expectedValue, actualValue); + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, EXPECTED_VALUE_BYTES, expectedValue); + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ACTUAL_VALUE_BYTES, actualValue); + } else { + errorMessageBytes = Bytes.toBytes(errorMsg); + } + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes); + if (isBeforeRebuild) { + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_BEFORE_VALUE); + } else { + put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_AFTER_VALUE); + } + outputTable.put(put); + } + + public static byte[] getErrorMessageBytes(String errorMsg, byte[] expectedValue, byte[] actualValue) { + byte[] errorMessageBytes; + errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length + + TOTAL_PREFIX_LENGTH]; + Bytes.putBytes(errorMessageBytes, 0, Bytes.toBytes(errorMsg), 0, errorMsg.length()); + int length = errorMsg.length(); + Bytes.putBytes(errorMessageBytes, length, E_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH); + length += PREFIX_LENGTH; + Bytes.putBytes(errorMessageBytes, length, expectedValue, 0, expectedValue.length); + length += expectedValue.length; + Bytes.putBytes(errorMessageBytes, length, A_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH); + length += PREFIX_LENGTH; + Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length); + return errorMessageBytes; + } + + public List<IndexVerificationOutputRow> getOutputRows(long ts, byte[] indexName) + throws IOException { + Iterator<IndexVerificationOutputRow> iter = getOutputRowIterator(ts, indexName); + List<IndexVerificationOutputRow> outputRowList = new ArrayList<IndexVerificationOutputRow>(); + while (iter.hasNext()){ + outputRowList.add(iter.next()); + } + return outputRowList; + } + + public Iterator<IndexVerificationOutputRow> getOutputRowIterator(long ts, byte[] indexName) + throws IOException { + Scan scan = new Scan(); + byte[] partialKey = generatePartialOutputTableRowKey(ts, indexName); + scan.withStartRow(partialKey); + scan.withStopRow(ByteUtil.calculateTheClosestNextRowKeyForPrefix(partialKey)); + ResultScanner scanner = outputTable.getScanner(scan); + return new IndexVerificationOutputRowIterator(scanner.iterator()); + } + + public static IndexVerificationOutputRow getOutputRowFromResult(Result result) { + IndexVerificationOutputRow.IndexVerificationOutputRowBuilder builder = + new IndexVerificationOutputRow.IndexVerificationOutputRowBuilder(); + byte[] rowKey = result.getRow(); + //rowkey is scanTs + SEPARATOR_BYTE + indexTableName + SEPARATOR_BYTE + dataTableRowKey + byte[][] rowKeySplit = ByteUtil.splitArrayBySeparator(rowKey, ROW_KEY_SEPARATOR_BYTE[0]); + builder.setScanMaxTimestamp(Long.parseLong(Bytes.toString(rowKeySplit[0]))); + builder.setIndexTableName(Bytes.toString(rowKeySplit[1])); + builder.setDataTableRowKey(rowKeySplit[2]); + + builder.setDataTableName(Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, + DATA_TABLE_NAME_BYTES))); + builder.setIndexTableRowKey(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, + INDEX_TABLE_ROW_KEY_BYTES)); + builder.setDataTableRowTimestamp(Long.parseLong(Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, + DATA_TABLE_TS_BYTES)))); + builder.setIndexTableRowTimestamp(Long.parseLong(Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, + INDEX_TABLE_TS_BYTES)))); + builder.setErrorMessage(Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, + ERROR_MESSAGE_BYTES))); + //actual and expected value might not be present, but will just set to null if not + builder.setExpectedValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, EXPECTED_VALUE_BYTES)); + builder.setActualValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, ACTUAL_VALUE_BYTES)); + builder.setPhaseValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES)); + return builder.build(); + } + + public void close() throws IOException { + if (outputTable != null) { + outputTable.close(); + } + if (indexTable != null) { + indexTable.close(); + } + } + + public class IndexVerificationOutputRowIterator implements Iterator<IndexVerificationOutputRow> { + Iterator<Result> delegate; + public IndexVerificationOutputRowIterator(Iterator<Result> delegate){ + this.delegate = delegate; + } + @Override + public boolean hasNext() { + return delegate.hasNext(); + } + + @Override + public IndexVerificationOutputRow next() { + Result result = delegate.next(); + if (result == null) { + return null; + } else { + return getOutputRowFromResult(result); + } + } + + @Override + public void remove() { + delegate.remove(); + } + + } + + public void setIndexTable(Table indexTable) { + this.indexTable = indexTable; + } + + public void setOutputTable(Table outputTable) { + this.outputTable = outputTable; + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java new file mode 100644 index 0000000..8c54796 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce.index; + +import org.apache.hadoop.hbase.util.Bytes; + +import java.util.Arrays; +import java.util.Objects; + +public class IndexVerificationOutputRow { + public static final String SCAN_MAX_TIMESTAMP = "ScanMaxTimestamp: "; + private String dataTableName; + private String indexTableName; + private Long scanMaxTimestamp; + private byte[] dataTableRowKey; + private byte[] indexTableRowKey; + private Long dataTableRowTimestamp; + private Long indexTableRowTimestamp; + private String errorMessage; + private byte[] expectedValue; + private byte[] actualValue; + private byte[] phaseValue; + + private IndexVerificationOutputRow(String dataTableName, String indexTableName, + byte[] dataTableRowKey, Long scanMaxTimestamp, + byte[] indexTableRowKey, + long dataTableRowTimestamp, long indexTableRowTimestamp, + String errorMessage, byte[] expectedValue, byte[] actualValue, + byte[] phaseValue) { + this.dataTableName = dataTableName; + this.indexTableName = indexTableName; + this.scanMaxTimestamp = scanMaxTimestamp; + this.dataTableRowKey = dataTableRowKey; + this.indexTableRowKey = indexTableRowKey; + this.dataTableRowTimestamp = dataTableRowTimestamp; + this.indexTableRowTimestamp = indexTableRowTimestamp; + this.errorMessage = errorMessage; + this.expectedValue = expectedValue; + this.actualValue = actualValue; + this.phaseValue = phaseValue; + } + + public String getDataTableName() { + return dataTableName; + } + + public String getIndexTableName() { + return indexTableName; + } + + public Long getScanMaxTimestamp() { + return scanMaxTimestamp; + } + + public byte[] getIndexTableRowKey() { + return indexTableRowKey; + } + + public long getIndexTableRowTimestamp() { + return indexTableRowTimestamp; + } + + public String getErrorMessage() { + return errorMessage; + } + + public byte[] getExpectedValue() { + return expectedValue; + } + + public byte[] getActualValue() { + return actualValue; + } + + public byte[] getPhaseValue() { + return phaseValue; + } + + public byte[] getDataTableRowKey() { + return dataTableRowKey; + } + + public Long getDataTableRowTimestamp() { + return dataTableRowTimestamp; + } + + @Override + public boolean equals(Object o) { + if (o == null ) { + return false; + } + if (!(o instanceof IndexVerificationOutputRow)) { + return false; + } + IndexVerificationOutputRow otherRow = (IndexVerificationOutputRow) o; + + return Objects.equals(dataTableName, otherRow.getDataTableName()) && + Objects.equals(indexTableName, otherRow.getIndexTableName()) && + Objects.equals(scanMaxTimestamp, otherRow.getScanMaxTimestamp()) && + Arrays.equals(dataTableRowKey, otherRow.getDataTableRowKey()) && + Arrays.equals(indexTableRowKey, otherRow.getIndexTableRowKey()) && + Objects.equals(dataTableRowTimestamp, otherRow.getDataTableRowTimestamp()) && + Objects.equals(indexTableRowTimestamp, otherRow.getIndexTableRowTimestamp()) && + Objects.equals(errorMessage, otherRow.getErrorMessage()) && + Arrays.equals(expectedValue, otherRow.getExpectedValue()) && + Arrays.equals(actualValue, otherRow.getActualValue()) && + Arrays.equals(phaseValue, otherRow.getPhaseValue()); + } + + @Override + public int hashCode(){ + return Objects.hashCode(scanMaxTimestamp) ^ Objects.hashCode(indexTableName) ^ + Arrays.hashCode(dataTableRowKey); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(IndexVerificationOutputRepository.DATA_TABLE_NAME + ": ").append(dataTableName).append(","); + sb.append(IndexVerificationOutputRepository.INDEX_TABLE_NAME + ": ").append(indexTableName).append(","); + sb.append(SCAN_MAX_TIMESTAMP).append(": ").append(scanMaxTimestamp).append(","); + sb.append(IndexVerificationOutputRepository.DATA_TABLE_ROW_KEY + ": ").append(Bytes.toString(dataTableRowKey)).append(","); + sb.append(IndexVerificationOutputRepository.INDEX_TABLE_ROW_KEY + ": ").append(Bytes.toString(indexTableRowKey)).append(","); + sb.append(IndexVerificationOutputRepository.DATA_TABLE_TS + ": ").append(dataTableRowTimestamp).append(","); + sb.append(IndexVerificationOutputRepository.INDEX_TABLE_TS + ": ").append(indexTableRowTimestamp).append(","); + sb.append(IndexVerificationOutputRepository.ERROR_MESSAGE + ": ").append(errorMessage).append(","); + sb.append(IndexVerificationOutputRepository.EXPECTED_VALUE + ": ").append(Bytes.toString(expectedValue)).append(","); + sb.append(IndexVerificationOutputRepository.ACTUAL_VALUE + ": ").append(Bytes.toString(actualValue)).append( + ","); + sb.append(IndexVerificationOutputRepository.VERIFICATION_PHASE + ": ").append(Bytes.toString(phaseValue)); + return sb.toString(); + } + + public static class IndexVerificationOutputRowBuilder { + private String dataTableName; + private String indexTableName; + private Long scanMaxTimestamp; + private byte[] dataTableRowKey; + private byte[] indexTableRowKey; + private long dataTableRowTimestamp; + private long indexTableRowTimestamp; + private String errorMessage; + private byte[] expectedValue; + private byte[] actualValue; + private byte[] phaseValue; + + public IndexVerificationOutputRowBuilder setDataTableName(String dataTableName) { + this.dataTableName = dataTableName; + return this; + } + + public IndexVerificationOutputRowBuilder setIndexTableName(String indexTableName) { + this.indexTableName = indexTableName; + return this; + } + + public IndexVerificationOutputRowBuilder setScanMaxTimestamp(Long scanMaxTimestamp) { + this.scanMaxTimestamp = scanMaxTimestamp; + return this; + } + + public IndexVerificationOutputRowBuilder setIndexTableRowKey(byte[] indexTableRowKey) { + this.indexTableRowKey = indexTableRowKey; + return this; + } + + public IndexVerificationOutputRowBuilder setDataTableRowKey(byte[] dataTableRowKey){ + this.dataTableRowKey = dataTableRowKey; + return this; + } + + public IndexVerificationOutputRowBuilder setDataTableRowTimestamp(long dataTableRowTimestamp) { + this.dataTableRowTimestamp = dataTableRowTimestamp; + return this; + } + + public IndexVerificationOutputRowBuilder setIndexTableRowTimestamp(long indexTableRowTimestamp) { + this.indexTableRowTimestamp = indexTableRowTimestamp; + return this; + } + + public IndexVerificationOutputRowBuilder setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + return this; + } + + public IndexVerificationOutputRowBuilder setExpectedValue(byte[] expectedValue) { + this.expectedValue = expectedValue; + return this; + } + + public IndexVerificationOutputRowBuilder setActualValue(byte[] actualValue) { + this.actualValue = actualValue; + return this; + } + + public IndexVerificationOutputRowBuilder setPhaseValue(byte[] phaseValue) { + this.phaseValue = phaseValue; + return this; + } + + public IndexVerificationOutputRow build() { + return new IndexVerificationOutputRow(dataTableName, indexTableName, dataTableRowKey, + scanMaxTimestamp, indexTableRowKey, dataTableRowTimestamp, indexTableRowTimestamp, + errorMessage, expectedValue, actualValue, phaseValue); + } + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java new file mode 100644 index 0000000..ca8b129 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce.index; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.IndexToolVerificationResult; +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.hbase.index.table.HTableFactory; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.util.ByteUtil; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; + +public class IndexVerificationResultRepository implements AutoCloseable { + + private Table resultTable; + private Table indexTable; + public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|"); + public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT"; + public final static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(RESULT_TABLE_NAME); + public final static byte[] RESULT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES; + public final static String SCANNED_DATA_ROW_COUNT = "ScannedDataRowCount"; + public final static byte[] SCANNED_DATA_ROW_COUNT_BYTES = Bytes.toBytes(SCANNED_DATA_ROW_COUNT); + public final static String REBUILT_INDEX_ROW_COUNT = "RebuiltIndexRowCount"; + public final static byte[] REBUILT_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(REBUILT_INDEX_ROW_COUNT); + public final static String BEFORE_REBUILD_VALID_INDEX_ROW_COUNT = + "BeforeRebuildValidIndexRowCount"; + public final static byte[] BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT); + public final static String BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT = + "BeforeRebuildExpiredIndexRowCount"; + public final static byte[] BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT); + public final static String BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT = + "BeforeRebuildMissingIndexRowCount"; + public final static byte[] BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT); + public final static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT = + "BeforeRebuildInvalidIndexRowCount"; + public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT); + public final static String AFTER_REBUILD_VALID_INDEX_ROW_COUNT = + "AfterValidExpiredIndexRowCount"; + public final static byte[] AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_VALID_INDEX_ROW_COUNT); + public final static String AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT = + "AfterRebuildExpiredIndexRowCount"; + public final static byte[] AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT); + public final static String AFTER_REBUILD_MISSING_INDEX_ROW_COUNT = + "AfterRebuildMissingIndexRowCount"; + public final static byte[] AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT); + public final static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT = + "AfterRebuildInvalidIndexRowCount"; + public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT); + public final static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT = + "BeforeRebuildBeyondMaxLookBackMissingIndexRowCount"; + public final static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES = + Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT); + public final static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT = + "BeforeRebuildBeyondMaxLookBackInvalidIndexRowCount"; + public final static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES = + Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT); + + public final static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT = + "AfterRebuildBeyondMaxLookBackMissingIndexRowCount"; + public final static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES = + Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT); + public final static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT = + "AfterRebuildBeyondMaxLookBackInvalidIndexRowCount"; + public final static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES = + Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT); + + /*** + * Only usable for read / create methods. To write use setResultTable and setIndexTable first + */ + public IndexVerificationResultRepository(){ + + } + + public IndexVerificationResultRepository(Connection conn, byte[] indexNameBytes) throws SQLException { + resultTable = getTable(conn, RESULT_TABLE_NAME_BYTES); + indexTable = getTable(conn, indexNameBytes); + } + + public IndexVerificationResultRepository(byte[] indexName, + HTableFactory hTableFactory) throws IOException { + resultTable = hTableFactory.getTable(new ImmutableBytesPtr(RESULT_TABLE_NAME_BYTES)); + indexTable = hTableFactory.getTable(new ImmutableBytesPtr(indexName)); + } + + public void createResultTable(Connection connection) throws IOException, SQLException { + ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices(); + Admin admin = queryServices.getAdmin(); + TableName resultTableName = TableName.valueOf(RESULT_TABLE_NAME); + if (!admin.tableExists(resultTableName)) { + HTableDescriptor tableDescriptor = new + HTableDescriptor(TableName.valueOf(RESULT_TABLE_NAME)); + tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL)); + HColumnDescriptor columnDescriptor = new HColumnDescriptor(RESULT_TABLE_COLUMN_FAMILY); + tableDescriptor.addFamily(columnDescriptor); + admin.createTable(tableDescriptor); + setResultTable(admin.getConnection().getTable(resultTableName)); + } + } + public static byte[] generateResultTableRowKey(long ts, byte[] indexTableName, byte [] regionName, + byte[] startRow, byte[] stopRow) { + byte[] keyPrefix = Bytes.toBytes(Long.toString(ts)); + int targetOffset = 0; + // The row key for the result table : timestamp | index table name | datable table region name | + // scan start row | scan stop row + byte[] rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length + + ROW_KEY_SEPARATOR_BYTE.length + regionName.length + ROW_KEY_SEPARATOR_BYTE.length + + startRow.length + ROW_KEY_SEPARATOR_BYTE.length + stopRow.length]; + Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length); + targetOffset += keyPrefix.length; + Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); + targetOffset += ROW_KEY_SEPARATOR_BYTE.length; + Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length); + targetOffset += indexTableName.length; + Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); + targetOffset += ROW_KEY_SEPARATOR_BYTE.length; + Bytes.putBytes(rowKey, targetOffset, regionName, 0, regionName.length); + targetOffset += regionName.length; + Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); + targetOffset += ROW_KEY_SEPARATOR_BYTE.length; + Bytes.putBytes(rowKey, targetOffset, startRow, 0, startRow.length); + targetOffset += startRow.length; + Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); + targetOffset += ROW_KEY_SEPARATOR_BYTE.length; + Bytes.putBytes(rowKey, targetOffset, stopRow, 0, stopRow.length); + return rowKey; + } + + public void logToIndexToolResultTable(IndexToolVerificationResult verificationResult, + IndexTool.IndexVerifyType verifyType, byte[] region) throws IOException { + long scanMaxTs = verificationResult.getScanMaxTs(); + byte[] rowKey = generateResultTableRowKey(scanMaxTs, indexTable.getName().toBytes(), + region, verificationResult.getStartRow(), + verificationResult.getStopRow()); + Put put = new Put(rowKey); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getScannedDataRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getRebuiltIndexRowCount()))); + if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH || + verifyType == IndexTool.IndexVerifyType.ONLY) { + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildValidIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildExpiredIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildMissingIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildInvalidIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES, + scanMaxTs, + Bytes.toBytes(Long.toString(verificationResult.getBefore().getBeyondMaxLookBackMissingIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES, + scanMaxTs, + Bytes.toBytes(Long.toString(verificationResult.getBefore().getBeyondMaxLookBackInvalidIndexRowCount()))); + } + if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) { + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildValidIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildExpiredIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildMissingIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildInvalidIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES, + scanMaxTs, + Bytes.toBytes(Long.toString(verificationResult.getAfter().getBeyondMaxLookBackMissingIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES, + scanMaxTs, + Bytes.toBytes(Long.toString(verificationResult.getAfter().getBeyondMaxLookBackInvalidIndexRowCount()))); + } + resultTable.put(put); + } + + public IndexToolVerificationResult getVerificationResult(Connection conn, long ts) throws IOException, SQLException { + Table hTable = getTable(conn, RESULT_TABLE_NAME_BYTES); + return getVerificationResult(hTable, ts); + } + + public Table getTable(Connection conn, byte[] tableName) throws SQLException { + return conn.unwrap(PhoenixConnection.class).getQueryServices() + .getTable(tableName); + } + + public IndexToolVerificationResult getVerificationResult(Table htable, long ts) + throws IOException { + byte[] startRowKey = Bytes.toBytes(Long.toString(ts)); + byte[] stopRowKey = ByteUtil.calculateTheClosestNextRowKeyForPrefix(startRowKey); + IndexToolVerificationResult verificationResult = new IndexToolVerificationResult(ts); + Scan scan = new Scan(); + scan.withStartRow(startRowKey); + scan.withStopRow(stopRowKey); + ResultScanner scanner = htable.getScanner(scan); + for (Result result = scanner.next(); result != null; result = scanner.next()) { + boolean isFirst = true; + for (Cell cell : result.rawCells()) { + if (isFirst){ + byte[][] rowKeyParts = ByteUtil.splitArrayBySeparator(result.getRow(), + ROW_KEY_SEPARATOR_BYTE[0]); + verificationResult.setStartRow(rowKeyParts[3]); + verificationResult.setStopRow(rowKeyParts[4]); + isFirst = false; + } + verificationResult.update(cell); + } + } + return verificationResult; + } + + public void close() throws IOException { + if (resultTable != null) { + resultTable.close(); + } + if (indexTable != null) { + indexTable.close(); + } + } + + public void setResultTable(Table resultTable) { + this.resultTable = resultTable; + } + + public void setIndexTable(Table indexTable) { + this.indexTable = indexTable; + } +} + diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java index a24e3ab..4cd2603 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java @@ -25,12 +25,10 @@ import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; -import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner; import org.apache.phoenix.coprocessor.IndexToolVerificationResult; import org.apache.phoenix.coprocessor.TaskRegionObserver; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -53,6 +51,7 @@ import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getInde public class PhoenixIndexImportDirectReducer extends Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable> { private AtomicBoolean calledOnce = new AtomicBoolean(false); + private IndexVerificationResultRepository resultRepository; private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixIndexImportDirectReducer.class); @@ -62,10 +61,8 @@ public class PhoenixIndexImportDirectReducer extends Configuration configuration = context.getConfiguration(); try (final Connection connection = ConnectionUtil.getInputConnection(configuration)) { long ts = Long.valueOf(configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE)); - Table hTable = connection.unwrap(PhoenixConnection.class).getQueryServices() - .getTable(IndexTool.RESULT_TABLE_NAME_BYTES); IndexToolVerificationResult verificationResult = - IndexToolVerificationResult.getVerificationResult(hTable, ts); + resultRepository.getVerificationResult(connection, ts); context.getCounter(PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT). setValue(verificationResult.getScannedDataRowCount()); context.getCounter(PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT). @@ -108,6 +105,11 @@ public class PhoenixIndexImportDirectReducer extends } @Override + protected void setup(Context context) throws IOException { + resultRepository = new IndexVerificationResultRepository(); + } + + @Override protected void reduce(ImmutableBytesWritable arg0, Iterable<IntWritable> arg1, Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable>.Context context) throws IOException, InterruptedException @@ -133,6 +135,7 @@ public class PhoenixIndexImportDirectReducer extends protected void cleanup(Context context) throws IOException, InterruptedException{ try { updateTasksTable(context); + resultRepository.close(); } catch (SQLException e) { LOGGER.error(" Failed to update the tasks table"); throw new RuntimeException(e.getMessage()); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java index 5a2b624..cc582f7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java @@ -23,12 +23,14 @@ import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Set; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; @@ -606,4 +608,48 @@ public class ByteUtil { } return true; } + + public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) { + // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually. + // Search for the place where the trailing 0xFFs start + int offset = rowKeyPrefix.length; + while (offset > 0) { + if (rowKeyPrefix[offset - 1] != (byte) 0xFF) { + break; + } + offset--; + } + if (offset == 0) { + // We got an 0xFFFF... (only FFs) stopRow value which is + // the last possible prefix before the end of the table. + // So set it to stop at the 'end of the table' + return HConstants.EMPTY_END_ROW; + } + // Copy the right length of the original + byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset); + // And increment the last one + newStopRow[newStopRow.length - 1]++; + return newStopRow; + } + + public static byte[][] splitArrayBySeparator(byte[] src, byte separator){ + List<Integer> separatorLocations = new ArrayList<Integer>(); + for (int k = 0; k < src.length; k++){ + if (src[k] == separator){ + separatorLocations.add(k); + } + } + byte[][] dst = new byte[separatorLocations.size() +1][]; + int previousSepartor = -1; + for (int j = 0; j < separatorLocations.size(); j++){ + int separatorLocation = separatorLocations.get(j); + dst[j] = Bytes.copy(src, previousSepartor +1, separatorLocation- previousSepartor -1); + previousSepartor = separatorLocation; + } + if (previousSepartor < src.length){ + dst[separatorLocations.size()] = Bytes.copy(src, + previousSepartor +1, src.length - previousSepartor -1); + } + return dst; + } }