This is an automated email from the ASF dual-hosted git repository.

skadam pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
     new b8ba647  PHOENIX-5804: Implement strong verification with -v ONLY option for old design of secondary indexes (#758)
b8ba647 is described below

commit b8ba647664c50cc6f677f463f928d3f9bc214766
Author: Swaroopa Kadam <swaroopa.kada...@gmail.com>
AuthorDate: Wed Apr 22 22:30:00 2020 -0700

    PHOENIX-5804: Implement strong verification with -v ONLY option for old design of secondary indexes (#758)

    Co-authored-by: s.kadam <s.ka...@apache.org>
---
 .../end2end/IndexToolForNonTxGlobalIndexIT.java    | 483 +++++++++++++++++++++
 .../org/apache/phoenix/end2end/IndexToolIT.java    | 398 +----------------
 .../end2end/IndexVerificationOldDesignIT.java      | 161 +++++++
 .../coprocessor/GlobalIndexRegionScanner.java      | 203 +++++++++
 .../coprocessor/IndexRebuildRegionScanner.java     | 182 +-------
 .../phoenix/coprocessor/IndexerRegionScanner.java  | 354 +++++++++++++++
 .../UngroupedAggregateRegionObserver.java          |  23 +-
 .../phoenix/hbase/index/IndexRegionObserver.java   |  10 +-
 .../index/PhoenixIndexToolJobCounters.java         |   2 +-
 .../phoenix/index/VerifySingleIndexRowTest.java    |   3 +-
 10 files changed, 1248 insertions(+), 571 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java new file mode 100644 index 0000000..e151040 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java @@ -0,0 +1,483 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.end2end; + +import com.google.common.collect.Maps; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner; +import org.apache.phoenix.hbase.index.IndexRegionObserver; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.util.IndexScrutiny; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Properties; + +import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) 
+public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT { + + private final String tableDDLOptions; + private boolean directApi = true; + private boolean useSnapshot = false; + private boolean mutable; + + public IndexToolForNonTxGlobalIndexIT(boolean mutable) { + StringBuilder optionBuilder = new StringBuilder(); + this.mutable = mutable; + if (!mutable) { + optionBuilder.append(" IMMUTABLE_ROWS=true "); + } + optionBuilder.append(" SPLIT ON(1,2)"); + this.tableDDLOptions = optionBuilder.toString(); + } + + @Parameterized.Parameters(name = "mutable={0}") + public static synchronized Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + {true}, + {false} }); + } + + @BeforeClass + public static synchronized void setup() throws Exception { + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2); + serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20)); + serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, Long.toString(5)); + serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, + QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8)); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2); + clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true)); + clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5)); + clientProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); + clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.TRUE.toString()); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), + new ReadOnlyProps(clientProps.entrySet().iterator())); + } + + @Test + public void testWithSetNull() throws Exception { + // This tests the cases where a column having a null value is overwritten with a not null value and vice versa; + // and after that the index table is still rebuilt correctly + if(!this.mutable) { + return; + } + final int NROWS = 2 * 3 * 5 * 7; + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) " + + tableDDLOptions); + String upsertStmt = "UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)"; + PreparedStatement stmt = conn.prepareStatement(upsertStmt); + IndexToolIT.setEveryNthRowWithNull(NROWS, 2, stmt); + conn.commit(); + IndexToolIT.setEveryNthRowWithNull(NROWS, 3, stmt); + conn.commit(); + conn.createStatement().execute(String.format( + "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2) ASYNC ", indexTableName, dataTableFullName)); + // Run the index MR job and verify that the index table is built correctly + IndexTool + indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]); + assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue()); + long actualRowCount = IndexScrutiny + .scrutinizeIndex(conn, dataTableFullName, indexTableFullName); + 
assertEquals(NROWS, actualRowCount); + actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); + assertEquals(NROWS, actualRowCount); + IndexToolIT.setEveryNthRowWithNull(NROWS, 5, stmt); + conn.commit(); + actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); + assertEquals(NROWS, actualRowCount); + IndexToolIT.setEveryNthRowWithNull(NROWS, 7, stmt); + conn.commit(); + actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); + assertEquals(NROWS, actualRowCount); + actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); + assertEquals(NROWS, actualRowCount); + indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, + 0, IndexTool.IndexVerifyType.ONLY, new String[0]); + assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue()); + assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue()); + assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); + IndexToolIT.dropIndexToolTables(conn); + } + } + + @Test + public void testIndexToolVerifyWithExpiredIndexRows() throws Exception { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0"); + // Insert a row + conn.createStatement() + .execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')"); + conn.commit(); + conn.createStatement() + .execute(String.format("CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC", + indexTableName, dataTableFullName)); + IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, + IndexTool.IndexVerifyType.ONLY); + Cell cell = + IndexToolIT.getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, + indexTableFullName); + try { + String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW; + String actualErrorMsg = Bytes + .toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + assertTrue(expectedErrorMsg.equals(actualErrorMsg)); + } catch(Exception ex) { + Assert.fail("Fail to parsing the error message from IndexToolOutputTable"); 
+ } + + // Run the index tool to populate the index while verifying rows + IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, + IndexTool.IndexVerifyType.AFTER); + + // Set ttl of index table ridiculously low so that all data is expired + Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); + TableName indexTable = TableName.valueOf(indexTableFullName); + HColumnDescriptor desc = admin.getTableDescriptor(indexTable).getColumnFamilies()[0]; + desc.setTimeToLive(1); + admin.modifyColumn(indexTable, desc); + Thread.sleep(1000); + Pair<Integer, Integer> status = admin.getAlterStatus(indexTable); + int retry = 0; + while (retry < 20 && status.getFirst() != 0) { + Thread.sleep(2000); + status = admin.getAlterStatus(indexTable); + } + assertTrue(status.getFirst() == 0); + + TableName indexToolOutputTable = TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES); + admin.disableTable(indexToolOutputTable); + admin.deleteTable(indexToolOutputTable); + // Run the index tool using the only-verify option, verify it gives no mismatch + IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, + IndexTool.IndexVerifyType.ONLY); + Scan scan = new Scan(); + Table hIndexToolTable = + conn.unwrap(PhoenixConnection.class).getQueryServices() + .getTable(indexToolOutputTable.getName()); + Result r = hIndexToolTable.getScanner(scan).next(); + assertTrue(r == null); + IndexToolIT.dropIndexToolTables(conn); + } + } + + @Test + public void testSecondaryGlobalIndexFailure() throws Exception { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String stmString1 = + "CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) " + + tableDDLOptions; + conn.createStatement().execute(stmString1); + String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName); + PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); + + // Insert two rows + IndexToolIT.upsertRow(stmt1, 1); + IndexToolIT.upsertRow(stmt1, 2); + conn.commit(); + + String stmtString2 = + String.format( + "CREATE INDEX %s ON %s (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC ", indexTableName, dataTableFullName); + conn.createStatement().execute(stmtString2); + + // Run the index MR job. 
+ IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName); + + String qIndexTableName = SchemaUtil.getQualifiedTableName(schemaName, indexTableName); + + // Verify that the index table is in the ACTIVE state + assertEquals(PIndexState.ACTIVE, TestUtil.getIndexState(conn, qIndexTableName)); + + ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices(); + Admin admin = queryServices.getAdmin(); + TableName tableName = TableName.valueOf(qIndexTableName); + admin.disableTable(tableName); + + // Run the index MR job and it should fail (return -1) + IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, + null, -1, new String[0]); + + // Verify that the index table should be still in the ACTIVE state + assertEquals(PIndexState.ACTIVE, TestUtil.getIndexState(conn, qIndexTableName)); + } + } + + @Test + public void testBuildSecondaryIndexAndScrutinize() throws Exception { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String stmString1 = + "CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) " + + tableDDLOptions; + conn.createStatement().execute(stmString1); + String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName); + PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); + + // Insert NROWS rows + final int NROWS = 1000; + for (int i = 0; i < NROWS; i++) { + IndexToolIT.upsertRow(stmt1, i); + } + conn.commit(); + String stmtString2 = + String.format( + "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC ", indexTableName, dataTableFullName); + conn.createStatement().execute(stmtString2); + + // Run the index MR job and verify that the index table is built correctly + IndexTool indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BEFORE, new String[0]); + assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue()); + assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue()); + assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); + long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); + assertEquals(NROWS, actualRowCount); + + // Add more rows 
and make sure that these rows will be visible to IndexTool + for (int i = NROWS; i < 2 * NROWS; i++) { + IndexToolIT.upsertRow(stmt1, i); + } + conn.commit(); + indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BOTH, new String[0]); + assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue()); + assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue()); + assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); + actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); + assertEquals(2 * NROWS, actualRowCount); + IndexToolIT.dropIndexToolTables(conn); + } + } + + @Test + public void testIndexToolVerifyBeforeAndBothOptions() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String viewName = generateUniqueName(); + String viewFullName = SchemaUtil.getTableName(schemaName, viewName); + conn.createStatement().execute("CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) " + + tableDDLOptions); + conn.commit(); + conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName); + conn.commit(); + // Insert a row + conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)"); + conn.commit(); + conn.createStatement().execute(String.format( + "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName)); + TestUtil.addCoprocessor(conn, "_IDX_" + dataTableFullName, IndexToolIT.MutationCountingRegionObserver.class); + // Run the index MR job and 
verify that the index table rebuild succeeds + IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName, + null, 0, IndexTool.IndexVerifyType.AFTER); + assertEquals(1, IndexToolIT.MutationCountingRegionObserver.getMutationCount()); + IndexToolIT.MutationCountingRegionObserver.setMutationCount(0); + // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should not + // write any index rows + IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName, + null, 0, IndexTool.IndexVerifyType.BEFORE); + assertEquals(0, IndexToolIT.MutationCountingRegionObserver.getMutationCount()); + // The "-v BOTH" option should not write any index rows either + IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName, + null, 0, IndexTool.IndexVerifyType.BOTH); + assertEquals(0, IndexToolIT.MutationCountingRegionObserver.getMutationCount()); + IndexToolIT.dropIndexToolTables(conn); + } + } + + @Test + public void testIndexToolVerifyAfterOption() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String viewName = generateUniqueName(); + String viewFullName = SchemaUtil.getTableName(schemaName, viewName); + conn.createStatement().execute("CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) " + + tableDDLOptions); + conn.commit(); + conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName); + conn.commit(); + // Insert a row + conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)"); + conn.commit(); + // Configure IndexRegionObserver to fail the first write phase. 
This should not + // lead to any change on index and thus the index verify during index rebuild should fail + IndexRegionObserver.setIgnoreIndexRebuildForTesting(true); + conn.createStatement().execute(String.format( + "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName)); + // Run the index MR job and verify that the index table rebuild fails + IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName, + null, -1, IndexTool.IndexVerifyType.AFTER); + // The index tool output table should report that there is a missing index row + Cell cell = IndexToolIT.getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, "_IDX_" + dataTableFullName); + try { + String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW; + String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + assertTrue(expectedErrorMsg.equals(actualErrorMsg)); + } catch(Exception ex){ + Assert.fail("Fail to parsing the error message from IndexToolOutputTable"); + } + IndexRegionObserver.setIgnoreIndexRebuildForTesting(false); + IndexToolIT.dropIndexToolTables(conn); + } + } + + @Test + public void testIndexToolOnlyVerifyOption() throws Exception { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0"); + // Insert a row + conn.createStatement().execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')"); + conn.commit(); + conn.createStatement().execute(String.format( + "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC", indexTableName, dataTableFullName)); + // Run the index MR job to only verify that each data table row has a corresponding index row + // IndexTool will go through each data table row and record the mismatches in the output table + // called PHOENIX_INDEX_TOOL + IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, + null, 0, IndexTool.IndexVerifyType.ONLY); + Cell cell = IndexToolIT.getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName); + try { + String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW; + String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + assertTrue(expectedErrorMsg.equals(actualErrorMsg)); + } catch(Exception ex) { + Assert.fail("Fail to parsing the error message from IndexToolOutputTable"); + } + // Delete the output table for the next test + IndexToolIT.dropIndexToolTables(conn); + // Run the index tool to populate the index while verifying rows + IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, + null, 0, IndexTool.IndexVerifyType.AFTER); + IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, + null, 0, IndexTool.IndexVerifyType.ONLY); + IndexToolIT.dropIndexToolTables(conn); + } + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index 02b253f..e5db314 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -189,7 +189,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { return TestUtil.filterTxParamData(list,0); } - private void setEveryNthRowWithNull(int nrows, int nthRowNull, PreparedStatement stmt) throws Exception { + protected static void setEveryNthRowWithNull(int nrows, int nthRowNull, PreparedStatement stmt) throws Exception { for (int i = 1; i <= nrows; i++) { stmt.setInt(1, i); stmt.setInt(2, i + 1); @@ -203,66 +203,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { } @Test - public void testWithSetNull() throws Exception { - // This test is for building non-transactional mutable global indexes with direct api - if (localIndex || transactional || !directApi || useSnapshot || useTenantId || !mutable) { - return; - } - // This tests the cases where a column having a null value is overwritten with a not null value and vice versa; - // and after that the index table is still rebuilt correctly - final int NROWS = 2 * 3 * 5 * 7; - String schemaName = generateUniqueName(); - String dataTableName = generateUniqueName(); - String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); - String indexTableName = generateUniqueName(); - String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - conn.createStatement().execute("CREATE TABLE " + dataTableFullName - + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) " - + tableDDLOptions); - String upsertStmt = "UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)"; - PreparedStatement stmt = conn.prepareStatement(upsertStmt); - setEveryNthRowWithNull(NROWS, 2, stmt); - conn.commit(); - setEveryNthRowWithNull(NROWS, 3, stmt); - conn.commit(); - conn.createStatement().execute(String.format( - "CREATE %s INDEX %s ON %s (VAL1) INCLUDE (VAL2) ASYNC ", - (localIndex ? 
"LOCAL" : ""), indexTableName, dataTableFullName)); - // Run the index MR job and verify that the index table is built correctly - IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]); - assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue()); - long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); - assertEquals(NROWS, actualRowCount); - actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); - assertEquals(NROWS, actualRowCount); - setEveryNthRowWithNull(NROWS, 5, stmt); - conn.commit(); - actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); - assertEquals(NROWS, actualRowCount); - setEveryNthRowWithNull(NROWS, 7, stmt); - conn.commit(); - actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); - assertEquals(NROWS, actualRowCount); - actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); - assertEquals(NROWS, actualRowCount); - indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, - 0, IndexTool.IndexVerifyType.ONLY, new String[0]); - assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue()); - assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue()); - assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); - dropIndexToolTables(conn); - } - } - - @Test public void testSecondaryIndex() throws Exception { String schemaName = generateUniqueName(); String dataTableName = generateUniqueName(); @@ -370,7 +310,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { } } - private void dropIndexToolTables(Connection conn) throws Exception { + protected static void dropIndexToolTables(Connection conn) throws Exception { Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); TableName indexToolOutputTable = TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES); @@ -381,80 +321,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { admin.deleteTable(indexToolResultTable); } - @Test - public void testBuildSecondaryIndexAndScrutinize() throws Exception { - // This test is for building non-transactional global indexes with direct api - if (localIndex || transactional || !directApi || useSnapshot || useTenantId) { - return; - } - String schemaName = generateUniqueName(); - String dataTableName = generateUniqueName(); - String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); - String indexTableName = generateUniqueName(); 
- String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - String stmString1 = - "CREATE TABLE " + dataTableFullName - + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) " - + tableDDLOptions; - conn.createStatement().execute(stmString1); - String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName); - PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); - - // Insert NROWS rows - final int NROWS = 1000; - for (int i = 0; i < NROWS; i++) { - upsertRow(stmt1, i); - } - conn.commit(); - String stmtString2 = - String.format( - "CREATE %s INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC ", - (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName); - conn.createStatement().execute(stmtString2); - - // Run the index MR job and verify that the index table is built correctly - IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BEFORE, new String[0]); - assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue()); - assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue()); - assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); - assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); - long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); - assertEquals(NROWS, actualRowCount); - - // Add more rows and make sure that these rows will be visible to IndexTool - for (int i = NROWS; i < 2 * NROWS; i++) { - upsertRow(stmt1, i); - } - conn.commit(); - indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BOTH, new String[0]); - assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue()); - assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue()); - assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); - assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); - assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); - actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); - assertEquals(2 * NROWS, actualRowCount); - dropIndexToolTables(conn); - } - } - public static class MutationCountingRegionObserver extends SimpleRegionObserver { public static AtomicInteger mutationCount = new AtomicInteger(0); @@ -473,7 +339,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { } } - private void verifyIndexTableRowKey(byte[] rowKey, String indexTableFullName) { + private static void verifyIndexTableRowKey(byte[] rowKey, String indexTableFullName) { // The row key for the output table : timestamp | index table name | data row key // The row key for the result table : timestamp | index table name | datable table region name | // scan start row | scan stop row @@ -489,7 +355,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { IndexVerificationResultRepository.ROW_KEY_SEPARATOR_BYTE[0]); } - private Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName) + public static Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName) throws Exception { byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName); byte[] dataTableFullNameBytes = Bytes.toBytes(dataTableFullName); @@ -536,209 +402,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { } @Test - public void testIndexToolVerifyBeforeAndBothOptions() throws Exception { - // This test is for building non-transactional global indexes with direct api - if (localIndex || transactional || !directApi || useSnapshot || useTenantId) { - return; - } - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - String schemaName = generateUniqueName(); - String dataTableName = generateUniqueName(); - String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); - String indexTableName = generateUniqueName(); - String viewName = generateUniqueName(); - String viewFullName = SchemaUtil.getTableName(schemaName, viewName); - conn.createStatement().execute("CREATE TABLE " + dataTableFullName - + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) " - + tableDDLOptions); - conn.commit(); - conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName); - conn.commit(); - // Insert a row - conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 
12345)"); - conn.commit(); - conn.createStatement().execute(String.format( - "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName)); - TestUtil.addCoprocessor(conn, "_IDX_" + dataTableFullName, MutationCountingRegionObserver.class); - // Run the index MR job and verify that the index table rebuild succeeds - runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName, - null, 0, IndexTool.IndexVerifyType.AFTER); - assertEquals(1, MutationCountingRegionObserver.getMutationCount()); - MutationCountingRegionObserver.setMutationCount(0); - // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should not - // write any index rows - runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName, - null, 0, IndexTool.IndexVerifyType.BEFORE); - assertEquals(0, MutationCountingRegionObserver.getMutationCount()); - // The "-v BOTH" option should not write any index rows either - runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName, - null, 0, IndexTool.IndexVerifyType.BOTH); - assertEquals(0, MutationCountingRegionObserver.getMutationCount()); - dropIndexToolTables(conn); - } - } - - @Test - public void testIndexToolVerifyAfterOption() throws Exception { - // This test is for building non-transactional global indexes with direct api - if (localIndex || transactional || !directApi || useSnapshot || useTenantId) { - return; - } - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - String schemaName = generateUniqueName(); - String dataTableName = generateUniqueName(); - String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); - String indexTableName = generateUniqueName(); - String viewName = generateUniqueName(); - String viewFullName = SchemaUtil.getTableName(schemaName, viewName); - conn.createStatement().execute("CREATE TABLE " + dataTableFullName - + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) " - + tableDDLOptions); - conn.commit(); - conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName); - conn.commit(); - // Insert a row - conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)"); - conn.commit(); - // Configure IndexRegionObserver to fail the first write phase. 
This should not - // lead to any change on index and thus the index verify during index rebuild should fail - IndexRegionObserver.setIgnoreIndexRebuildForTesting(true); - conn.createStatement().execute(String.format( - "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName)); - // Run the index MR job and verify that the index table rebuild fails - runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName, - null, -1, IndexTool.IndexVerifyType.AFTER); - // The index tool output table should report that there is a missing index row - Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, "_IDX_" + dataTableFullName); - try { - String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK; - String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - assertTrue(expectedErrorMsg.equals(actualErrorMsg)); - } catch(Exception ex){ - Assert.fail("Fail to parsing the error message from IndexToolOutputTable"); - } - IndexRegionObserver.setIgnoreIndexRebuildForTesting(false); - dropIndexToolTables(conn); - } - } - - @Test - public void testIndexToolOnlyVerifyOption() throws Exception { - // This test is for building non-transactional global indexes with direct api - if (localIndex || transactional || !directApi || useSnapshot || useTenantId) { - return; - } - String schemaName = generateUniqueName(); - String dataTableName = generateUniqueName(); - String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); - String indexTableName = generateUniqueName(); - String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - conn.createStatement().execute("CREATE TABLE " + dataTableFullName - + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0"); - // Insert a row - conn.createStatement().execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')"); - conn.commit(); - conn.createStatement().execute(String.format( - "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC", indexTableName, dataTableFullName)); - // Run the index MR job to only verify that each data table row has a corresponding index row - // IndexTool will go through each data table row and record the mismatches in the output table - // called PHOENIX_INDEX_TOOL - runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, - null, 0, IndexTool.IndexVerifyType.ONLY); - Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName); - try { - String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK; - String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - assertTrue(expectedErrorMsg.equals(actualErrorMsg)); - } catch(Exception ex) { - Assert.fail("Fail to parsing the error message from IndexToolOutputTable"); - } - // Delete the output table for the next test - dropIndexToolTables(conn); - // Run the index tool to populate the index while verifying rows - runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, - null, 0, IndexTool.IndexVerifyType.AFTER); - runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, - null, 0, IndexTool.IndexVerifyType.ONLY); - 
dropIndexToolTables(conn); - } - } - - @Test - public void testIndexToolVerifyWithExpiredIndexRows() throws Exception { - // This test is for building non-transactional global indexes with direct api - if (localIndex || transactional || !directApi || useSnapshot || useTenantId) { - return; - } - String schemaName = generateUniqueName(); - String dataTableName = generateUniqueName(); - String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); - String indexTableName = generateUniqueName(); - String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - conn.createStatement().execute("CREATE TABLE " + dataTableFullName - + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0"); - // Insert a row - conn.createStatement() - .execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')"); - conn.commit(); - conn.createStatement() - .execute(String.format("CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC", - indexTableName, dataTableFullName)); - runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, - IndexTool.IndexVerifyType.ONLY); - Cell cell = - getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, - indexTableFullName); - try { - String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK; - String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - assertTrue(expectedErrorMsg.equals(actualErrorMsg)); - } catch(Exception ex) { - Assert.fail("Fail to parsing the error message from IndexToolOutputTable"); - } - - // Run the index tool to populate the index while verifying rows - runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, - IndexTool.IndexVerifyType.AFTER); - - // Set ttl of index table ridiculously low so that all data is expired - Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); - TableName indexTable = TableName.valueOf(indexTableFullName); - HColumnDescriptor desc = admin.getTableDescriptor(indexTable).getColumnFamilies()[0]; - desc.setTimeToLive(1); - admin.modifyColumn(indexTable, desc); - Thread.sleep(1000); - Pair<Integer, Integer> status = admin.getAlterStatus(indexTable); - int retry = 0; - while (retry < 20 && status.getFirst() != 0) { - Thread.sleep(2000); - status = admin.getAlterStatus(indexTable); - } - assertTrue(status.getFirst() == 0); - - TableName indexToolOutputTable = TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES); - admin.disableTable(indexToolOutputTable); - admin.deleteTable(indexToolOutputTable); - // Run the index tool using the only-verify option, verify it gives no mismatch - runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, - IndexTool.IndexVerifyType.ONLY); - Scan scan = new Scan(); - Table hIndexToolTable = - conn.unwrap(PhoenixConnection.class).getQueryServices() - .getTable(indexToolOutputTable.getName()); - Result r = hIndexToolTable.getScanner(scan).next(); - assertTrue(r == null); - dropIndexToolTables(conn); - } - } - - @Test public void testIndexToolWithTenantId() throws Exception { if (!useTenantId) { return;} String tenantId = generateUniqueName(); @@ -823,59 +486,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { } @Test - public 
void testSecondaryGlobalIndexFailure() throws Exception { - // This test is for building non-transactional global indexes with direct api - if (localIndex || transactional || !directApi || useSnapshot || useTenantId) { - return; - } - String schemaName = generateUniqueName(); - String dataTableName = generateUniqueName(); - String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); - String indexTableName = generateUniqueName(); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - String stmString1 = - "CREATE TABLE " + dataTableFullName - + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) " - + tableDDLOptions; - conn.createStatement().execute(stmString1); - String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName); - PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); - - // Insert two rows - upsertRow(stmt1, 1); - upsertRow(stmt1, 2); - conn.commit(); - - String stmtString2 = - String.format( - "CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC ", - (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName); - conn.createStatement().execute(stmtString2); - - // Run the index MR job. - runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName); - - String qIndexTableName = SchemaUtil.getQualifiedTableName(schemaName, indexTableName); - - // Verify that the index table is in the ACTIVE state - assertEquals(PIndexState.ACTIVE, TestUtil.getIndexState(conn, qIndexTableName)); - - ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices(); - Admin admin = queryServices.getAdmin(); - TableName tableName = TableName.valueOf(qIndexTableName); - admin.disableTable(tableName); - - // Run the index MR job and it should fail (return -1) - runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, - null, -1, new String[0]); - - // Verify that the index table should be still in the ACTIVE state - assertEquals(PIndexState.ACTIVE, TestUtil.getIndexState(conn, qIndexTableName)); - } - } - - @Test public void testSaltedVariableLengthPK() throws Exception { if (!mutable) return; if (transactional) return; diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexVerificationOldDesignIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexVerificationOldDesignIT.java new file mode 100644 index 0000000..477f74f --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexVerificationOldDesignIT.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.end2end; + +import com.google.common.collect.Maps; +import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ManualEnvironmentEdge; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; + +import org.junit.BeforeClass; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Map; +import java.util.Properties; + +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; + +public class IndexVerificationOldDesignIT extends BaseUniqueNamesOwnClusterIT { + + ManualEnvironmentEdge injectEdge; + + @BeforeClass + public static synchronized void setup() throws Exception { + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2); + serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20)); + serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, Long.toString(5)); + serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, + QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8)); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2); + clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true)); + clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5)); + clientProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); + clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.TRUE.toString()); + clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, + Boolean.toString(false)); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), + new ReadOnlyProps(clientProps.entrySet().iterator())); + } + + @Test + public void testIndexToolOnlyVerifyOption() throws Exception { + long ttl=3600; + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0, TTL="+ttl); + conn.createStatement().execute(String.format( + "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE)", indexTableName, dataTableFullName)); + + upsertValidRows(conn, dataTableFullName); + + IndexTool indexTool = IndexToolIT.runIndexTool(true, false, schemaName, dataTableName, 
indexTableName, + null, 0, IndexTool.IndexVerifyType.ONLY); + + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(6, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + + conn.createStatement().execute("upsert into " + indexTableFullName + " values ('Phoenix5', 6,'G')"); + conn.commit(); + indexTool = IndexToolIT.runIndexTool(true, false, schemaName, dataTableName, indexTableName, + null, 0, IndexTool.IndexVerifyType.ONLY); + + assertEquals(1, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(5, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + + injectEdge = new ManualEnvironmentEdge(); + injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis() + ttl*1000); + EnvironmentEdgeManager.injectEdge(injectEdge); + + indexTool = IndexToolIT.runIndexTool(true, false, schemaName, dataTableName, indexTableName, + null, 0, IndexTool.IndexVerifyType.ONLY); + + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue()); + } + } + + @Test + public void testIndexToolOnlyVerifyOption_viewIndex() throws Exception { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String viewName = generateUniqueName(); + String fullViewName = SchemaUtil.getTableName(schemaName, viewName); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0"); + conn.createStatement().execute("CREATE VIEW " + fullViewName + + " AS SELECT * FROM "+dataTableFullName); + conn.createStatement().execute(String.format( + "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE)", indexTableName, fullViewName)); + + upsertValidRows(conn, fullViewName); + + IndexToolIT.runIndexTool(true, false, schemaName, viewName, indexTableName, + null, 0, IndexTool.IndexVerifyType.ONLY); + + conn.createStatement().execute("upsert into " + indexTableFullName + " values ('Phoenix5', 6,'G')"); + conn.createStatement().execute("delete from " + indexTableFullName + " where \"0:CODE\" = 'D'"); + conn.commit(); + + IndexTool indexTool = IndexToolIT.runIndexTool(true, false, schemaName, viewName, indexTableName, + null, 0, IndexTool.IndexVerifyType.ONLY); + assertEquals(1, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); + assertEquals(4, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue()); + assertEquals(1, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue()); + } + } + + private void upsertValidRows(Connection conn, String table) throws SQLException { + conn.createStatement().execute("upsert into " + table + " values (1, 'Phoenix', 'A')"); + conn.createStatement().execute("upsert into " + table + " values (2, 'Phoenix1', 'B')"); + conn.createStatement().execute("upsert into " + table + " values (3, 'Phoenix2', 'C')"); + conn.createStatement().execute("upsert into " + table + " values (4, 'Phoenix3', 'D')"); + conn.createStatement().execute("upsert into " + table + " values (5, 'Phoenix4', 'E')"); + conn.createStatement().execute("upsert into " + table + " values (6, 'Phoenix5', 'F')"); + conn.commit(); + } + +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java new file mode 100644 index 0000000..35a0a8a --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.coprocessor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanInfoUtil; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.hbase.index.ValueGetter; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.hbase.index.parallel.TaskBatch; +import org.apache.phoenix.hbase.index.parallel.TaskRunner; +import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder; +import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager; +import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner; +import org.apache.phoenix.hbase.index.table.HTableFactory; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.index.PhoenixIndexCodec; +import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.ServerUtil; + +import java.io.IOException; +import java.util.List; + +import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY; +import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS; +import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; + +public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { + + public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max"; + public static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17; + public static final String INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY = "index.verify.threads.max"; + public static final int DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK = 2048; + public static final String NO_EXPECTED_MUTATION = "No expected mutation"; + public static final String ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty"; + public static final String ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK = "Missing index row beyond maxLookBack"; + public static final String ERROR_MESSAGE_MISSING_INDEX_ROW = "Missing index row"; + + protected long pageSizeInRows = Long.MAX_VALUE; + protected int rowCountPerTask; + protected boolean hasMore; + protected int maxBatchSize; + protected byte[] indexMetaData; + protected Scan scan; + protected RegionScanner innerScanner; + protected Region region; + protected IndexMaintainer indexMaintainer; + protected Table indexHTable; + protected TaskRunner pool; + protected TaskBatch<Boolean> tasks; + protected String exceptionMessage; + protected HTableFactory hTableFactory; + protected int indexTableTTL; + protected long maxLookBackInMills; + protected IndexToolVerificationResult verificationResult; + protected IndexVerificationResultRepository verificationResultRepository; + + public GlobalIndexRegionScanner(RegionScanner innerScanner, final Region 
region, final Scan scan, + final RegionCoprocessorEnvironment env) throws IOException { + super(innerScanner); + final Configuration config = env.getConfiguration(); + if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) { + pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS, + QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS); + } + maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); + indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD); + if (indexMetaData == null) { + indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); + } + List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true); + indexMaintainer = maintainers.get(0); + this.scan = scan; + this.innerScanner = innerScanner; + this.region = region; + verificationResult = new IndexToolVerificationResult(scan); + // Create the following objects only for rebuilds by IndexTool + hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION); + indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); + indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive(); + maxLookBackInMills = ScanInfoUtil.getMaxLookbackInMillis(config); + verificationResultRepository = + new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory); + pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor( + new ThreadPoolBuilder("IndexVerify", + env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY, + DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout( + INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env)); + rowCountPerTask = config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY, + DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK); + } + + public static class SimpleValueGetter implements ValueGetter { + final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable(); + final Put put; + public SimpleValueGetter (final Put put) { + this.put = put; + } + @Override + public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) { + List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier()); + if (cellList == null || cellList.isEmpty()) { + return null; + } + Cell cell = cellList.get(0); + valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + return valuePtr; + } + + @Override + public byte[] getRowKey() { + return put.getRow(); + } + + } + + public static byte[] getIndexRowKey(IndexMaintainer indexMaintainer, final Put dataRow) { + ValueGetter valueGetter = new SimpleValueGetter(dataRow); + return indexMaintainer.buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()), + null, null, HConstants.LATEST_TIMESTAMP); + } + + public static long getMaxTimestamp(Mutation m) { + long ts = 0; + for (List<Cell> cells : m.getFamilyCellMap().values()) { + if (cells == null) { + continue; + } + for (Cell cell : cells) { + if (ts < cell.getTimestamp()) { + ts = cell.getTimestamp(); + } + } + } + return ts; + } + + public static long getTimestamp(Mutation m) { + for (List<Cell> cells : m.getFamilyCellMap().values()) { + for (Cell cell : cells) { + return cell.getTimestamp(); + } + } + throw new IllegalStateException("No cell found"); + } + + protected static boolean isTimestampBeforeTTL(long tableTTL, long currentTime, long tsToCheck) { + if (tableTTL == HConstants.FOREVER) { + return 
false; + } + return tsToCheck < (currentTime - tableTTL * 1000); + } + + protected static boolean isTimestampBeyondMaxLookBack(long maxLookBackInMills, + long currentTime, long tsToCheck) { + if (!ScanInfoUtil.isMaxLookbackTimeEnabled(maxLookBackInMills)) { + return false; + } + return tsToCheck < (currentTime - maxLookBackInMills); + } + + protected static long getMaxTimestamp(Pair<Put, Delete> pair) { + Put put = pair.getFirst(); + long ts1 = 0; + if (put != null) { + ts1 = getMaxTimestamp(put); + } + Delete del = pair.getSecond(); + long ts2 = 0; + if (del != null) { + ts1 = getMaxTimestamp(del); + } + return (ts1 > ts2) ? ts1 : ts2; + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java index 8b7e3f2..e767a23 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java @@ -24,8 +24,6 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY; -import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS; -import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB; import java.io.IOException; @@ -54,12 +52,10 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.regionserver.ScanInfoUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; @@ -67,22 +63,18 @@ import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.ValueGetter; -import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure; import org.apache.phoenix.hbase.index.parallel.Task; import org.apache.phoenix.hbase.index.parallel.TaskBatch; -import org.apache.phoenix.hbase.index.parallel.TaskRunner; import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder; import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager; import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner; -import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.index.GlobalIndexChecker; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository; -import 
org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.mapreduce.index.IndexTool; import org.apache.phoenix.query.KeyRange; @@ -98,86 +90,50 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Throwables; import com.google.common.collect.Maps; -public class IndexRebuildRegionScanner extends BaseRegionScanner { - public static final String ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK = "Missing index row beyond maxLookBack"; +public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner { private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class); - public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max"; - private static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17; - public static final String INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY = "index.verify.threads.max"; - private static final int DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK = 2048; - public static final String NO_EXPECTED_MUTATION = "No expected mutation"; - public static final String - ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty"; - private long pageSizeInRows = Long.MAX_VALUE; - private int rowCountPerTask; - private boolean hasMore; - private final int maxBatchSize; - private UngroupedAggregateRegionObserver.MutationList mutations; + private final long maxBatchSizeBytes; private final long blockingMemstoreSize; private final byte[] clientVersionBytes; - private byte[] indexMetaData; private boolean useProto = true; - private Scan scan; - private RegionScanner innerScanner; - private Region region; - private IndexMaintainer indexMaintainer; - private byte[] indexRowKey = null; - private Table indexHTable = null; + private byte[] indexRowKey; private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE; private boolean verify = false; private Map<byte[], List<Mutation>> indexKeyToMutationMap; private Map<byte[], Pair<Put, Delete>> dataKeyToMutationMap; - private TaskRunner pool; - private TaskBatch<Boolean> tasks; - private String exceptionMessage; private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver; - private HTableFactory hTableFactory; - private int indexTableTTL = 0; - private IndexToolVerificationResult verificationResult; + protected UngroupedAggregateRegionObserver.MutationList mutations; private boolean isBeforeRebuilt = true; private boolean partialRebuild = false; - private int singleRowRebuildReturnCode; + private int singleRowRebuildReturnCode; private Map<byte[], NavigableSet<byte[]>> familyMap; private byte[][] viewConstants; - private IndexVerificationResultRepository verificationResultRepository; private IndexVerificationOutputRepository verificationOutputRepository; - private long maxLookBackInMills; @VisibleForTesting public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan, final RegionCoprocessorEnvironment env, UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException { - super(innerScanner); + super(innerScanner, region, scan, env); final Configuration config = env.getConfiguration(); - if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) { - pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS, - QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS); - } else { + if 
(scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) == null) { partialRebuild = true; } - maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); - mutations = new UngroupedAggregateRegionObserver.MutationList(maxBatchSize); maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES); + mutations = new UngroupedAggregateRegionObserver.MutationList(maxBatchSize); blockingMemstoreSize = UngroupedAggregateRegionObserver.getBlockingMemstoreSize(region, config); clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD); if (indexMetaData == null) { useProto = false; - indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); } - List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true); - indexMaintainer = maintainers.get(0); - this.scan = scan; familyMap = scan.getFamilyMap(); if (familyMap.isEmpty()) { familyMap = null; } - - this.innerScanner = innerScanner; - this.region = region; this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver; indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY); if (indexRowKey != null) { @@ -186,17 +142,10 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE); if (valueBytes != null) { - verificationResult = new IndexToolVerificationResult(scan); verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes); if (verifyType != IndexTool.IndexVerifyType.NONE) { verify = true; viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); - // Create the following objects only for rebuilds by IndexTool - hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION); - indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); - indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive(); - verificationResultRepository = - new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory); verificationOutputRepository = new IndexVerificationOutputRepository(indexMaintainer.getIndexTableName(), hTableFactory); indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); @@ -206,12 +155,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY, DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout( INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env)); - rowCountPerTask = config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY, - DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK); } } - - maxLookBackInMills = ScanInfoUtil.getMaxLookbackInMillis(config); } private void setReturnCodeForSingleRowRebuild() throws IOException { @@ -310,41 +255,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return 0; } - public static class SimpleValueGetter implements ValueGetter { - final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable(); - final Put put; - - public SimpleValueGetter(final Put put) { - this.put = put; - } - - @Override - public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException { - List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier()); - if (cellList == null || cellList.isEmpty()) 
{ - return null; - } - Cell cell = cellList.get(0); - valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - return valuePtr; - } - - @Override - public byte[] getRowKey() { - return put.getRow(); - } - - } - - public byte[] getIndexRowKey(final Put dataRow) throws IOException { - ValueGetter valueGetter = new SimpleValueGetter(dataRow); - byte[] builtIndexRowKey = indexMaintainer.buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()), - null, null, HConstants.LATEST_TIMESTAMP); - return builtIndexRowKey; - } private boolean checkIndexRow(final byte[] indexRowKey, final Put put) throws IOException { - byte[] builtIndexRowKey = getIndexRowKey(put); + byte[] builtIndexRowKey = getIndexRowKey(indexMaintainer, put); if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length, indexRowKey, 0, indexRowKey.length) != 0) { return false; @@ -366,21 +279,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { region.getRegionInfo().getTable().getName(), isBeforeRebuilt); } - private static long getMaxTimestamp(Mutation m) { - long ts = 0; - for (List<Cell> cells : m.getFamilyCellMap().values()) { - if (cells == null) { - continue; - } - for (Cell cell : cells) { - if (ts < cell.getTimestamp()) { - ts = cell.getTimestamp(); - } - } - } - return ts; - } - private static Cell getCell(Mutation m, byte[] family, byte[] qualifier) { List<Cell> cellList = m.getFamilyCellMap().get(family); if (cellList == null) { @@ -529,17 +427,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return false; } - private boolean isDeleteFamilyVersion(Mutation mutation) { - for (List<Cell> cells : mutation.getFamilyCellMap().values()) { - for (Cell cell : cells) { - if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.DeleteFamilyVersion) { - return true; - } - } - } - return false; - } - @VisibleForTesting public List<Mutation> prepareActualIndexMutations(Result indexRow) throws IOException { Put put = null; @@ -724,7 +611,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { // get a value back from index if it has already expired between our rebuild and // verify // TODO: have a metric to update for these cases - if (isTimestampBeforeTTL(currentTime, getTimestamp(expected))) { + if (isTimestampBeforeTTL(indexTableTTL, currentTime, getTimestamp(expected))) { verificationPhaseResult.setExpiredIndexRowCount(verificationPhaseResult.getExpiredIndexRowCount() + 1); return true; } @@ -782,7 +669,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return true; } - if (isTimestampBeyondMaxLookBack(currentTime, getTimestamp(expectedMutationList.get(expectedIndex)))){ + if (isTimestampBeyondMaxLookBack(maxLookBackInMills, currentTime, getTimestamp(expectedMutationList.get(expectedIndex)))){ if (expectedIndex > 0) { // if current expected index mutation is beyond max look back window, we only need to make sure its latest // mutation is a matching one, as an SCN query is required. @@ -822,20 +709,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } } - private static long getMaxTimestamp(Pair<Put, Delete> pair) { - Put put = pair.getFirst(); - long ts1 = 0; - if (put != null) { - ts1 = getMaxTimestamp(put); - } - Delete del = pair.getSecond(); - long ts2 = 0; - if (del != null) { - ts1 = getMaxTimestamp(del); - } - return (ts1 > ts2) ? 
ts1 : ts2; - } - private void verifyIndexRows(List<KeyRange> keys, IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException { List<KeyRange> invalidKeys = new ArrayList<>(); @@ -847,6 +720,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { indexScan.setFilter(new SkipScanFilter(skipScanFilter, true)); indexScan.setRaw(true); indexScan.setMaxVersions(); + indexScan.setCacheBlocks(false); try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) { for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) { KeyRange keyRange = PVarbinary.INSTANCE.getKeyRange(result.getRow()); @@ -870,7 +744,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { KeyRange keyRange = itr.next(); byte[] key = keyRange.getLowerRange(); List<Mutation> mutationList = indexKeyToMutationMap.get(key); - if (isTimestampBeforeTTL(currentTime, getTimestamp(mutationList.get(mutationList.size() - 1)))) { + if (isTimestampBeforeTTL(indexTableTTL, currentTime, getTimestamp(mutationList.get(mutationList.size() - 1)))) { itr.remove(); verificationPhaseResult.setExpiredIndexRowCount(verificationPhaseResult.getExpiredIndexRowCount() + 1); } @@ -886,13 +760,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } long currentTime = EnvironmentEdgeManager.currentTimeMillis(); String errorMsg; - if (isTimestampBeyondMaxLookBack(currentTime, getTimestamp(mutation))){ + if (isTimestampBeyondMaxLookBack(maxLookBackInMills, currentTime, getTimestamp(mutation))){ errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK; verificationPhaseResult. setBeyondMaxLookBackMissingIndexRowCount(verificationPhaseResult.getBeyondMaxLookBackMissingIndexRowCount() + 1); } else { - errorMsg = "Missing index row"; + errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW; verificationPhaseResult.setMissingIndexRowCount(verificationPhaseResult.getMissingIndexRowCount() + 1); } byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants); @@ -906,19 +780,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { keys.addAll(invalidKeys); } - private boolean isTimestampBeforeTTL(long currentTime, long tsToCheck) { - if (indexTableTTL == HConstants.FOREVER) { - return false; - } - return tsToCheck < (currentTime - (long) indexTableTTL * 1000); - } - - private boolean isTimestampBeyondMaxLookBack(long currentTime, long tsToCheck){ - if (!ScanInfoUtil.isMaxLookbackTimeEnabled(maxLookBackInMills)) - return true; - return tsToCheck < (currentTime - maxLookBackInMills); - } - private void addVerifyTask(final List<KeyRange> keys, final IndexToolVerificationResult.PhaseResult verificationPhaseResult) { tasks.add(new Task<Boolean>() { @@ -1081,15 +942,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return set.contains(qualifier); } - public static long getTimestamp(Mutation m) { - for (List<Cell> cells : m.getFamilyCellMap().values()) { - for (Cell cell : cells) { - return cell.getTimestamp(); - } - } - throw new IllegalStateException("No cell found"); - } - /** * This is to reorder the mutations in ascending order by the tuple of timestamp and mutation type where * put comes before delete @@ -1248,7 +1100,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { if (mutation.getFamilyCellMap().size() != 0) { // Add this put on top of the current data row state to get the next data row state Put nextDataRow = (currentDataRowState == null) 
? new Put((Put)mutation) : applyNew((Put)mutation, currentDataRowState); - ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRow); + ValueGetter nextDataRowVG = new SimpleValueGetter(nextDataRow); Put indexPut = prepareIndexPutForRebuid(indexMaintainer, rowKeyPtr, nextDataRowVG, ts); indexMutations.add(indexPut); // Delete the current index row if the new index key is different than the current one @@ -1288,7 +1140,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { currentDataRowState = null; indexRowKeyForCurrentDataRow = null; } else { - ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRowState); + ValueGetter nextDataRowVG = new SimpleValueGetter(nextDataRowState); Put indexPut = prepareIndexPutForRebuid(indexMaintainer, rowKeyPtr, nextDataRowVG, ts); indexMutations.add(indexPut); // Delete the current index row if the new index key is different than the current one diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java new file mode 100644 index 0000000..88bac86 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.coprocessor; + +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; +import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_VALUE_BYTES; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; +import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.filter.SkipScanFilter; +import org.apache.phoenix.hbase.index.ValueGetter; +import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure; +import org.apache.phoenix.hbase.index.parallel.Task; +import org.apache.phoenix.hbase.index.parallel.TaskBatch; +import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; + +import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.schema.types.PVarbinary; +import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.ServerUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Throwables; +import com.google.common.collect.Maps; + +public class IndexerRegionScanner extends GlobalIndexRegionScanner { + + private static final Logger LOGGER = LoggerFactory.getLogger(IndexerRegionScanner.class); + protected Map<byte[], Put> indexKeyToDataPutMap; + + IndexerRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan, + final RegionCoprocessorEnvironment env) throws IOException { + super(innerScanner, region, scan, env); + indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + } + + @Override + public HRegionInfo getRegionInfo() { + return region.getRegionInfo(); + } + + @Override + public boolean isFilterDone() { return false; } + + @Override + public void close() throws IOException { + innerScanner.close(); + try { + verificationResultRepository.logToIndexToolResultTable(verificationResult, + IndexTool.IndexVerifyType.ONLY, region.getRegionInfo().getRegionName()); + } finally { + this.pool.stop("IndexerRegionScanner is closing"); + hTableFactory.shutdown(); + indexHTable.close(); + verificationResultRepository.close(); + } + } + + private boolean verifySingleIndexRow(Result indexRow, final Put dataRow, + IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException { + ValueGetter valueGetter = new SimpleValueGetter(dataRow); + long ts = getMaxTimestamp(dataRow); + 
Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE, + valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null); + + if (indexPut == null) { + // This means the data row does not have any covered column values + indexPut = new Put(indexRow.getRow()); + } + // Add the empty column + indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), + indexMaintainer.getEmptyKeyValueQualifier(), ts, EMPTY_COLUMN_VALUE_BYTES); + + int cellCount = 0; + long currentTime = EnvironmentEdgeManager.currentTime(); + for (List<Cell> cells : indexPut.getFamilyCellMap().values()) { + if (cells == null) { + break; + } + for (Cell expectedCell : cells) { + byte[] family = CellUtil.cloneFamily(expectedCell); + byte[] qualifier = CellUtil.cloneQualifier(expectedCell); + Cell actualCell = indexRow.getColumnLatestCell(family, qualifier); + if (actualCell == null) { + // Check if cell expired as per the current server's time and data table ttl + // Index table should have the same ttl as the data table, hence we might not + // get a value back from index if it has already expired between our rebuild and + // verify + + // or if cell timestamp is beyond maxlookback + if (isTimestampBeforeTTL(indexTableTTL, currentTime, expectedCell.getTimestamp())) { + continue; + } + + return false; + } + if (actualCell.getTimestamp() < ts) { + // Skip older cells since a Phoenix index row is composed of cells with the same timestamp + continue; + } + // Check all columns + if (!CellUtil.matchingValue(actualCell, expectedCell) || actualCell.getTimestamp() != ts) { + if(isTimestampBeyondMaxLookBack(maxLookBackInMills, currentTime, actualCell.getTimestamp())) { + verificationPhaseResult + .setBeyondMaxLookBackInvalidIndexRowCount(verificationPhaseResult + .getBeyondMaxLookBackInvalidIndexRowCount()+1); + continue; + } + return false; + } + cellCount++; + } + } + return cellCount == indexRow.rawCells().length; + } + + private void verifyIndexRows(List<KeyRange> keys, Map<byte[], Put> perTaskDataKeyToDataPutMap, + IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException { + ScanRanges scanRanges = ScanRanges.createPointLookup(keys); + Scan indexScan = new Scan(); + indexScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax()); + scanRanges.initializeScan(indexScan); + SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter(); + indexScan.setFilter(skipScanFilter); + indexScan.setCacheBlocks(false); + try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) { + for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) { + Put dataPut = indexKeyToDataPutMap.get(result.getRow()); + if (dataPut == null) { + // This should never happen + exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName(); + throw new IOException(exceptionMessage); + } + if (verifySingleIndexRow(result, dataPut, verificationPhaseResult)) { + verificationPhaseResult.setValidIndexRowCount(verificationPhaseResult.getValidIndexRowCount()+1); + } else { + verificationPhaseResult.setInvalidIndexRowCount(verificationPhaseResult.getInvalidIndexRowCount()+1); + } + perTaskDataKeyToDataPutMap.remove(dataPut.getRow()); + } + } catch (Throwable t) { + ServerUtil.throwIOException(indexHTable.getName().toString(), t); + } + // Check if any expected rows from index(which we didn't get) are already expired due to TTL + if (!perTaskDataKeyToDataPutMap.isEmpty()) { + 
Iterator<Entry<byte[], Put>> itr = perTaskDataKeyToDataPutMap.entrySet().iterator(); + long currentTime = EnvironmentEdgeManager.currentTime(); + while(itr.hasNext()) { + Entry<byte[], Put> entry = itr.next(); + long ts = getMaxTimestamp(entry.getValue()); + if (isTimestampBeforeTTL(indexTableTTL, currentTime, ts)) { + itr.remove(); + verificationPhaseResult.setExpiredIndexRowCount(verificationPhaseResult.getExpiredIndexRowCount()+1); + } + } + } + // Check if any expected rows from index(which we didn't get) are beyond max look back and have been compacted away + if (!perTaskDataKeyToDataPutMap.isEmpty()) { + for (Entry<byte[], Put> entry : perTaskDataKeyToDataPutMap.entrySet()) { + Put put = entry.getValue(); + long ts = getMaxTimestamp(put); + long currentTime = EnvironmentEdgeManager.currentTime(); + if (isTimestampBeyondMaxLookBack(maxLookBackInMills, currentTime, ts)) { + verificationPhaseResult. + setBeyondMaxLookBackMissingIndexRowCount(verificationPhaseResult.getBeyondMaxLookBackMissingIndexRowCount() + 1); + } else { + verificationPhaseResult.setMissingIndexRowCount( + verificationPhaseResult.getMissingIndexRowCount() + 1); + } + } + } + } + + private void addVerifyTask(final List<KeyRange> keys, final Map<byte[], Put> perTaskDataKeyToDataPutMap, + final IndexToolVerificationResult.PhaseResult verificationPhaseResult) { + tasks.add(new Task<Boolean>() { + @Override + public Boolean call() throws Exception { + try { + if (Thread.currentThread().isInterrupted()) { + exceptionMessage = "Pool closed, not attempting to verify index rows! " + indexHTable.getName(); + throw new IOException(exceptionMessage); + } + verifyIndexRows(keys, perTaskDataKeyToDataPutMap, verificationPhaseResult); + } catch (Exception e) { + throw e; + } + return Boolean.TRUE; + } + }); + } + + private void parallelizeIndexVerify(IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException { + int taskCount = (indexKeyToDataPutMap.size() + rowCountPerTask - 1) / rowCountPerTask; + tasks = new TaskBatch<>(taskCount); + + List<Map<byte[], Put>> dataPutMapList = new ArrayList<>(taskCount); + List<IndexToolVerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount); + List<KeyRange> keys = new ArrayList<>(rowCountPerTask); + + Map<byte[], Put> perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + + dataPutMapList.add(perTaskDataKeyToDataPutMap); + + IndexToolVerificationResult.PhaseResult perTaskVerificationPhaseResult = new IndexToolVerificationResult.PhaseResult(); + verificationPhaseResultList.add(perTaskVerificationPhaseResult); + + for (Map.Entry<byte[], Put> entry: indexKeyToDataPutMap.entrySet()) { + keys.add(PVarbinary.INSTANCE.getKeyRange(entry.getKey())); + perTaskDataKeyToDataPutMap.put(entry.getValue().getRow(), entry.getValue()); + if (keys.size() == rowCountPerTask) { + addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult); + keys = new ArrayList<>(rowCountPerTask); + perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + dataPutMapList.add(perTaskDataKeyToDataPutMap); + perTaskVerificationPhaseResult = new IndexToolVerificationResult.PhaseResult(); + verificationPhaseResultList.add(perTaskVerificationPhaseResult); + } + } + if (keys.size() > 0) { + addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult); + } + List<Boolean> taskResultList = null; + try { + LOGGER.debug("Waiting on index verify tasks to complete..."); + taskResultList = 
this.pool.submitUninterruptible(tasks); + } catch (ExecutionException e) { + throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e); + } catch (EarlyExitFailure e) { + throw new RuntimeException("Stopped while waiting for batch, quitting!", e); + } + for (Boolean result : taskResultList) { + if (result == null) { + // there was a failure + throw new IOException(exceptionMessage); + } + } + for (IndexToolVerificationResult.PhaseResult result : verificationPhaseResultList) { + verificationPhaseResult.add(result); + } + } + + private void verifyIndex() throws IOException { + IndexToolVerificationResult nextVerificationResult = new IndexToolVerificationResult(scan); + nextVerificationResult.setScannedDataRowCount(indexKeyToDataPutMap.size()); + IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult(); + // For these options we start with verifying index rows + parallelizeIndexVerify(verificationPhaseResult); + nextVerificationResult.getBefore().add(verificationPhaseResult); + indexKeyToDataPutMap.clear(); + verificationResult.add(nextVerificationResult); + } + + @Override + public boolean next(List<Cell> results) throws IOException { + Cell lastCell = null; + int rowCount = 0; + region.startRegionOperation(); + try { + synchronized (innerScanner) { + do { + List<Cell> row = new ArrayList<>(); + hasMore = innerScanner.nextRaw(row); + if (!row.isEmpty()) { + lastCell = row.get(0); + Put put = null; + for (Cell cell : row) { + if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) { + if (put == null) { + put = new Put(CellUtil.cloneRow(cell)); + } + put.add(cell); + } else { + throw new DoNotRetryIOException("Scan without raw found a deleted cell"); + } + } + rowCount++; + indexKeyToDataPutMap + .put(getIndexRowKey(indexMaintainer, put), put); + } + } while (hasMore && rowCount < pageSizeInRows); + verifyIndex(); + } + } catch (IOException e) { + LOGGER.error(String.format("IOException during rebuilding: %s", Throwables.getStackTraceAsString(e))); + throw e; + } finally { + region.closeRegionOperation(); + indexKeyToDataPutMap.clear(); + } + byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount)); + final Cell aggKeyValue; + if (lastCell == null) { + aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, + SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes,0, rowCountBytes.length); + } else { + aggKeyValue = KeyValueUtil.newKeyValue(CellUtil.cloneRow(lastCell), SINGLE_COLUMN_FAMILY, + SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length); + } + results.add(aggKeyValue); + return hasMore; + } + + @Override + public long getMaxResultSize() { + return scan.getMaxResultSize(); + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index b9cdafd..2c93b3f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -87,6 +87,7 @@ import org.apache.phoenix.expression.ExpressionType; import org.apache.phoenix.expression.aggregator.Aggregator; import org.apache.phoenix.expression.aggregator.Aggregators; import org.apache.phoenix.expression.aggregator.ServerAggregators; +import org.apache.phoenix.hbase.index.Indexer; import 
org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.exception.IndexWriteException; @@ -101,6 +102,7 @@ import org.apache.phoenix.index.PhoenixIndexMetaData; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.join.HashJoinInfo; +import org.apache.phoenix.mapreduce.index.IndexTool; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; @@ -120,7 +122,7 @@ import org.apache.phoenix.schema.stats.NoOpStatisticsCollector; import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker; import org.apache.phoenix.schema.stats.StatisticsCollector; import org.apache.phoenix.schema.stats.StatisticsCollectorFactory; -import org.apache.phoenix.schema.stats.StatisticsScanner; + import org.apache.phoenix.schema.stats.StatsCollectionDisabledOnServerException; import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; @@ -389,8 +391,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver @Override protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException, SQLException { - RegionCoprocessorEnvironment env = c.getEnvironment(); - Region region = env.getRegion(); + final RegionCoprocessorEnvironment env = c.getEnvironment(); + final Region region = env.getRegion(); long ts = scan.getTimeRange().getMax(); boolean localIndexScan = ScanUtil.isLocalIndex(scan); if (ScanUtil.isAnalyzeTable(scan)) { @@ -408,7 +410,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver return collectStats(s, statsCollector, region, scan, env.getConfiguration()); } } else if (ScanUtil.isIndexRebuild(scan)) { - return rebuildIndices(s, region, scan, env); + return User.runAsLoginUser(new PrivilegedExceptionAction<RegionScanner>() { + @Override + public RegionScanner run() throws Exception { + return rebuildIndices(s, region, scan, env); + } + }); } PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); @@ -1062,12 +1069,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan, final RegionCoprocessorEnvironment env) throws IOException { + boolean oldCoproc = region.getTableDesc().hasCoprocessor(Indexer.class.getCanonicalName()); + byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE); + IndexTool.IndexVerifyType verifyType = (valueBytes != null) ? 
+ IndexTool.IndexVerifyType.fromValue(valueBytes):IndexTool.IndexVerifyType.NONE; + if(oldCoproc && verifyType == IndexTool.IndexVerifyType.ONLY) { + return new IndexerRegionScanner(innerScanner, region, scan, env); + } if (!scan.isRaw()) { Scan rawScan = new Scan(scan); rawScan.setRaw(true); rawScan.setMaxVersions(); rawScan.getFamilyMap().clear(); rawScan.setFilter(null); + rawScan.setCacheBlocks(false); for (byte[] family : scan.getFamilyMap().keySet()) { rawScan.addFamily(family); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 6246697..40fc883 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -27,7 +27,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.NavigableSet; import java.util.concurrent.ConcurrentHashMap; import com.google.common.collect.ArrayListMultimap; @@ -66,6 +65,7 @@ import org.apache.htrace.TraceScope; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment; +import org.apache.phoenix.coprocessor.GlobalIndexRegionScanner; import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner; import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.LockManager.RowLock; @@ -78,7 +78,6 @@ import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter; import org.apache.phoenix.index.IndexMaintainer; @@ -88,7 +87,6 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; -import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.ServerUtil.ConnectionType; @@ -648,7 +646,7 @@ public class IndexRegionObserver extends BaseRegionObserver { IndexMaintainer indexMaintainer = pair.getFirst(); HTableInterfaceReference hTableInterfaceReference = pair.getSecond(); if (nextDataRowState != null) { - ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRowState); + ValueGetter nextDataRowVG = new GlobalIndexRegionScanner.SimpleValueGetter(nextDataRowState); Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE, nextDataRowVG, rowKeyPtr, ts, null, null); if (indexPut == null) { @@ -666,7 +664,7 @@ public class IndexRegionObserver extends BaseRegionObserver { new Pair<Mutation, byte[]>(indexPut, rowKeyPtr.get())); // Delete the current index row if the new index key is different than the current one if (currentDataRowState != null) { - ValueGetter currentDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(currentDataRowState); + ValueGetter currentDataRowVG = new 
GlobalIndexRegionScanner.SimpleValueGetter(currentDataRowState); byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP); if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) { @@ -677,7 +675,7 @@ public class IndexRegionObserver extends BaseRegionObserver { } } } else if (currentDataRowState != null) { - ValueGetter currentDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(currentDataRowState); + ValueGetter currentDataRowVG = new GlobalIndexRegionScanner.SimpleValueGetter(currentDataRowState); byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP); Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow, diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java index b736787..4baad43 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java @@ -35,5 +35,5 @@ public enum PhoenixIndexToolJobCounters { AFTER_REBUILD_MISSING_INDEX_ROW_COUNT, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT, AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT, - AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT + AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT; } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java index 5df041d..868e6ad 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.coprocessor.GlobalIndexRegionScanner; import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner; import org.apache.phoenix.coprocessor.IndexToolVerificationResult; import org.apache.phoenix.hbase.index.IndexRegionObserver; @@ -269,7 +270,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest { private void initializeGlobalMockitoSetup() throws IOException { //setup - when(rebuildScanner.getIndexRowKey(put)).thenCallRealMethod(); + when(GlobalIndexRegionScanner.getIndexRowKey(indexMaintainer, put)).thenCallRealMethod(); when(rebuildScanner.prepareIndexMutations(put, delete)).thenCallRealMethod(); when(rebuildScanner.verifySingleIndexRow(Matchers.<Result>any(), Matchers.<IndexToolVerificationResult.PhaseResult>any())).thenCallRealMethod();
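
For readers tracing how the BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT and BEFORE_REBUILD_BEYOND_MAXLOOKBACK_* counters asserted in the new ITs get incremented, the classification comes down to two timestamp comparisons in the GlobalIndexRegionScanner helpers added above. The sketch below is a standalone illustration of that arithmetic only, not the committed code: FOREVER here stands in for HConstants.FOREVER, and the "max lookback enabled" test is approximated with a simple positivity check where the patch delegates to ScanInfoUtil.isMaxLookbackTimeEnabled().

// Minimal, self-contained sketch (not the committed code) of the two timestamp checks that
// drive the expired and beyond-max-lookback buckets during index verification.
public class VerifyTimestampSketch {

    static final long FOREVER = Integer.MAX_VALUE; // stand-in for HConstants.FOREVER ("no TTL")

    // A row version is treated as expired when it is older than currentTime minus the table TTL.
    // TTL is configured in seconds while HBase cell timestamps are in milliseconds.
    static boolean isTimestampBeforeTTL(long tableTTLSeconds, long currentTime, long tsToCheck) {
        if (tableTTLSeconds == FOREVER) {
            return false;
        }
        return tsToCheck < (currentTime - tableTTLSeconds * 1000);
    }

    // A row version is "beyond max lookback" when it is older than the configured lookback window,
    // in which case a missing or mismatched index row is counted in its own bucket rather than
    // as plain invalid/missing.
    static boolean isTimestampBeyondMaxLookBack(long maxLookBackInMillis, long currentTime, long tsToCheck) {
        if (maxLookBackInMillis <= 0) { // assumption: window treated as disabled when not positive
            return false;
        }
        return tsToCheck < (currentTime - maxLookBackInMillis);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long ttlSeconds = 3600;                        // same TTL the new IT sets on its data table
        long staleTs = now - 2 * ttlSeconds * 1000;    // two TTL periods in the past
        System.out.println(isTimestampBeforeTTL(ttlSeconds, now, staleTs));   // true: counted as expired
        System.out.println(isTimestampBeyondMaxLookBack(0, now, now - 1000)); // false: lookback disabled
    }
}

In the committed flow, a run with the -v ONLY option (the option named in the commit title, IndexTool.IndexVerifyType.ONLY in the tests) performs this verification without rebuilding; rows that land in the expired or beyond-max-lookback buckets are reported under their own counters instead of inflating the invalid or missing counts.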