This is an automated email from the ASF dual-hosted git repository. skadam pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 5a9ec55 PHOENIX-5890: Port Phoenix-5799 to master (#787) 5a9ec55 is described below commit 5a9ec5552fb63fdf640e0c9adbb81eb6eec0ccae Author: Swaroopa Kadam <swaroopa.kada...@gmail.com> AuthorDate: Tue May 26 16:59:54 2020 -0700 PHOENIX-5890: Port Phoenix-5799 to master (#787) --- .../org/apache/phoenix/end2end/IndexToolIT.java | 32 +- .../index/IndexVerificationOutputRepositoryIT.java | 172 +++++++++++ .../index/IndexVerificationResultRepositoryIT.java | 131 ++++++++ .../coprocessor/IndexRebuildRegionScanner.java | 222 +++----------- .../coprocessor/IndexToolVerificationResult.java | 331 ++++++++++++--------- .../apache/phoenix/mapreduce/index/IndexTool.java | 76 +---- .../index/IndexVerificationOutputRepository.java | 288 ++++++++++++++++++ .../index/IndexVerificationOutputRow.java | 221 ++++++++++++++ .../index/IndexVerificationResultRepository.java | 221 ++++++++++++++ .../index/PhoenixIndexImportDirectReducer.java | 12 +- .../java/org/apache/phoenix/util/ByteUtil.java | 46 +++ 11 files changed, 1352 insertions(+), 400 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index 6813b2c..5946238 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -43,6 +43,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner; import org.apache.phoenix.hbase.index.IndexRegionObserver; +import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository; +import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.index.IndexTool; import 
org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper; @@ -374,10 +376,11 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { private void dropIndexToolTables(Connection conn) throws Exception { Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); - TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES); + TableName indexToolOutputTable = + TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES); admin.disableTable(indexToolOutputTable); admin.deleteTable(indexToolOutputTable); - TableName indexToolResultTable = TableName.valueOf(IndexTool.RESULT_TABLE_NAME_BYTES); + TableName indexToolResultTable = TableName.valueOf(IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES); admin.disableTable(indexToolResultTable); admin.deleteTable(indexToolResultTable); } @@ -487,12 +490,13 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { // This method verifies the common prefix, i.e., "timestamp | index table name | ", since the rest of the // fields may include the separator key - int offset = Bytes.indexOf(rowKey, IndexRebuildRegionScanner.ROW_KEY_SEPARATOR_BYTE); + int offset = Bytes.indexOf(rowKey, IndexVerificationResultRepository.ROW_KEY_SEPARATOR_BYTE); offset++; byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName); assertEquals(Bytes.compareTo(rowKey, offset, indexTableFullNameBytes.length, indexTableFullNameBytes, 0, indexTableFullNameBytes.length), 0); - assertEquals(rowKey[offset + indexTableFullNameBytes.length], IndexRebuildRegionScanner.ROW_KEY_SEPARATOR_BYTE[0]); + assertEquals(rowKey[offset + indexTableFullNameBytes.length], + IndexVerificationResultRepository.ROW_KEY_SEPARATOR_BYTE[0]); } private Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName) @@ -500,7 +504,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { byte[] indexTableFullNameBytes = 
Bytes.toBytes(indexTableFullName); byte[] dataTableFullNameBytes = Bytes.toBytes(dataTableFullName); Table hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices() - .getTable(IndexTool.OUTPUT_TABLE_NAME_BYTES); + .getTable(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES); Scan scan = new Scan(); ResultScanner scanner = hIndexTable.getScanner(scan); boolean dataTableNameCheck = false; @@ -509,28 +513,30 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { for (Result result = scanner.next(); result != null; result = scanner.next()) { for (Cell cell : result.rawCells()) { assertTrue(Bytes.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), - IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, 0, - IndexTool.OUTPUT_TABLE_COLUMN_FAMILY.length) == 0); + IndexVerificationOutputRepository.OUTPUT_TABLE_COLUMN_FAMILY, 0, + IndexVerificationOutputRepository.OUTPUT_TABLE_COLUMN_FAMILY.length) == 0); if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), - IndexTool.DATA_TABLE_NAME_BYTES, 0, IndexTool.DATA_TABLE_NAME_BYTES.length) == 0) { + IndexVerificationOutputRepository.DATA_TABLE_NAME_BYTES, 0, IndexVerificationOutputRepository.DATA_TABLE_NAME_BYTES.length) == 0) { dataTableNameCheck = true; assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), dataTableFullNameBytes, 0, dataTableFullNameBytes.length) == 0); } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), - IndexTool.INDEX_TABLE_NAME_BYTES, 0, IndexTool.INDEX_TABLE_NAME_BYTES.length) == 0) { + IndexVerificationOutputRepository.INDEX_TABLE_NAME_BYTES, 0, IndexVerificationOutputRepository.INDEX_TABLE_NAME_BYTES.length) == 0) { indexTableNameCheck = true; assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), indexTableFullNameBytes, 0, indexTableFullNameBytes.length) == 0); } else if 
(Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), - IndexTool.ERROR_MESSAGE_BYTES, 0, IndexTool.ERROR_MESSAGE_BYTES.length) == 0) { + IndexVerificationOutputRepository.ERROR_MESSAGE_BYTES, 0, IndexVerificationOutputRepository.ERROR_MESSAGE_BYTES.length) == 0) { errorMessageCell = cell; } } } - assertTrue(dataTableNameCheck && indexTableNameCheck && errorMessageCell != null); + assertTrue( "DataTableNameCheck was false", dataTableNameCheck); + assertTrue("IndexTableNameCheck was false", indexTableNameCheck); + assertTrue("Error message cell was null", errorMessageCell != null); verifyIndexTableRowKey(CellUtil.cloneRow(errorMessageCell), indexTableFullName); hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices() - .getTable(IndexTool.RESULT_TABLE_NAME_BYTES); + .getTable(IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES); scan = new Scan(); scanner = hIndexTable.getScanner(scan); Result result = scanner.next(); @@ -709,7 +715,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { Thread.sleep(1000); future.get(40, TimeUnit.SECONDS); - TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES); + TableName indexToolOutputTable = TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES); admin.disableTable(indexToolOutputTable); admin.deleteTable(indexToolOutputTable); // Run the index tool using the only-verify option, verify it gives no mismatch diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java new file mode 100644 index 0000000..0b67044 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository; +import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRow; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.PHASE_AFTER_VALUE; +import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.PHASE_BEFORE_VALUE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class IndexVerificationOutputRepositoryIT extends ParallelStatsDisabledIT { + + 
@BeforeClass + public static void setupClass() throws Exception { + Map<String, String> props = Collections.emptyMap(); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testReadIndexVerificationOutputRow() throws Exception { + String expectedErrorMessage = "I am an error message"; + byte[] expectedValue = Bytes.toBytes("ab"); + byte[] actualValue = Bytes.toBytes("ac"); + try (Connection conn = DriverManager.getConnection(getUrl())) { + String tableName = "T" + generateUniqueName(); + byte[] tableNameBytes = Bytes.toBytes(tableName); + String indexName = "I" + generateUniqueName(); + createTableAndIndexes(conn, tableName, indexName); + byte[] indexNameBytes = Bytes.toBytes(indexName); + IndexVerificationOutputRepository outputRepository = + new IndexVerificationOutputRepository(indexNameBytes, conn); + outputRepository.createOutputTable(conn); + populateTable(conn, tableName); + byte[] dataRowKey = getRowKey(conn, tableNameBytes); + byte[] indexRowKey = getRowKey(conn, indexNameBytes); + long dataRowTs = getTimestamp(conn, tableNameBytes); + long indexRowTs = getTimestamp(conn, indexNameBytes); + long scanMaxTs = EnvironmentEdgeManager.currentTimeMillis(); + outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, + indexRowTs, expectedErrorMessage, expectedValue, actualValue, + scanMaxTs, tableNameBytes, true); + //now increment the scan time by 1 and do it again + outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, + indexRowTs, expectedErrorMessage, expectedValue, actualValue, + scanMaxTs +1, tableNameBytes, false); + //make sure we only get the first row back + IndexVerificationOutputRow expectedRow = buildVerificationRow(dataRowKey, indexRowKey, dataRowTs, + indexRowTs, expectedErrorMessage, expectedValue, actualValue, + scanMaxTs, tableNameBytes, indexNameBytes, PHASE_BEFORE_VALUE); + verifyOutputRow(outputRepository, scanMaxTs, indexNameBytes, expectedRow); + //make 
sure we get the second row back + IndexVerificationOutputRow secondExpectedRow = buildVerificationRow(dataRowKey, + indexRowKey, dataRowTs, + indexRowTs, expectedErrorMessage, expectedValue, actualValue, + scanMaxTs + 1, tableNameBytes, indexNameBytes, PHASE_AFTER_VALUE); + verifyOutputRow(outputRepository, scanMaxTs+1, indexNameBytes, secondExpectedRow); + } + + } + + public void verifyOutputRow(IndexVerificationOutputRepository outputRepository, long scanMaxTs, + byte[] indexNameBytes, IndexVerificationOutputRow expectedRow) + throws IOException { + List<IndexVerificationOutputRow> actualRows = + outputRepository.getOutputRows(scanMaxTs, indexNameBytes); + assertNotNull(actualRows); + assertEquals(1, actualRows.size()); + assertEquals(expectedRow, actualRows.get(0)); + } + + private IndexVerificationOutputRow buildVerificationRow(byte[] dataRowKey, byte[] indexRowKey, + long dataRowTs, long indexRowTs, + String expectedErrorMessage, + byte[] expectedValue, byte[] actualValue, + long scanMaxTs, + byte[] tableNameBytes, + byte[] indexNameBytes, + byte[] phaseBeforeValue) { + IndexVerificationOutputRow.IndexVerificationOutputRowBuilder builder = + new IndexVerificationOutputRow.IndexVerificationOutputRowBuilder(); + return builder.setDataTableRowKey(dataRowKey). + setIndexTableRowKey(indexRowKey). + setScanMaxTimestamp(dataRowTs). + setDataTableRowTimestamp(dataRowTs). + setIndexTableRowTimestamp(indexRowTs). + setErrorMessage(Bytes.toString( + IndexVerificationOutputRepository. + getErrorMessageBytes(expectedErrorMessage, expectedValue, actualValue))). + setExpectedValue(expectedValue). + setActualValue(actualValue). + setScanMaxTimestamp(scanMaxTs). + setDataTableName(Bytes.toString(tableNameBytes)). + setIndexTableName(Bytes.toString(indexNameBytes)). + setPhaseValue(phaseBeforeValue). 
+ build(); + } + + private byte[] getRowKey(Connection conn, byte[] tableNameBytes) + throws SQLException, IOException { + Scan scan = new Scan(); + Table table = + conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableNameBytes); + ResultScanner scanner = table.getScanner(scan); + Result r = scanner.next(); + return r.getRow(); + } + + private long getTimestamp(Connection conn, byte[] tableNameBytes) throws SQLException, + IOException { + Scan scan = new Scan(); + Table table = + conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableNameBytes); + ResultScanner scanner = table.getScanner(scan); + Result r = scanner.next(); + return r.listCells().get(0).getTimestamp(); + } + + private void createTable(Connection conn, String tableName) throws Exception { + conn.createStatement().execute("create table " + tableName + + " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), " + + "val3 varchar(10))"); + } + + private void populateTable(Connection conn, String tableName) throws Exception { + conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + } + + private void createTableAndIndexes(Connection conn, String dataTableName, + String indexTableName) throws Exception { + createTable(conn, dataTableName); + conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + + dataTableName + " (val1) include (val2, val3)"); + conn.commit(); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java new file mode 100644 index 0000000..a83b085 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.IndexToolVerificationResult; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Collections; +import java.util.Map; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class IndexVerificationResultRepositoryIT extends ParallelStatsDisabledIT { + + @BeforeClass + public static void setupClass() throws Exception { + Map<String, String> props = Collections.emptyMap(); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testReadResultRow() throws Exception { + String tableName = "T" + generateUniqueName(); + String indexName = "I" + generateUniqueName(); + byte[] indexNameBytes = Bytes.toBytes(indexName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + 
createTableAndIndex(conn, tableName, indexName); + IndexVerificationResultRepository resultRepository = + new IndexVerificationResultRepository(conn, indexNameBytes); + resultRepository.createResultTable(conn); + byte[] regionOne = Bytes.toBytes("a.1.00000000000000000000"); + byte[] regionTwo = Bytes.toBytes("a.2.00000000000000000000"); + long scanMaxTs = EnvironmentEdgeManager.currentTimeMillis(); + IndexToolVerificationResult expectedResult = getExpectedResult(scanMaxTs); + resultRepository.logToIndexToolResultTable(expectedResult, IndexTool.IndexVerifyType.BOTH, + regionOne); + resultRepository.logToIndexToolResultTable(expectedResult, IndexTool.IndexVerifyType.BOTH, + regionTwo); + IndexToolVerificationResult actualResult = + resultRepository.getVerificationResult(conn, scanMaxTs); + assertVerificationResult(expectedResult, actualResult); + + } + } + + private void assertVerificationResult(IndexToolVerificationResult expectedResult, IndexToolVerificationResult actualResult) { + assertEquals(expectedResult.getScanMaxTs(), actualResult.getScanMaxTs()); + assertArrayEquals(expectedResult.getStartRow(), actualResult.getStartRow()); + assertArrayEquals(actualResult.getStopRow(), actualResult.getStopRow()); + + //because we're combining two near-identical rows (same values, different region) + //we assert on 2x the expected value + assertEquals(2 * expectedResult.getBeforeRebuildExpiredIndexRowCount(), + actualResult.getBeforeRebuildExpiredIndexRowCount()); + assertEquals(2 * expectedResult.getBeforeRebuildInvalidIndexRowCount(), + actualResult.getBeforeRebuildInvalidIndexRowCount()); + assertEquals(2 * expectedResult.getBeforeRebuildMissingIndexRowCount(), + actualResult.getBeforeRebuildMissingIndexRowCount()); + assertEquals(2 * expectedResult.getBeforeRebuildValidIndexRowCount(), + actualResult.getBeforeRebuildValidIndexRowCount()); + + assertEquals(2 * expectedResult.getAfterRebuildExpiredIndexRowCount(), + actualResult.getAfterRebuildExpiredIndexRowCount()); + 
assertEquals(2 * expectedResult.getAfterRebuildInvalidIndexRowCount(), + actualResult.getAfterRebuildInvalidIndexRowCount()); + assertEquals(2 * expectedResult.getAfterRebuildMissingIndexRowCount(), + actualResult.getAfterRebuildMissingIndexRowCount()); + assertEquals(2 * expectedResult.getAfterRebuildValidIndexRowCount(), + actualResult.getAfterRebuildValidIndexRowCount()); + + assertEquals(2 * expectedResult.getScannedDataRowCount(), + actualResult.getScannedDataRowCount()); + assertEquals(2 * expectedResult.getRebuiltIndexRowCount(), + actualResult.getRebuiltIndexRowCount()); + + } + + private IndexToolVerificationResult getExpectedResult(long scanMaxTs) { + byte[] startRow = Bytes.toBytes("a"); + byte[] stopRow = Bytes.toBytes("b"); + IndexToolVerificationResult result = new IndexToolVerificationResult(startRow, stopRow, + scanMaxTs); + IndexToolVerificationResult.PhaseResult before = + new IndexToolVerificationResult.PhaseResult(); + + IndexToolVerificationResult.PhaseResult after = + new IndexToolVerificationResult.PhaseResult(); + result.setBefore(before); + result.setAfter(after); + return result; + } + + private void createTable(Connection conn, String tableName) throws Exception { + conn.createStatement().execute("create table " + tableName + + " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), " + + "val3 varchar(10))"); + } + + private void createTableAndIndex(Connection conn, String dataTableName, + String indexTableName) throws Exception { + createTable(conn, dataTableName); + conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + + dataTableName + " (val1) include (val2, val3)"); + conn.commit(); + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java index 11da487..a011b5e 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java @@ -20,21 +20,6 @@ package org.apache.phoenix.coprocessor; import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES; import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn; import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.REBUILT_INDEX_ROW_COUNT_BYTES; -import static 
org.apache.phoenix.mapreduce.index.IndexTool.RESULT_TABLE_COLUMN_FAMILY; -import static org.apache.phoenix.mapreduce.index.IndexTool.SCANNED_DATA_ROW_COUNT_BYTES; import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; @@ -50,6 +35,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.NavigableSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -63,8 +49,10 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; @@ -90,13 +78,18 @@ import org.apache.phoenix.hbase.index.parallel.TaskRunner; import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder; import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager; import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner; +import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; +import org.apache.phoenix.hbase.index.write.IndexWriterUtils; import org.apache.phoenix.index.GlobalIndexChecker; import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository; +import 
org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.query.HBaseFactoryProvider; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.types.PLong; @@ -120,7 +113,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { public static final String NO_EXPECTED_MUTATION = "No expected mutation"; public static final String ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty"; - public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|"); private long pageSizeInRows = Long.MAX_VALUE; private int rowCountPerTask; private boolean hasMore; @@ -155,6 +147,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { private int singleRowRebuildReturnCode; private Map<byte[], NavigableSet<byte[]>> familyMap; private byte[][] viewConstants; + protected HTableFactory hTableFactory; + private IndexVerificationResultRepository verificationResultRepository; + private IndexVerificationOutputRepository verificationOutputRepository; @VisibleForTesting public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan, @@ -198,19 +193,21 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE); if (valueBytes != null) { - verificationResult = new IndexToolVerificationResult(); + verificationResult = new IndexToolVerificationResult(scan); verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes); if (verifyType != IndexTool.IndexVerifyType.NONE) { verify = true; viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); - // Create the following objects only for rebuilds by IndexTool + + hTableFactory = IndexWriterUtils.getDefaultDelegateHTableFactory(env); 
indexHTable = ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION, env).getTable(TableName.valueOf(indexMaintainer.getIndexTableName())); indexTableTTL = indexHTable.getDescriptor().getColumnFamilies()[0].getTimeToLive(); - outputHTable = ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION, - env).getTable(TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES)); - resultHTable = ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION, - env).getTable(TableName.valueOf(IndexTool.RESULT_TABLE_NAME_BYTES)); + // Create the following objects only for rebuilds by IndexTool + verificationResultRepository = + new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory); + verificationOutputRepository = + new IndexVerificationOutputRepository(indexMaintainer.getIndexTableName(), hTableFactory); indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); dataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor( @@ -257,87 +254,18 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return false; } - private static byte[] generateResultTableRowKey(long ts, byte[] indexTableName, byte [] regionName, - byte[] startRow, byte[] stopRow) { - byte[] keyPrefix = Bytes.toBytes(Long.toString(ts)); - int targetOffset = 0; - // The row key for the result table : timestamp | index table name | datable table region name | - // scan start row | scan stop row - byte[] rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length + - ROW_KEY_SEPARATOR_BYTE.length + regionName.length + ROW_KEY_SEPARATOR_BYTE.length + - startRow.length + ROW_KEY_SEPARATOR_BYTE.length + stopRow.length]; - Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length); - targetOffset += keyPrefix.length; - Bytes.putBytes(rowKey, 
targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); - targetOffset += ROW_KEY_SEPARATOR_BYTE.length; - Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length); - targetOffset += indexTableName.length; - Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); - targetOffset += ROW_KEY_SEPARATOR_BYTE.length; - Bytes.putBytes(rowKey, targetOffset, regionName, 0, regionName.length); - targetOffset += regionName.length; - Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); - targetOffset += ROW_KEY_SEPARATOR_BYTE.length; - Bytes.putBytes(rowKey, targetOffset, startRow, 0, startRow.length); - targetOffset += startRow.length; - Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); - targetOffset += ROW_KEY_SEPARATOR_BYTE.length; - Bytes.putBytes(rowKey, targetOffset, stopRow, 0, stopRow.length); - return rowKey; - } - - private void logToIndexToolResultTable() throws IOException { - long scanMaxTs = scan.getTimeRange().getMax(); - byte[] rowKey = generateResultTableRowKey(scanMaxTs, indexHTable.getName().toBytes(), - Bytes.toBytes(region.getRegionInfo().getRegionNameAsString()), scan.getStartRow(), scan.getStopRow()); - Put put = new Put(rowKey); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.scannedDataRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.rebuiltIndexRowCount))); - if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH || - verifyType == IndexTool.IndexVerifyType.ONLY) { - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.validIndexRowCount))); - 
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.expiredIndexRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.missingIndexRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.invalidIndexRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.indexHasExtraCellsCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.indexHasMissingCellsCount))); - } - if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) { - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.validIndexRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.expiredIndexRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.missingIndexRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.invalidIndexRowCount))); - put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.indexHasExtraCellsCount))); - 
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.indexHasMissingCellsCount))); - } - resultHTable.put(put); - } - @Override public void close() throws IOException { innerScanner.close(); if (verify) { try { - logToIndexToolResultTable(); + verificationResultRepository.logToIndexToolResultTable(verificationResult, + verifyType, region.getRegionInfo().getRegionName()); } finally { this.pool.stop("IndexRebuildRegionScanner is closing"); indexHTable.close(); - outputHTable.close(); - resultHTable.close(); + verificationResultRepository.close(); + verificationOutputRepository.close(); } } } @@ -422,80 +350,18 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return true; } - @VisibleForTesting public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs, - String errorMsg) throws IOException { - logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, - errorMsg, null, null); - - } - - private static byte[] generateOutputTableRowKey(long ts, byte[] indexTableName, byte[] dataRowKey ) { - byte[] keyPrefix = Bytes.toBytes(Long.toString(ts)); - byte[] rowKey; - int targetOffset = 0; - // The row key for the output table : timestamp | index table name | data row key - rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length + - ROW_KEY_SEPARATOR_BYTE.length + dataRowKey.length]; - Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length); - targetOffset += keyPrefix.length; - Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); - targetOffset += ROW_KEY_SEPARATOR_BYTE.length; - Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length); - targetOffset += indexTableName.length; - Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, 
ROW_KEY_SEPARATOR_BYTE.length); - targetOffset += ROW_KEY_SEPARATOR_BYTE.length; - Bytes.putBytes(rowKey, targetOffset, dataRowKey, 0, dataRowKey.length); - return rowKey; + String errorMsg) throws IOException { + logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, errorMsg, null, null); } @VisibleForTesting public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs, - String errorMsg, byte[] expectedValue, byte[] actualValue) throws IOException { - final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:"); - final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:"); - final int PREFIX_LENGTH = 3; - final int TOTAL_PREFIX_LENGTH = 6; - final byte[] PHASE_BEFORE_VALUE = Bytes.toBytes("BEFORE"); - final byte[] PHASE_AFTER_VALUE = Bytes.toBytes("AFTER"); - long scanMaxTs = scan.getTimeRange().getMax(); - byte[] rowKey = generateOutputTableRowKey(scanMaxTs, indexHTable.getName().toBytes(), dataRowKey); - Put put = new Put(rowKey); - put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_NAME_BYTES, - scanMaxTs, region.getRegionInfo().getTable().getName()); - put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_NAME_BYTES, - scanMaxTs, indexMaintainer.getIndexTableName()); - put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_TS_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(dataRowTs))); - - put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_ROW_KEY_BYTES, - scanMaxTs, indexRowKey); - put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES, - scanMaxTs, Bytes.toBytes(Long.toString(indexRowTs))); - byte[] errorMessageBytes; - if (expectedValue != null) { - errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length + - TOTAL_PREFIX_LENGTH]; - Bytes.putBytes(errorMessageBytes, 0, Bytes.toBytes(errorMsg), 0, errorMsg.length()); - int length = errorMsg.length(); - 
Bytes.putBytes(errorMessageBytes, length, E_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH); - length += PREFIX_LENGTH; - Bytes.putBytes(errorMessageBytes, length, expectedValue, 0, expectedValue.length); - length += expectedValue.length; - Bytes.putBytes(errorMessageBytes, length, A_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH); - length += PREFIX_LENGTH; - Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length); - - } else { - errorMessageBytes = Bytes.toBytes(errorMsg); - } - put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes); - if (isBeforeRebuilt) { - put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_BEFORE_VALUE); - } else { - put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_AFTER_VALUE); - } - outputHTable.put(put); + String errorMsg, byte[] expectedVaue, byte[] actualValue) + throws IOException { + verificationOutputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, + errorMsg, expectedVaue, actualValue, scan.getTimeRange().getMax(), + region.getRegionInfo().getTable().getName(), isBeforeRebuilt); } private static long getMaxTimestamp(Mutation m) { @@ -549,7 +415,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants); String errorMsg = "Missing cell (in iteration " + iteration + ") " + Bytes.toString(family) + ":" + Bytes.toString(qualifier); logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg); - verificationPhaseResult.indexHasMissingCellsCount++; + verificationPhaseResult.setIndexHasMissingCellsCount(verificationPhaseResult.getIndexHasMissingCellsCount()+1); return false; } if (!CellUtil.matchingValue(actualCell, expectedCell)) { @@ -573,7 +439,7 @@ public class 
IndexRebuildRegionScanner extends BaseRegionScanner { byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants); logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg); - verificationPhaseResult.indexHasExtraCellsCount++; + verificationPhaseResult.setIndexHasExtraCellsCount(verificationPhaseResult.getIndexHasExtraCellsCount()+1); return false; } return true; @@ -826,7 +692,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { // verify // TODO: have a metric to update for these cases if (isTimestampBeforeTTL(currentTime, getTimestamp(expected))) { - verificationPhaseResult.expiredIndexRowCount++; + verificationPhaseResult.setExpiredIndexRowCount(verificationPhaseResult.getExpiredIndexRowCount() + 1); return true; } actual = actualMutationList.get(actualIndex); @@ -878,7 +744,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { if (matchingCount > 0) { break; } - verificationPhaseResult.invalidIndexRowCount++; + verificationPhaseResult.setInvalidIndexRowCount(verificationPhaseResult.getInvalidIndexRowCount() + 1); return false; } if ((expectedIndex != expectedSize) || actualIndex != actualSize) { @@ -898,11 +764,11 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { String errorMsg = "Not matching index row"; logToIndexToolOutputTable(dataKey, indexRow.getRow(), getTimestamp(expectedMutationList.get(0)), 0L, errorMsg); - verificationPhaseResult.invalidIndexRowCount++; + verificationPhaseResult.setInvalidIndexRowCount(verificationPhaseResult.getInvalidIndexRowCount() + 1); return false; } } - verificationPhaseResult.validIndexRowCount++; + verificationPhaseResult.setValidIndexRowCount(verificationPhaseResult.getValidIndexRowCount() + 1); return true; } @@ -958,7 +824,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { List<Mutation> mutationList = indexKeyToMutationMap.get(key); if 
(isTimestampBeforeTTL(currentTime, getTimestamp(mutationList.get(mutationList.size() - 1)))) { itr.remove(); - verificationPhaseResult.expiredIndexRowCount++; + verificationPhaseResult.setExpiredIndexRowCount(verificationPhaseResult.getExpiredIndexRowCount() + 1); } } } @@ -975,7 +841,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { keyRange.getLowerRange(), getMaxTimestamp(dataKeyToMutationMap.get(dataKey)), getTimestamp(mutationList.get(mutationList.size() - 1)), errorMsg); - verificationPhaseResult.missingIndexRowCount++; + verificationPhaseResult.setMissingIndexRowCount(verificationPhaseResult.getMissingIndexRowCount() + 1); } } keys.addAll(invalidKeys); @@ -1091,12 +957,12 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } private void verifyAndOrRebuildIndex() throws IOException { - IndexToolVerificationResult nextVerificationResult = new IndexToolVerificationResult(); - nextVerificationResult.scannedDataRowCount = dataKeyToMutationMap.size(); + IndexToolVerificationResult nextVerificationResult = new IndexToolVerificationResult(scan); + nextVerificationResult.setScannedDataRowCount(dataKeyToMutationMap.size()); if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) { // For these options we start with rebuilding index rows rebuildIndexRows(mutations); - nextVerificationResult.rebuiltIndexRowCount = dataKeyToMutationMap.size(); + nextVerificationResult.setRebuiltIndexRowCount(dataKeyToMutationMap.size()); isBeforeRebuilt = false; } if (verifyType == IndexTool.IndexVerifyType.NONE) { @@ -1107,7 +973,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult(); // For these options we start with verifying index rows parallelizeIndexVerify(verificationPhaseResult); - nextVerificationResult.before.add(verificationPhaseResult); + 
nextVerificationResult.getBefore().add(verificationPhaseResult); } if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) { // For these options, we have identified the rows to be rebuilt and now need to rebuild them @@ -1123,7 +989,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } } rebuildIndexRows(mutations); - nextVerificationResult.rebuiltIndexRowCount += dataKeyToMutationMap.size(); + nextVerificationResult.setRebuiltIndexRowCount(nextVerificationResult.getRebuiltIndexRowCount() + dataKeyToMutationMap.size()); isBeforeRebuilt = false; } @@ -1135,7 +1001,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { prepareIndexMutations(entry.getValue().getFirst(), entry.getValue().getSecond()); } parallelizeIndexVerify(verificationPhaseResult); - nextVerificationResult.after.add(verificationPhaseResult); + nextVerificationResult.getAfter().add(verificationPhaseResult); } verificationResult.add(nextVerificationResult); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java index 083bfad..0c69d30 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java @@ -19,65 +19,122 @@ package org.apache.phoenix.coprocessor; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.mapreduce.index.IndexTool; - -import java.io.IOException; -import java.util.Arrays; - -import static 
org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.REBUILT_INDEX_ROW_COUNT_BYTES; -import static org.apache.phoenix.mapreduce.index.IndexTool.RESULT_TABLE_COLUMN_FAMILY; -import static org.apache.phoenix.mapreduce.index.IndexTool.SCANNED_DATA_ROW_COUNT_BYTES; + +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES; +import 
static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.REBUILT_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_COLUMN_FAMILY; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.SCANNED_DATA_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS_BYTES; public class IndexToolVerificationResult { + + public void setScannedDataRowCount(long scannedDataRowCount) { + this.scannedDataRowCount = scannedDataRowCount; + } + + public void setRebuiltIndexRowCount(long rebuiltIndexRowCount) { + this.rebuiltIndexRowCount = rebuiltIndexRowCount; + } + + public PhaseResult getBefore() { + return before; + } + + public void setBefore(PhaseResult before) { + this.before = before; + } + + public PhaseResult 
getAfter() { + return after; + } + + public void setAfter(PhaseResult after) { + this.after = after; + } + + public byte[] getStartRow() { + return startRow; + } + + public byte[] getStopRow() { + return stopRow; + } + + public long getScanMaxTs() { + return scanMaxTs; + } + + public IndexToolVerificationResult(long scanMaxTs) { + this.scanMaxTs = scanMaxTs; + } + + public IndexToolVerificationResult(byte[] startRow, byte[] stopRow, long scanMaxTs){ + this.setStartRow(startRow); + this.setStopRow(stopRow); + this.scanMaxTs = scanMaxTs; + } + + public IndexToolVerificationResult(Scan scan){ + this.setStartRow(scan.getStartRow()); + this.setStopRow(scan.getStopRow()); + this.scanMaxTs = scan.getTimeRange().getMax(); + } + + public byte[] getRegion() { + return region; + } + + public void setStartRow(byte[] startRow) { + this.startRow = startRow; + } + + public void setStopRow(byte[] stopRow) { + this.stopRow = stopRow; + } + public static class PhaseResult { - long validIndexRowCount = 0; - long expiredIndexRowCount = 0; - long missingIndexRowCount = 0; - long invalidIndexRowCount = 0; - long indexHasExtraCellsCount = 0; - long indexHasMissingCellsCount = 0; + private long validIndexRowCount = 0; + private long expiredIndexRowCount = 0; + private long missingIndexRowCount = 0; + private long invalidIndexRowCount = 0; + private long indexHasExtraCellsCount = 0; + private long indexHasMissingCellsCount = 0; public void add(PhaseResult phaseResult) { - validIndexRowCount += phaseResult.validIndexRowCount; - expiredIndexRowCount += phaseResult.expiredIndexRowCount; - missingIndexRowCount += phaseResult.missingIndexRowCount; - invalidIndexRowCount += phaseResult.invalidIndexRowCount; - indexHasExtraCellsCount += phaseResult.indexHasExtraCellsCount; - indexHasMissingCellsCount += phaseResult.indexHasMissingCellsCount; + setValidIndexRowCount(getValidIndexRowCount() + phaseResult.getValidIndexRowCount()); + setExpiredIndexRowCount(getExpiredIndexRowCount() + 
phaseResult.getExpiredIndexRowCount()); + setMissingIndexRowCount(getMissingIndexRowCount() + phaseResult.getMissingIndexRowCount()); + setInvalidIndexRowCount(getInvalidIndexRowCount() + phaseResult.getInvalidIndexRowCount()); + setIndexHasExtraCellsCount(getIndexHasExtraCellsCount() + phaseResult.getIndexHasExtraCellsCount()); + setIndexHasMissingCellsCount(getIndexHasMissingCellsCount() + phaseResult.getIndexHasMissingCellsCount()); } public PhaseResult(){} public PhaseResult(long validIndexRowCount, long expiredIndexRowCount, long missingIndexRowCount, long invalidIndexRowCount, long indexHasExtraCellsCount, long indexHasMissingCellsCount) { - this.validIndexRowCount = validIndexRowCount; - this.expiredIndexRowCount = expiredIndexRowCount; - this.missingIndexRowCount = missingIndexRowCount; - this.invalidIndexRowCount = invalidIndexRowCount; - this.indexHasExtraCellsCount = indexHasExtraCellsCount; - this.indexHasMissingCellsCount = indexHasMissingCellsCount; + this.setValidIndexRowCount(validIndexRowCount); + this.setExpiredIndexRowCount(expiredIndexRowCount); + this.setMissingIndexRowCount(missingIndexRowCount); + this.setInvalidIndexRowCount(invalidIndexRowCount); + this.setIndexHasExtraCellsCount(indexHasExtraCellsCount); + this.setIndexHasMissingCellsCount(indexHasMissingCellsCount); } public long getTotalCount() { - return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount; + return getValidIndexRowCount() + getExpiredIndexRowCount() + getMissingIndexRowCount() + getInvalidIndexRowCount(); } public long getIndexHasExtraCellsCount() { @@ -87,12 +144,12 @@ public class IndexToolVerificationResult { @Override public String toString() { return "PhaseResult{" + - "validIndexRowCount=" + validIndexRowCount + - ", expiredIndexRowCount=" + expiredIndexRowCount + - ", missingIndexRowCount=" + missingIndexRowCount + - ", invalidIndexRowCount=" + invalidIndexRowCount + - ", extraCellsOnIndexCount=" + indexHasExtraCellsCount + - 
", missingCellsOnIndexCount=" + indexHasMissingCellsCount + + "validIndexRowCount=" + getValidIndexRowCount() + + ", expiredIndexRowCount=" + getExpiredIndexRowCount() + + ", missingIndexRowCount=" + getMissingIndexRowCount() + + ", invalidIndexRowCount=" + getInvalidIndexRowCount() + + ", extraCellsOnIndexCount=" + getIndexHasExtraCellsCount() + + ", missingCellsOnIndexCount=" + getIndexHasMissingCellsCount() + '}'; } @@ -105,39 +162,87 @@ public class IndexToolVerificationResult { return false; } PhaseResult pr = (PhaseResult) o; - return this.expiredIndexRowCount == pr.expiredIndexRowCount - && this.validIndexRowCount == pr.validIndexRowCount - && this.invalidIndexRowCount == pr.invalidIndexRowCount - && this.missingIndexRowCount == pr.missingIndexRowCount - && this.indexHasMissingCellsCount == pr.indexHasMissingCellsCount - && this.indexHasExtraCellsCount == pr.indexHasExtraCellsCount; + return this.getExpiredIndexRowCount() == pr.getExpiredIndexRowCount() + && this.getValidIndexRowCount() == pr.getValidIndexRowCount() + && this.getInvalidIndexRowCount() == pr.getInvalidIndexRowCount() + && this.getMissingIndexRowCount() == pr.getMissingIndexRowCount() + && this.getIndexHasMissingCellsCount() == pr.getIndexHasMissingCellsCount() + && this.getIndexHasExtraCellsCount() == pr.getIndexHasExtraCellsCount(); } @Override public int hashCode() { long result = 17; - result = 31 * result + expiredIndexRowCount; - result = 31 * result + validIndexRowCount; - result = 31 * result + missingIndexRowCount; - result = 31 * result + invalidIndexRowCount; - result = 31 * result + indexHasMissingCellsCount; - result = 31 * result + indexHasExtraCellsCount; + result = 31 * result + getExpiredIndexRowCount(); + result = 31 * result + getValidIndexRowCount(); + result = 31 * result + getMissingIndexRowCount(); + result = 31 * result + getInvalidIndexRowCount(); + result = 31 * result + getIndexHasMissingCellsCount(); + result = 31 * result + getIndexHasExtraCellsCount(); return 
(int)result; } + + public long getValidIndexRowCount() { + return validIndexRowCount; + } + + public void setValidIndexRowCount(long validIndexRowCount) { + this.validIndexRowCount = validIndexRowCount; + } + + public long getExpiredIndexRowCount() { + return expiredIndexRowCount; + } + + public void setExpiredIndexRowCount(long expiredIndexRowCount) { + this.expiredIndexRowCount = expiredIndexRowCount; + } + + public long getMissingIndexRowCount() { + return missingIndexRowCount; + } + + public void setMissingIndexRowCount(long missingIndexRowCount) { + this.missingIndexRowCount = missingIndexRowCount; + } + + public long getInvalidIndexRowCount() { + return invalidIndexRowCount; + } + + public void setInvalidIndexRowCount(long invalidIndexRowCount) { + this.invalidIndexRowCount = invalidIndexRowCount; + } + + public void setIndexHasExtraCellsCount(long indexHasExtraCellsCount) { + this.indexHasExtraCellsCount = indexHasExtraCellsCount; + } + + public long getIndexHasMissingCellsCount() { + return indexHasMissingCellsCount; + } + + public void setIndexHasMissingCellsCount(long indexHasMissingCellsCount) { + this.indexHasMissingCellsCount = indexHasMissingCellsCount; + } } - long scannedDataRowCount = 0; - long rebuiltIndexRowCount = 0; - PhaseResult before = new PhaseResult(); - PhaseResult after = new PhaseResult(); + private long scannedDataRowCount = 0; + private long rebuiltIndexRowCount = 0; + private byte[] startRow; + private byte[] stopRow; + private long scanMaxTs; + private byte[] region; + private PhaseResult before = new PhaseResult(); + private PhaseResult after = new PhaseResult(); @Override public String toString() { return "VerificationResult{" + - "scannedDataRowCount=" + scannedDataRowCount + - ", rebuiltIndexRowCount=" + rebuiltIndexRowCount + - ", before=" + before + - ", after=" + after + + "scannedDataRowCount=" + getScannedDataRowCount() + + ", rebuiltIndexRowCount=" + getRebuiltIndexRowCount() + + ", before=" + getBefore() + + ", after=" + 
getAfter() + '}'; } @@ -150,19 +255,19 @@ public class IndexToolVerificationResult { } public long getBeforeRebuildValidIndexRowCount() { - return before.validIndexRowCount; + return getBefore().getValidIndexRowCount(); } public long getBeforeRebuildExpiredIndexRowCount() { - return before.expiredIndexRowCount; + return getBefore().getExpiredIndexRowCount(); } public long getBeforeRebuildInvalidIndexRowCount() { - return before.invalidIndexRowCount; + return getBefore().getInvalidIndexRowCount(); } public long getBeforeRebuildMissingIndexRowCount() { - return before.missingIndexRowCount; + return getBefore().getMissingIndexRowCount(); } public long getBeforeIndexHasMissingCellsCount() {return before.indexHasMissingCellsCount; } @@ -170,19 +275,19 @@ public class IndexToolVerificationResult { public long getBeforeIndexHasExtraCellsCount() {return before.indexHasExtraCellsCount; } public long getAfterRebuildValidIndexRowCount() { - return after.validIndexRowCount; + return getAfter().getValidIndexRowCount(); } public long getAfterRebuildExpiredIndexRowCount() { - return after.expiredIndexRowCount; + return getAfter().getExpiredIndexRowCount(); } public long getAfterRebuildInvalidIndexRowCount() { - return after.invalidIndexRowCount; + return getAfter().getInvalidIndexRowCount(); } public long getAfterRebuildMissingIndexRowCount() { - return after.missingIndexRowCount; + return getAfter().getMissingIndexRowCount(); } public long getAfterIndexHasMissingCellsCount() { return after.indexHasMissingCellsCount; } @@ -190,27 +295,27 @@ public class IndexToolVerificationResult { public long getAfterIndexHasExtraCellsCount() { return after.indexHasExtraCellsCount; } private void addScannedDataRowCount(long count) { - this.scannedDataRowCount += count; + this.setScannedDataRowCount(this.getScannedDataRowCount() + count); } private void addRebuiltIndexRowCount(long count) { - this.rebuiltIndexRowCount += count; + this.setRebuiltIndexRowCount(this.getRebuiltIndexRowCount() + 
count); } private void addBeforeRebuildValidIndexRowCount(long count) { - before.validIndexRowCount += count; + getBefore().setValidIndexRowCount(getBefore().getValidIndexRowCount() + count); } private void addBeforeRebuildExpiredIndexRowCount(long count) { - before.expiredIndexRowCount += count; + getBefore().setExpiredIndexRowCount(getBefore().getExpiredIndexRowCount() + count); } private void addBeforeRebuildMissingIndexRowCount(long count) { - before.missingIndexRowCount += count; + getBefore().setMissingIndexRowCount(getBefore().getMissingIndexRowCount() + count); } private void addBeforeRebuildInvalidIndexRowCount(long count) { - before.invalidIndexRowCount += count; + getBefore().setInvalidIndexRowCount(getBefore().getInvalidIndexRowCount() + count); } public void addBeforeIndexHasMissingCellsCount(long count) { @@ -222,19 +327,19 @@ public class IndexToolVerificationResult { } private void addAfterRebuildValidIndexRowCount(long count) { - after.validIndexRowCount += count; + getAfter().setValidIndexRowCount(getAfter().getValidIndexRowCount() + count); } private void addAfterRebuildExpiredIndexRowCount(long count) { - after.expiredIndexRowCount += count; + getAfter().setExpiredIndexRowCount(getAfter().getExpiredIndexRowCount() + count); } private void addAfterRebuildMissingIndexRowCount(long count) { - after.missingIndexRowCount += count; + getAfter().setMissingIndexRowCount(getAfter().getMissingIndexRowCount() + count); } private void addAfterRebuildInvalidIndexRowCount(long count) { - after.invalidIndexRowCount += count; + getAfter().setInvalidIndexRowCount(getAfter().getInvalidIndexRowCount() + count); } public void addAfterIndexHasMissingCellsCount(long count) { @@ -259,7 +364,7 @@ public class IndexToolVerificationResult { cell.getValueOffset(), cell.getValueLength())); } - private void update(Cell cell) { + public void update(Cell cell) { if (CellUtil .matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES)) { 
addScannedDataRowCount(getValue(cell)); @@ -292,57 +397,17 @@ public class IndexToolVerificationResult { } } - public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) { - // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually. - // Search for the place where the trailing 0xFFs start - int offset = rowKeyPrefix.length; - while (offset > 0) { - if (rowKeyPrefix[offset - 1] != (byte) 0xFF) { - break; - } - offset--; - } - if (offset == 0) { - // We got an 0xFFFF... (only FFs) stopRow value which is - // the last possible prefix before the end of the table. - // So set it to stop at the 'end of the table' - return HConstants.EMPTY_END_ROW; - } - // Copy the right length of the original - byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset); - // And increment the last one - newStopRow[newStopRow.length - 1]++; - return newStopRow; - } - - public static IndexToolVerificationResult getVerificationResult(Table hTable, long ts) - throws IOException { - IndexToolVerificationResult verificationResult = new IndexToolVerificationResult(); - byte[] startRowKey = Bytes.toBytes(Long.toString(ts)); - byte[] stopRowKey = calculateTheClosestNextRowKeyForPrefix(startRowKey); - Scan scan = new Scan(); - scan.setStartRow(startRowKey); - scan.setStopRow(stopRowKey); - ResultScanner scanner = hTable.getScanner(scan); - for (Result result = scanner.next(); result != null; result = scanner.next()) { - for (Cell cell : result.rawCells()) { - verificationResult.update(cell); - } - } - return verificationResult; - } - public boolean isVerificationFailed() { - if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) { + if (getAfter().getInvalidIndexRowCount() + getAfter().getMissingIndexRowCount() > 0) { return true; } return false; } public void add(IndexToolVerificationResult verificationResult) { - scannedDataRowCount += verificationResult.scannedDataRowCount; - rebuiltIndexRowCount += 
verificationResult.rebuiltIndexRowCount; - before.add(verificationResult.before); - after.add(verificationResult.after); + setScannedDataRowCount(getScannedDataRowCount() + verificationResult.getScannedDataRowCount()); + setRebuiltIndexRowCount(getRebuiltIndexRowCount() + verificationResult.getRebuiltIndexRowCount()); + getBefore().add(verificationResult.getBefore()); + getAfter().add(verificationResult.getAfter()); } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index 23d6057..ed32893 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -50,7 +50,6 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; @@ -74,7 +73,6 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.phoenix.compile.PostIndexDDLCompiler; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; -import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; @@ -157,58 +155,6 @@ public class IndexTool extends Configured implements Tool { } } - public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL"; - public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME); - public final static byte[] OUTPUT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES; - public final static String 
RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT"; - public final static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(RESULT_TABLE_NAME); - public final static byte[] RESULT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES; - public final static String DATA_TABLE_NAME = "DTName"; - public final static byte[] DATA_TABLE_NAME_BYTES = Bytes.toBytes(DATA_TABLE_NAME); - public static String INDEX_TABLE_NAME = "ITName"; - public final static byte[] INDEX_TABLE_NAME_BYTES = Bytes.toBytes(INDEX_TABLE_NAME); - public static String DATA_TABLE_ROW_KEY = "DTRowKey"; - public final static byte[] DATA_TABLE_ROW_KEY_BYTES = Bytes.toBytes(DATA_TABLE_ROW_KEY); - public static String INDEX_TABLE_ROW_KEY = "ITRowKey"; - public final static byte[] INDEX_TABLE_ROW_KEY_BYTES = Bytes.toBytes(INDEX_TABLE_ROW_KEY); - public static String DATA_TABLE_TS = "DTTS"; - public final static byte[] DATA_TABLE_TS_BYTES = Bytes.toBytes(DATA_TABLE_TS); - public static String INDEX_TABLE_TS = "ITTS"; - public final static byte[] INDEX_TABLE_TS_BYTES = Bytes.toBytes(INDEX_TABLE_TS); - public static String ERROR_MESSAGE = "Error"; - public final static byte[] ERROR_MESSAGE_BYTES = Bytes.toBytes(ERROR_MESSAGE); - public static String SCANNED_DATA_ROW_COUNT = "ScannedDataRowCount"; - public final static byte[] SCANNED_DATA_ROW_COUNT_BYTES = Bytes.toBytes(SCANNED_DATA_ROW_COUNT); - public static String REBUILT_INDEX_ROW_COUNT = "RebuiltIndexRowCount"; - public final static byte[] REBUILT_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(REBUILT_INDEX_ROW_COUNT); - public static String BEFORE_REBUILD_VALID_INDEX_ROW_COUNT = "BeforeRebuildValidIndexRowCount"; - public final static byte[] BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT); - public static String BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT = "BeforeRebuildExpiredIndexRowCount"; - public final static byte[] BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = 
Bytes.toBytes(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT); - public static String BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT = "BeforeRebuildMissingIndexRowCount"; - public final static byte[] BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT); - public static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT = "BeforeRebuildInvalidIndexRowCount"; - public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT); - public static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS = "BeforeRebuildInvalidIndexRowCountCozExtraCells"; - public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS); - public static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS = "BeforeRebuildInvalidIndexRowCountCozMissingCells"; - public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS); - - public static String AFTER_REBUILD_VALID_INDEX_ROW_COUNT = "AfterValidExpiredIndexRowCount"; - public final static byte[] AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_VALID_INDEX_ROW_COUNT); - public static String AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT = "AfterRebuildExpiredIndexRowCount"; - public final static byte[] AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT); - public static String AFTER_REBUILD_MISSING_INDEX_ROW_COUNT = "AfterRebuildMissingIndexRowCount"; - public final static byte[] AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT); - public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT = "AfterRebuildInvalidIndexRowCount"; - public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = 
Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT); - public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS = "AfterRebuildInvalidIndexRowCountCozExtraCells"; - public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS); - public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS = "AfterRebuildInvalidIndexRowCountCozMissingCells"; - public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS); - public static String VERIFICATION_PHASE = "Phase"; - public final static byte[] VERIFICATION_PHASE_BYTES = Bytes.toBytes(VERIFICATION_PHASE); - private static final Logger LOGGER = LoggerFactory.getLogger(IndexTool.class); private String schemaName; @@ -699,24 +645,10 @@ public class IndexTool extends Configured implements Tool { } private void createIndexToolTables(Connection connection) throws Exception { - ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices(); - Admin admin = queryServices.getAdmin(); - if (!admin.tableExists(TableName.valueOf(OUTPUT_TABLE_NAME))) { - HTableDescriptor tableDescriptor = new - HTableDescriptor(TableName.valueOf(OUTPUT_TABLE_NAME)); - tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL)); - HColumnDescriptor columnDescriptor = new HColumnDescriptor(OUTPUT_TABLE_COLUMN_FAMILY); - tableDescriptor.addFamily(columnDescriptor); - admin.createTable(tableDescriptor); - } - if (!admin.tableExists(TableName.valueOf(RESULT_TABLE_NAME))) { - HTableDescriptor tableDescriptor = new - HTableDescriptor(TableName.valueOf(RESULT_TABLE_NAME)); - tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL)); - HColumnDescriptor columnDescriptor = new 
HColumnDescriptor(RESULT_TABLE_COLUMN_FAMILY); - tableDescriptor.addFamily(columnDescriptor); - admin.createTable(tableDescriptor); - } + IndexVerificationResultRepository resultRepo = new IndexVerificationResultRepository(); + resultRepo.createResultTable(connection); + IndexVerificationOutputRepository outputRepo = new IndexVerificationOutputRepository(); + outputRepo.createOutputTable(connection); } @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java new file mode 100644 index 0000000..45e4fd0 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.ByteUtil;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Creates, writes and reads the IndexTool verification output table
+ * ({@value #OUTPUT_TABLE_NAME}), in which per-row index verification errors are logged.
+ * <p>
+ * Row key layout: {@code <scanMaxTs as decimal string>|<index table name>|<data table row key>}.
+ * Only the first two separators are structural. The data table row key is arbitrary binary and
+ * may itself contain the separator byte.
+ */
+public class IndexVerificationOutputRepository {
+    public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|");
+    private Table indexHTable;
+    private byte[] indexName;
+    private Table outputHTable;
+    public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
+    public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME);
+    public final static byte[] OUTPUT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+
+    // Column qualifiers of the output table. The String forms are final so they cannot be
+    // reassigned at runtime and drift out of sync with their pre-computed *_BYTES twins.
+    public final static String DATA_TABLE_NAME = "DTName";
+    public final static byte[] DATA_TABLE_NAME_BYTES = Bytes.toBytes(DATA_TABLE_NAME);
+    public final static String INDEX_TABLE_NAME = "ITName";
+    public final static byte[] INDEX_TABLE_NAME_BYTES = Bytes.toBytes(INDEX_TABLE_NAME);
+    public final static String DATA_TABLE_ROW_KEY = "DTRowKey";
+    public final static byte[] DATA_TABLE_ROW_KEY_BYTES = Bytes.toBytes(DATA_TABLE_ROW_KEY);
+    public final static String INDEX_TABLE_ROW_KEY = "ITRowKey";
+    public final static byte[] INDEX_TABLE_ROW_KEY_BYTES = Bytes.toBytes(INDEX_TABLE_ROW_KEY);
+    public final static String DATA_TABLE_TS = "DTTS";
+    public final static byte[] DATA_TABLE_TS_BYTES = Bytes.toBytes(DATA_TABLE_TS);
+    public final static String INDEX_TABLE_TS = "ITTS";
+    public final static byte[] INDEX_TABLE_TS_BYTES = Bytes.toBytes(INDEX_TABLE_TS);
+    public final static String ERROR_MESSAGE = "Error";
+    public final static byte[] ERROR_MESSAGE_BYTES = Bytes.toBytes(ERROR_MESSAGE);
+
+    public final static String VERIFICATION_PHASE = "Phase";
+    public final static byte[] VERIFICATION_PHASE_BYTES = Bytes.toBytes(VERIFICATION_PHASE);
+    public final static String EXPECTED_VALUE = "ExpectedValue";
+    public final static byte[] EXPECTED_VALUE_BYTES = Bytes.toBytes(EXPECTED_VALUE);
+    public final static String ACTUAL_VALUE = "ActualValue";
+    public final static byte[] ACTUAL_VALUE_BYTES = Bytes.toBytes(ACTUAL_VALUE);
+    public static final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
+    public static final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
+    public static final int PREFIX_LENGTH = 3;
+    public static final int TOTAL_PREFIX_LENGTH = 6;
+    public static final byte[] PHASE_BEFORE_VALUE = Bytes.toBytes("BEFORE");
+    public static final byte[] PHASE_AFTER_VALUE = Bytes.toBytes("AFTER");
+
+    /**
+     * Creates a repository that opens no tables. On such an instance, only
+     * {@link #createOutputTable(Connection)} and the static helpers are usable. The
+     * read/write methods need the tables opened by the other constructors.
+     */
+    public IndexVerificationOutputRepository() {
+    }
+
+    /**
+     * Opens the output table and the given index table through the Phoenix connection's
+     * query services.
+     *
+     * @param indexName physical name of the index table being verified
+     * @param conn      a Phoenix connection
+     * @throws SQLException if the connection cannot be unwrapped or a table cannot be obtained
+     */
+    @VisibleForTesting
+    public IndexVerificationOutputRepository(byte[] indexName, Connection conn) throws SQLException {
+        ConnectionQueryServices queryServices =
+            conn.unwrap(PhoenixConnection.class).getQueryServices();
+        // Without this, logToIndexToolOutputTable would write a null ITName cell.
+        this.indexName = indexName;
+        outputHTable = queryServices.getTable(OUTPUT_TABLE_NAME_BYTES);
+        indexHTable = queryServices.getTable(indexName);
+    }
+
+    /**
+     * Opens the output table and the given index table through an {@link HTableFactory}.
+     *
+     * @param indexName     physical name of the index table being verified
+     * @param hTableFactory factory used to open both tables
+     * @throws IOException if a table cannot be opened
+     */
+    public IndexVerificationOutputRepository(byte[] indexName, HTableFactory hTableFactory) throws IOException {
+        this.indexName = indexName;
+        outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(OUTPUT_TABLE_NAME_BYTES));
+        indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexName));
+    }
+
+    /**
+     * Builds the output-table row key {@code ts|indexTableName|dataRowKey}.
+     *
+     * @param ts             scan max timestamp of the verification run
+     * @param indexTableName physical index table name
+     * @param dataRowKey     data table row key, copied verbatim
+     * @return a newly allocated row key
+     */
+    public static byte[] generateOutputTableRowKey(long ts, byte[] indexTableName, byte[] dataRowKey) {
+        // The row key for the output table : timestamp | index table name | data row key
+        return Bytes.add(generatePartialOutputTableRowKey(ts, indexTableName), dataRowKey);
+    }
+
+    /**
+     * Generates the row key prefix {@code ts|indexTableName|} shared by every row of one index
+     * verification run. The trailing separator is required: without it, the prefix for index
+     * "IDX" would also match the rows of index "IDX2" at the same timestamp.
+     */
+    private static byte[] generatePartialOutputTableRowKey(long ts, byte[] indexTableName) {
+        byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
+        return Bytes.add(Bytes.add(keyPrefix, ROW_KEY_SEPARATOR_BYTE),
+            Bytes.add(indexTableName, ROW_KEY_SEPARATOR_BYTE));
+    }
+
+    /**
+     * Creates the output table (single column family, TTL {@code MetaDataProtocol.DEFAULT_LOG_TTL})
+     * if it does not exist yet.
+     *
+     * @param connection a Phoenix connection used to obtain an HBase {@link Admin}
+     * @throws IOException  on HBase admin failures
+     * @throws SQLException if the connection cannot be unwrapped
+     */
+    public void createOutputTable(Connection connection) throws IOException, SQLException {
+        ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
+        // Admin is Closeable; release it once the existence check / creation is done.
+        try (Admin admin = queryServices.getAdmin()) {
+            if (!admin.tableExists(TableName.valueOf(OUTPUT_TABLE_NAME))) {
+                HTableDescriptor tableDescriptor = new
+                    HTableDescriptor(TableName.valueOf(OUTPUT_TABLE_NAME));
+                tableDescriptor.setValue(HColumnDescriptor.TTL,
+                    String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
+                HColumnDescriptor columnDescriptor = new HColumnDescriptor(OUTPUT_TABLE_COLUMN_FAMILY);
+                tableDescriptor.addFamily(columnDescriptor);
+                admin.createTable(tableDescriptor);
+            }
+        }
+    }
+
+    /**
+     * Writes one verification error row to the output table. All cells are written at
+     * {@code scanMaxTs}.
+     * <p>
+     * NOTE(review): the row key does not include the verification phase. If the same data row is
+     * logged in both the BEFORE and AFTER phases of one run, both puts therefore target the same
+     * row and cells, and presumably the later put wins.
+     *
+     * @param dataRowKey      data table row key; also the last row key component
+     * @param indexRowKey     index table row key
+     * @param dataRowTs       data row timestamp, stored as a decimal string
+     * @param indexRowTs      index row timestamp, stored as a decimal string
+     * @param errorMsg        error description
+     * @param expectedValue   expected cell value, or null if not applicable
+     * @param actualValue     actual cell value; only written when {@code expectedValue} is non-null
+     * @param scanMaxTs       scan max timestamp of the run; the first row key component
+     * @param tableName       data table name
+     * @param isBeforeRebuild selects the BEFORE or AFTER phase value
+     * @throws IOException if the put fails
+     */
+    @VisibleForTesting
+    public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs,
+                                          long indexRowTs,
+                                          String errorMsg, byte[] expectedValue, byte[] actualValue,
+                                          long scanMaxTs, byte[] tableName, boolean isBeforeRebuild)
+        throws IOException {
+        byte[] rowKey = generateOutputTableRowKey(scanMaxTs, indexHTable.getName().toBytes(), dataRowKey);
+        Put put = new Put(rowKey);
+        put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_NAME_BYTES,
+            scanMaxTs, tableName);
+        put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_NAME_BYTES,
+            scanMaxTs, indexName);
+        put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_TS_BYTES,
+            scanMaxTs, Bytes.toBytes(Long.toString(dataRowTs)));
+
+        put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_ROW_KEY_BYTES,
+            scanMaxTs, indexRowKey);
+        put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_TS_BYTES,
+            scanMaxTs, Bytes.toBytes(Long.toString(indexRowTs)));
+        byte[] errorMessageBytes;
+        if (expectedValue != null) {
+            errorMessageBytes = getErrorMessageBytes(errorMsg, expectedValue, actualValue);
+            // Same timestamp as every other cell of this row, for a consistent row version.
+            put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, EXPECTED_VALUE_BYTES, scanMaxTs, expectedValue);
+            put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ACTUAL_VALUE_BYTES, scanMaxTs, actualValue);
+        } else {
+            errorMessageBytes = Bytes.toBytes(errorMsg);
+        }
+        put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
+        if (isBeforeRebuild) {
+            put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_BEFORE_VALUE);
+        } else {
+            put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_AFTER_VALUE);
+        }
+        outputHTable.put(put);
+    }
+
+    /**
+     * Builds the error-message cell value {@code <errorMsg> E:<expected> A:<actual>}.
+     * Sizes are taken from the encoded byte arrays, not from {@code String.length()}. This way,
+     * multi-byte (non-ASCII) characters in {@code errorMsg} are neither truncated nor cause an
+     * out-of-bounds copy. A null expected or actual value is rendered as empty.
+     *
+     * @param errorMsg      error description (must not be null)
+     * @param expectedValue expected value bytes, may be null
+     * @param actualValue   actual value bytes, may be null
+     * @return the concatenated message bytes
+     */
+    public static byte[] getErrorMessageBytes(String errorMsg, byte[] expectedValue, byte[] actualValue) {
+        byte[] msgBytes = Bytes.toBytes(errorMsg);
+        byte[] expected = expectedValue == null ? new byte[0] : expectedValue;
+        byte[] actual = actualValue == null ? new byte[0] : actualValue;
+        byte[] errorMessageBytes = new byte[msgBytes.length + E_VALUE_PREFIX_BYTES.length
+            + expected.length + A_VALUE_PREFIX_BYTES.length + actual.length];
+        int offset = Bytes.putBytes(errorMessageBytes, 0, msgBytes, 0, msgBytes.length);
+        offset = Bytes.putBytes(errorMessageBytes, offset, E_VALUE_PREFIX_BYTES, 0,
+            E_VALUE_PREFIX_BYTES.length);
+        offset = Bytes.putBytes(errorMessageBytes, offset, expected, 0, expected.length);
+        offset = Bytes.putBytes(errorMessageBytes, offset, A_VALUE_PREFIX_BYTES, 0,
+            A_VALUE_PREFIX_BYTES.length);
+        Bytes.putBytes(errorMessageBytes, offset, actual, 0, actual.length);
+        return errorMessageBytes;
+    }
+
+    /**
+     * Reads every output row logged for {@code indexName} by the run with scan max timestamp
+     * {@code ts}. The underlying scanner is closed before returning.
+     *
+     * @param ts        scan max timestamp of the verification run
+     * @param indexName physical index table name
+     * @return the decoded rows in row key order
+     * @throws IOException on scan failures
+     */
+    public List<IndexVerificationOutputRow> getOutputRows(long ts, byte[] indexName)
+        throws IOException {
+        List<IndexVerificationOutputRow> outputRowList = new ArrayList<IndexVerificationOutputRow>();
+        try (ResultScanner scanner = outputHTable.getScanner(createOutputRowScan(ts, indexName))) {
+            for (Result result : scanner) {
+                outputRowList.add(getOutputRowFromResult(result));
+            }
+        }
+        return outputRowList;
+    }
+
+    /**
+     * Lazily iterates the output rows of one verification run.
+     * <p>
+     * The returned iterator wraps an open {@link ResultScanner} that this class never closes.
+     * Prefer {@link #getOutputRows(long, byte[])} unless lazy iteration is required.
+     *
+     * @param ts        scan max timestamp of the verification run
+     * @param indexName physical index table name
+     * @return an iterator of decoded rows
+     * @throws IOException if the scanner cannot be opened
+     */
+    public Iterator<IndexVerificationOutputRow> getOutputRowIterator(long ts, byte[] indexName)
+        throws IOException {
+        ResultScanner scanner = outputHTable.getScanner(createOutputRowScan(ts, indexName));
+        return new IndexVerificationOutputRowIterator(scanner.iterator());
+    }
+
+    /** Builds a scan covering exactly the row key prefix {@code ts|indexName|}. */
+    private static Scan createOutputRowScan(long ts, byte[] indexName) {
+        Scan scan = new Scan();
+        byte[] partialKey = generatePartialOutputTableRowKey(ts, indexName);
+        scan.withStartRow(partialKey);
+        scan.withStopRow(ByteUtil.calculateTheClosestNextRowKeyForPrefix(partialKey));
+        return scan;
+    }
+
+    /**
+     * Decodes one output-table {@link Result} into an {@link IndexVerificationOutputRow}.
+     *
+     * @param result a row read from the output table
+     * @return the decoded row
+     * @throws IllegalArgumentException if the row key lacks the two structural separators
+     */
+    public static IndexVerificationOutputRow getOutputRowFromResult(Result result) {
+        IndexVerificationOutputRow.IndexVerificationOutputRowBuilder builder =
+            new IndexVerificationOutputRow.IndexVerificationOutputRowBuilder();
+        byte[] rowKey = result.getRow();
+        // rowkey is scanTs + SEPARATOR_BYTE + indexTableName + SEPARATOR_BYTE + dataTableRowKey.
+        // Split on the first two separators only. The data row key is binary and may contain
+        // the separator byte, and splitting on every occurrence would truncate it.
+        byte separator = ROW_KEY_SEPARATOR_BYTE[0];
+        int firstSeparator = indexOf(rowKey, separator, 0);
+        int secondSeparator = firstSeparator < 0 ? -1 : indexOf(rowKey, separator, firstSeparator + 1);
+        if (secondSeparator < 0) {
+            throw new IllegalArgumentException("Malformed " + OUTPUT_TABLE_NAME + " row key: "
+                + Bytes.toStringBinary(rowKey));
+        }
+        builder.setScanMaxTimestamp(Long.parseLong(Bytes.toString(rowKey, 0, firstSeparator)));
+        builder.setIndexTableName(Bytes.toString(rowKey, firstSeparator + 1,
+            secondSeparator - firstSeparator - 1));
+        builder.setDataTableRowKey(Arrays.copyOfRange(rowKey, secondSeparator + 1, rowKey.length));
+
+        builder.setDataTableName(Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY,
+            DATA_TABLE_NAME_BYTES)));
+        builder.setIndexTableRowKey(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY,
+            INDEX_TABLE_ROW_KEY_BYTES));
+        builder.setDataTableRowTimestamp(Long.parseLong(Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY,
+            DATA_TABLE_TS_BYTES))));
+        builder.setIndexTableRowTimestamp(Long.parseLong(Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY,
+            INDEX_TABLE_TS_BYTES))));
+        builder.setErrorMessage(Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY,
+            ERROR_MESSAGE_BYTES)));
+        //actual and expected value might not be present, but will just set to null if not
+        builder.setExpectedValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, EXPECTED_VALUE_BYTES));
+        builder.setActualValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, ACTUAL_VALUE_BYTES));
+        builder.setPhaseValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES));
+        return builder.build();
+    }
+
+    /** Returns the index of the first {@code target} byte at or after {@code from}, or -1. */
+    private static int indexOf(byte[] array, byte target, int from) {
+        for (int i = from; i < array.length; i++) {
+            if (array[i] == target) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    /** Closes whichever of the output and index tables were opened by the constructor. */
+    public void close() throws IOException {
+        if (outputHTable != null) {
+            outputHTable.close();
+        }
+        if (indexHTable != null) {
+            indexHTable.close();
+        }
+    }
+
+    /** Adapts an iterator of output-table {@link Result}s to decoded output rows. */
+    public class IndexVerificationOutputRowIterator implements Iterator<IndexVerificationOutputRow> {
+        Iterator<Result> delegate;
+        public IndexVerificationOutputRowIterator(Iterator<Result> delegate){
+            this.delegate = delegate;
+        }
+        @Override
+        public boolean hasNext() {
+            return delegate.hasNext();
+        }
+
+        @Override
+        public IndexVerificationOutputRow next() {
+            Result result = delegate.next();
+            if (result == null) {
+                return null;
+            } else {
+                return getOutputRowFromResult(result);
+            }
+        }
+
+        @Override
+        public void remove() {
+            delegate.remove();
+        }
+
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java new file mode 100644 index 0000000..1ecac08 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * One decoded row of the IndexTool verification output table (see
+ * {@code IndexVerificationOutputRepository#getOutputRowFromResult}). Instances are created via
+ * {@link IndexVerificationOutputRowBuilder}. All fields are final. Byte-array fields are not
+ * copied, either on construction or in the getters, so callers must not mutate them.
+ */
+public class IndexVerificationOutputRow {
+    private final String dataTableName;
+    private final String indexTableName;
+    private final Long scanMaxTimestamp;
+    private final byte[] dataTableRowKey;
+    private final byte[] indexTableRowKey;
+    private final Long dataTableRowTimestamp;
+    private final Long indexTableRowTimestamp;
+    private final String errorMessage;
+    private final byte[] expectedValue;
+    private final byte[] actualValue;
+    private final byte[] phaseValue;
+
+    private IndexVerificationOutputRow(String dataTableName, String indexTableName,
+                                       byte[] dataTableRowKey, Long scanMaxTimestamp,
+                                       byte[] indexTableRowKey,
+                                       long dataTableRowTimestamp, long indexTableRowTimestamp,
+                                       String errorMessage, byte[] expectedValue, byte[] actualValue,
+                                       byte[] phaseValue) {
+        this.dataTableName = dataTableName;
+        this.indexTableName = indexTableName;
+        this.scanMaxTimestamp = scanMaxTimestamp;
+        this.dataTableRowKey = dataTableRowKey;
+        this.indexTableRowKey = indexTableRowKey;
+        this.dataTableRowTimestamp = dataTableRowTimestamp;
+        this.indexTableRowTimestamp = indexTableRowTimestamp;
+        this.errorMessage = errorMessage;
+        this.expectedValue = expectedValue;
+        this.actualValue = actualValue;
+        this.phaseValue = phaseValue;
+    }
+
+    public String getDataTableName() {
+        return dataTableName;
+    }
+
+    public String getIndexTableName() {
+        return indexTableName;
+    }
+
+    public Long getScanMaxTimestamp() {
+        return scanMaxTimestamp;
+    }
+
+    public byte[] getIndexTableRowKey() {
+        return indexTableRowKey;
+    }
+
+    public long getIndexTableRowTimestamp() {
+        return indexTableRowTimestamp;
+    }
+
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    public byte[] getExpectedValue() {
+        return expectedValue;
+    }
+
+    public byte[] getActualValue() {
+        return actualValue;
+    }
+
+    public byte[] getPhaseValue() {
+        return phaseValue;
+    }
+
+    public byte[] getDataTableRowKey() {
+        return dataTableRowKey;
+    }
+
+    public Long getDataTableRowTimestamp() {
+        return dataTableRowTimestamp;
+    }
+
+    /** Field-by-field equality; byte arrays are compared by content. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        // instanceof is false for null, so no separate null check is needed.
+        if (!(o instanceof IndexVerificationOutputRow)) {
+            return false;
+        }
+        IndexVerificationOutputRow otherRow = (IndexVerificationOutputRow) o;
+
+        return Objects.equals(dataTableName, otherRow.dataTableName) &&
+            Objects.equals(indexTableName, otherRow.indexTableName) &&
+            Objects.equals(scanMaxTimestamp, otherRow.scanMaxTimestamp) &&
+            Arrays.equals(dataTableRowKey, otherRow.dataTableRowKey) &&
+            Arrays.equals(indexTableRowKey, otherRow.indexTableRowKey) &&
+            Objects.equals(dataTableRowTimestamp, otherRow.dataTableRowTimestamp) &&
+            Objects.equals(indexTableRowTimestamp, otherRow.indexTableRowTimestamp) &&
+            Objects.equals(errorMessage, otherRow.errorMessage) &&
+            Arrays.equals(expectedValue, otherRow.expectedValue) &&
+            Arrays.equals(actualValue, otherRow.actualValue) &&
+            Arrays.equals(phaseValue, otherRow.phaseValue);
+    }
+
+    /** Hashes the row-key components only (a subset of the fields used by equals). */
+    @Override
+    public int hashCode(){
+        return Objects.hashCode(scanMaxTimestamp) ^ Objects.hashCode(indexTableName) ^
+            Arrays.hashCode(dataTableRowKey);
+    }
+
+    /**
+     * Renders all fields. Byte arrays use {@link Bytes#toStringBinary(byte[])} so that binary
+     * row keys and values print non-printable bytes as escapes instead of being mangled.
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("DataTableName: ").append(dataTableName).append(",");
+        sb.append("IndexTableName: ").append(indexTableName).append(",");
+        sb.append("ScanMaxTimestamp: ").append(scanMaxTimestamp).append(",");
+        sb.append("DataTableRowKey: ").append(toBinaryString(dataTableRowKey)).append(",");
+        sb.append("IndexTableRowKey: ").append(toBinaryString(indexTableRowKey)).append(",");
+        sb.append("DataTableRowTimestamp: ").append(dataTableRowTimestamp).append(",");
+        sb.append("IndexTableRowTimestamp: ").append(indexTableRowTimestamp).append(",");
+        sb.append("ErrorMessage: ").append(errorMessage).append(",");
+        sb.append("ExpectedValue: ").append(toBinaryString(expectedValue)).append(",");
+        sb.append("ActualValue: ").append(toBinaryString(actualValue)).append(",");
+        sb.append("PhaseValue: ").append(toBinaryString(phaseValue));
+        return sb.toString();
+    }
+
+    /** Null-tolerant wrapper around {@link Bytes#toStringBinary(byte[])}. */
+    private static String toBinaryString(byte[] bytes) {
+        return bytes == null ? null : Bytes.toStringBinary(bytes);
+    }
+
+    /** Mutable builder for {@link IndexVerificationOutputRow}; unset timestamps default to 0. */
+    public static class IndexVerificationOutputRowBuilder {
+        private String dataTableName;
+        private String indexTableName;
+        private Long scanMaxTimestamp;
+        private byte[] dataTableRowKey;
+        private byte[] indexTableRowKey;
+        private long dataTableRowTimestamp;
+        private long indexTableRowTimestamp;
+        private String errorMessage;
+        private byte[] expectedValue;
+        private byte[] actualValue;
+        private byte[] phaseValue;
+
+        public IndexVerificationOutputRowBuilder setDataTableName(String dataTableName) {
+            this.dataTableName = dataTableName;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setIndexTableName(String indexTableName) {
+            this.indexTableName = indexTableName;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setScanMaxTimestamp(Long scanMaxTimestamp) {
+            this.scanMaxTimestamp = scanMaxTimestamp;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setIndexTableRowKey(byte[] indexTableRowKey) {
+            this.indexTableRowKey = indexTableRowKey;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setDataTableRowKey(byte[] dataTableRowKey){
+            this.dataTableRowKey = dataTableRowKey;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setDataTableRowTimestamp(long dataTableRowTimestamp) {
+            this.dataTableRowTimestamp = dataTableRowTimestamp;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setIndexTableRowTimestamp(long indexTableRowTimestamp) {
+            this.indexTableRowTimestamp = indexTableRowTimestamp;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setErrorMessage(String errorMessage) {
+            this.errorMessage = errorMessage;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setExpectedValue(byte[] expectedValue) {
+            this.expectedValue = expectedValue;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setActualValue(byte[] actualValue) {
+            this.actualValue = actualValue;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setPhaseValue(byte[] phaseValue) {
+            this.phaseValue = phaseValue;
+            return this;
+        }
+
+        public IndexVerificationOutputRow build() {
+            return new IndexVerificationOutputRow(dataTableName, indexTableName, dataTableRowKey,
+                scanMaxTimestamp, indexTableRowKey, dataTableRowTimestamp, indexTableRowTimestamp,
+                errorMessage, expectedValue, actualValue, phaseValue);
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java new file mode 100644 index 0000000..987328c --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.phoenix.mapreduce.index; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.IndexToolVerificationResult; +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.hbase.index.table.HTableFactory; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.util.ByteUtil; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; + +public class IndexVerificationResultRepository { + private Table resultHTable; + private Table indexHTable; + public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|"); + public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT"; + public final static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(RESULT_TABLE_NAME); + public final static byte[] RESULT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES; + public static String SCANNED_DATA_ROW_COUNT = "ScannedDataRowCount"; + public final static byte[] SCANNED_DATA_ROW_COUNT_BYTES = Bytes.toBytes(SCANNED_DATA_ROW_COUNT); + public static String REBUILT_INDEX_ROW_COUNT = "RebuiltIndexRowCount"; + public final static byte[] REBUILT_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(REBUILT_INDEX_ROW_COUNT); + 
public static String BEFORE_REBUILD_VALID_INDEX_ROW_COUNT = "BeforeRebuildValidIndexRowCount"; + public final static byte[] BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT); + public static String BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT = "BeforeRebuildExpiredIndexRowCount"; + public final static byte[] BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT); + public static String BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT = "BeforeRebuildMissingIndexRowCount"; + public final static byte[] BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT); + public static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT = "BeforeRebuildInvalidIndexRowCount"; + public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT); + public static String AFTER_REBUILD_VALID_INDEX_ROW_COUNT = "AfterValidExpiredIndexRowCount"; + public final static byte[] AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_VALID_INDEX_ROW_COUNT); + public static String AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT = "AfterRebuildExpiredIndexRowCount"; + public final static byte[] AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT); + public static String AFTER_REBUILD_MISSING_INDEX_ROW_COUNT = "AfterRebuildMissingIndexRowCount"; + public final static byte[] AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT); + public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT = "AfterRebuildInvalidIndexRowCount"; + public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT); + public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS = "AfterRebuildInvalidIndexRowCountCozExtraCells"; + public final static byte[] 
AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS); + public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS = "AfterRebuildInvalidIndexRowCountCozMissingCells"; + public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS); + public static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS = "BeforeRebuildInvalidIndexRowCountCozExtraCells"; + public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS); + public static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS = "BeforeRebuildInvalidIndexRowCountCozMissingCells"; + public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS); + + /*** + * Only usable for read methods + */ + public IndexVerificationResultRepository(){ + + } + + public IndexVerificationResultRepository(Connection conn, byte[] indexNameBytes) throws SQLException { + resultHTable = getTable(conn, RESULT_TABLE_NAME_BYTES); + indexHTable = getTable(conn, indexNameBytes); + } + + public IndexVerificationResultRepository(byte[] indexName, + HTableFactory hTableFactory) throws IOException { + resultHTable = hTableFactory.getTable(new ImmutableBytesPtr(RESULT_TABLE_NAME_BYTES)); + indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexName)); + } + + public void createResultTable(Connection connection) throws IOException, SQLException { + ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices(); + Admin admin = queryServices.getAdmin(); + + if (!admin.tableExists(TableName.valueOf(RESULT_TABLE_NAME))) { + HTableDescriptor tableDescriptor = new + 
HTableDescriptor(TableName.valueOf(RESULT_TABLE_NAME)); + tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL)); + HColumnDescriptor columnDescriptor = new HColumnDescriptor(RESULT_TABLE_COLUMN_FAMILY); + tableDescriptor.addFamily(columnDescriptor); + admin.createTable(tableDescriptor); + } + } + public static byte[] generateResultTableRowKey(long ts, byte[] indexTableName, byte [] regionName, + byte[] startRow, byte[] stopRow) { + byte[] keyPrefix = Bytes.toBytes(Long.toString(ts)); + int targetOffset = 0; + // The row key for the result table : timestamp | index table name | datable table region name | + // scan start row | scan stop row + byte[] rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length + + ROW_KEY_SEPARATOR_BYTE.length + regionName.length + ROW_KEY_SEPARATOR_BYTE.length + + startRow.length + ROW_KEY_SEPARATOR_BYTE.length + stopRow.length]; + Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length); + targetOffset += keyPrefix.length; + Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); + targetOffset += ROW_KEY_SEPARATOR_BYTE.length; + Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length); + targetOffset += indexTableName.length; + Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); + targetOffset += ROW_KEY_SEPARATOR_BYTE.length; + Bytes.putBytes(rowKey, targetOffset, regionName, 0, regionName.length); + targetOffset += regionName.length; + Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); + targetOffset += ROW_KEY_SEPARATOR_BYTE.length; + Bytes.putBytes(rowKey, targetOffset, startRow, 0, startRow.length); + targetOffset += startRow.length; + Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length); + targetOffset += ROW_KEY_SEPARATOR_BYTE.length; + 
Bytes.putBytes(rowKey, targetOffset, stopRow, 0, stopRow.length); + return rowKey; + } + + public void logToIndexToolResultTable(IndexToolVerificationResult verificationResult, + IndexTool.IndexVerifyType verifyType, byte[] region) throws IOException { + long scanMaxTs = verificationResult.getScanMaxTs(); + byte[] rowKey = generateResultTableRowKey(scanMaxTs, indexHTable.getName().toBytes(), + region, verificationResult.getStartRow(), + verificationResult.getStopRow()); + Put put = new Put(rowKey); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getScannedDataRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getRebuiltIndexRowCount()))); + if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH || + verifyType == IndexTool.IndexVerifyType.ONLY) { + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildValidIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildExpiredIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildMissingIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildInvalidIndexRowCount()))); + } + if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) { + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES, + scanMaxTs, 
Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildValidIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildExpiredIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildMissingIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, + scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildInvalidIndexRowCount()))); + } + resultHTable.put(put); + } + + public IndexToolVerificationResult getVerificationResult(Connection conn, long ts) throws IOException, SQLException { + Table hTable = getTable(conn, RESULT_TABLE_NAME_BYTES); + return getVerificationResult(hTable, ts); + } + + public Table getTable(Connection conn, byte[] tableName) throws SQLException { + return conn.unwrap(PhoenixConnection.class).getQueryServices() + .getTable(tableName); + } + + public IndexToolVerificationResult getVerificationResult(Table htable, long ts) + throws IOException { + byte[] startRowKey = Bytes.toBytes(Long.toString(ts)); + byte[] stopRowKey = ByteUtil.calculateTheClosestNextRowKeyForPrefix(startRowKey); + IndexToolVerificationResult verificationResult = new IndexToolVerificationResult(ts); + Scan scan = new Scan(); + scan.withStartRow(startRowKey); + scan.withStopRow(stopRowKey); + ResultScanner scanner = htable.getScanner(scan); + for (Result result = scanner.next(); result != null; result = scanner.next()) { + boolean isFirst = true; + for (Cell cell : result.rawCells()) { + if (isFirst){ + byte[][] rowKeyParts = ByteUtil.splitArrayBySeparator(result.getRow(), + ROW_KEY_SEPARATOR_BYTE[0]); + verificationResult.setStartRow(rowKeyParts[3]); + verificationResult.setStopRow(rowKeyParts[4]); + isFirst = false; + } + verificationResult.update(cell); 
+ } + } + return verificationResult; + } + + public void close() throws IOException { + if (resultHTable != null) { + resultHTable.close(); + } + if (indexHTable != null) { + indexHTable.close(); + } + } +} + diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java index 3c1c7e4..7f32de7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java @@ -25,7 +25,6 @@ import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; @@ -52,6 +51,7 @@ import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getInde public class PhoenixIndexImportDirectReducer extends Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable> { private AtomicBoolean calledOnce = new AtomicBoolean(false); + private IndexVerificationResultRepository resultRepository; private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixIndexImportDirectReducer.class); @@ -61,10 +61,8 @@ public class PhoenixIndexImportDirectReducer extends Configuration configuration = context.getConfiguration(); try (final Connection connection = ConnectionUtil.getInputConnection(configuration)) { long ts = Long.valueOf(configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE)); - Table hTable = connection.unwrap(PhoenixConnection.class).getQueryServices() - .getTable(IndexTool.RESULT_TABLE_NAME_BYTES); IndexToolVerificationResult verificationResult = - IndexToolVerificationResult.getVerificationResult(hTable, ts); + 
resultRepository.getVerificationResult(connection, ts); context.getCounter(PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT). setValue(verificationResult.getScannedDataRowCount()); context.getCounter(PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT). @@ -107,6 +105,11 @@ public class PhoenixIndexImportDirectReducer extends } @Override + protected void setup(Context context) throws IOException { + resultRepository = new IndexVerificationResultRepository(); + } + + @Override protected void reduce(ImmutableBytesWritable arg0, Iterable<IntWritable> arg1, Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable>.Context context) throws IOException, InterruptedException @@ -132,6 +135,7 @@ public class PhoenixIndexImportDirectReducer extends protected void cleanup(Context context) throws IOException, InterruptedException{ try { updateTasksTable(context); + resultRepository.close(); } catch (SQLException e) { LOGGER.error(" Failed to update the tasks table"); throw new RuntimeException(e.getMessage()); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java index 5a2b624..cc582f7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java @@ -23,12 +23,14 @@ import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Set; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; @@ -606,4 +608,48 @@ public class ByteUtil { } return true; } + + public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) 
{ + // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually. + // Search for the place where the trailing 0xFFs start + int offset = rowKeyPrefix.length; + while (offset > 0) { + if (rowKeyPrefix[offset - 1] != (byte) 0xFF) { + break; + } + offset--; + } + if (offset == 0) { + // We got an 0xFFFF... (only FFs) stopRow value which is + // the last possible prefix before the end of the table. + // So set it to stop at the 'end of the table' + return HConstants.EMPTY_END_ROW; + } + // Copy the right length of the original + byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset); + // And increment the last one + newStopRow[newStopRow.length - 1]++; + return newStopRow; + } + + public static byte[][] splitArrayBySeparator(byte[] src, byte separator){ + List<Integer> separatorLocations = new ArrayList<Integer>(); + for (int k = 0; k < src.length; k++){ + if (src[k] == separator){ + separatorLocations.add(k); + } + } + byte[][] dst = new byte[separatorLocations.size() +1][]; + int previousSepartor = -1; + for (int j = 0; j < separatorLocations.size(); j++){ + int separatorLocation = separatorLocations.get(j); + dst[j] = Bytes.copy(src, previousSepartor +1, separatorLocation- previousSepartor -1); + previousSepartor = separatorLocation; + } + if (previousSepartor < src.length){ + dst[separatorLocations.size()] = Bytes.copy(src, + previousSepartor +1, src.length - previousSepartor -1); + } + return dst; + } }