This is an automated email from the ASF dual-hosted git repository. kadir pushed a commit to branch 4.x-HBase-1.3 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push: new 1cccd89 PHOENIX-5674 IndexTool to not write already correct index rows/CFs 1cccd89 is described below commit 1cccd89fe36e05e08ca2b5058f4d46373c69b327 Author: Kadir <kozde...@salesforce.com> AuthorDate: Tue Jan 14 15:47:33 2020 -0800 PHOENIX-5674 IndexTool to not write already correct index rows/CFs --- .../org/apache/phoenix/end2end/IndexToolIT.java | 80 +++++- .../coprocessor/IndexRebuildRegionScanner.java | 280 +++++++++++++-------- .../apache/phoenix/mapreduce/index/IndexTool.java | 15 +- 3 files changed, 262 insertions(+), 113 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index 2eeeb4a..ce24e6d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -37,26 +37,29 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; -import org.apache.phoenix.end2end.index.GlobalIndexCheckerIT; import org.apache.phoenix.hbase.index.IndexRegionObserver; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.index.IndexTool; @@ -378,6 +381,24 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { } } + public static class MutationCountingRegionObserver extends SimpleRegionObserver { + public static AtomicInteger mutationCount = new AtomicInteger(0); + + public static void setMutationCount(int value) { + mutationCount.set(0); + } + + public static int getMutationCount() { + return mutationCount.get(); + } + + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException { + mutationCount.addAndGet(miniBatchOp.size()); + } + } + private Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName) throws Exception { byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName); @@ -416,6 +437,53 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { } @Test + public void testIndexToolVerifyBeforeAndBothOptions() throws Exception { + // This test is for building non-transactional global indexes with direct api + if (localIndex || transactional || !directApi || useSnapshot) { + return; + } + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = 
SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String viewName = generateUniqueName(); + String viewFullName = SchemaUtil.getTableName(schemaName, viewName); + conn.createStatement().execute("CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) " + + tableDDLOptions); + conn.commit(); + conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName); + conn.commit(); + // Insert a row + conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)"); + conn.commit(); + conn.createStatement().execute(String.format( + "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName)); + TestUtil.addCoprocessor(conn, "_IDX_" + dataTableFullName, MutationCountingRegionObserver.class); + // Run the index MR job and verify that the index table rebuild succeeds + runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName, + null, 0, IndexTool.IndexVerifyType.AFTER); + assertEquals(1, MutationCountingRegionObserver.getMutationCount()); + MutationCountingRegionObserver.setMutationCount(0); + // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should not + write any index rows + runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName, + null, 0, IndexTool.IndexVerifyType.BEFORE); + assertEquals(0, MutationCountingRegionObserver.getMutationCount()); + // The "-v BOTH" option should not write any index rows either + runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName, + null, 0, IndexTool.IndexVerifyType.BOTH); + assertEquals(0, MutationCountingRegionObserver.getMutationCount()); + Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); + TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES); + 
admin.disableTable(indexToolOutputTable); + admin.deleteTable(indexToolOutputTable); + } + } + + @Test public void testIndexToolVerifyAfterOption() throws Exception { // This test is for building non-transactional global indexes with direct api if (localIndex || transactional || !directApi || useSnapshot) { @@ -448,7 +516,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { null, -1, IndexTool.IndexVerifyType.AFTER); // The index tool output table should report that there is a missing index row Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, "_IDX_" + dataTableFullName); - byte[] expectedValueBytes = Bytes.toBytes("Missing index rows - Expected: 1 Actual: 0"); + byte[] expectedValueBytes = Bytes.toBytes("Missing index row"); assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), expectedValueBytes, 0, expectedValueBytes.length) == 0); IndexRegionObserver.setIgnoreIndexRebuildForTesting(false); @@ -485,7 +553,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY); Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName); - byte[] expectedValueBytes = Bytes.toBytes("Missing index rows - Expected: 1 Actual: 0"); + byte[] expectedValueBytes = Bytes.toBytes("Missing index row"); assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), expectedValueBytes, 0, expectedValueBytes.length) == 0); // Delete the output table for the next test diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java index 142871b..b62b215 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java @@ -110,8 +110,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { private Table outputHTable = null; private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE; private boolean verify = false; - private boolean onlyVerify = false; + private boolean doNotFail = false; private Map<byte[], Put> indexKeyToDataPutMap; + private Map<byte[], Put> dataKeyToDataPutMap; private TaskRunner pool; private TaskBatch<Boolean> tasks; private String exceptionMessage; @@ -140,6 +141,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); } if (!scan.isRaw()) { + // No need to deserialize index maintainers when the scan is raw. Raw scan is used by partial rebuilds List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true); indexMaintainer = maintainers.get(0); } @@ -152,15 +154,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE); if (valueBytes != null) { verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes); - if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.ONLY) { + if (verifyType != IndexTool.IndexVerifyType.NONE) { verify = true; - if (verifyType == IndexTool.IndexVerifyType.ONLY) { - onlyVerify = true; - } + // Create the following objects only for rebuilds by IndexTool hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION); indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES)); indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + dataKeyToDataPutMap = 
Maps.newTreeMap(Bytes.BYTES_COMPARATOR); pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor( new ThreadPoolBuilder("IndexVerify", env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY, @@ -201,28 +202,31 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { m.setDurability(Durability.SKIP_WAL); } - private Delete generateDeleteMarkers(List<Cell> row) { + private Delete generateDeleteMarkers(Put put) { Set<ColumnReference> allColumns = indexMaintainer.getAllColumns(); - if (row.size() == allColumns.size() + 1) { + int cellCount = put.size(); + if (cellCount == allColumns.size() + 1) { // We have all the columns for the index table plus the empty column. So, no delete marker is needed return null; } - Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(row.size()); + Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(cellCount); long ts = 0; - for (Cell cell : row) { - includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell))); - if (ts < cell.getTimestamp()) { - ts = cell.getTimestamp(); + for (List<Cell> cells : put.getFamilyCellMap().values()) { + if (cells == null) { + break; + } + for (Cell cell : cells) { + includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell))); + if (ts < cell.getTimestamp()) { + ts = cell.getTimestamp(); + } } } - byte[] rowKey; Delete del = null; for (ColumnReference column : allColumns) { if (!includedColumns.contains(column)) { if (del == null) { - Cell cell = row.get(0); - rowKey = CellUtil.cloneRow(cell); - del = new Delete(rowKey); + del = new Delete(put.getRow()); } del.addColumns(column.getFamily(), column.getQualifier(), ts); } @@ -238,15 +242,12 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } } - private byte[] commitIfReady(byte[] uuidValue) throws IOException { - if (ServerUtil.readyToCommit(mutations.size(), 
mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { + private byte[] commitIfReady(byte[] uuidValue, UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException { + if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)) { ungroupedAggregateRegionObserver.checkForRegionClosing(); - ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize); + ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutationList, blockingMemstoreSize); uuidValue = ServerCacheClient.generateId(); - if (verify) { - addToBeVerifiedIndexRows(); - } - mutations.clear(); + mutationList.clear(); } return uuidValue; } @@ -367,10 +368,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return ts; } - private void verifySingleIndexRow(Result indexRow, final Put dataRow) throws IOException { - ValueGetter valueGetter = new SimpleValueGetter(dataRow); + private long getMaxTimestamp(Put put) { long ts = 0; - for (List<Cell> cells : dataRow.getFamilyCellMap().values()) { + for (List<Cell> cells : put.getFamilyCellMap().values()) { if (cells == null) { break; } @@ -380,19 +380,25 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } } } + return ts; + } + + private boolean verifySingleIndexRow(Result indexRow, final Put dataRow) throws IOException { + ValueGetter valueGetter = new SimpleValueGetter(dataRow); + long ts = getMaxTimestamp(dataRow); Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE, valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null); if (indexPut == null) { // This means the index row does not have any covered columns. 
We just need to check if the index row // has only one cell (which is the empty column cell) if (indexRow.rawCells().length == 1) { - return; + return true; } String errorMsg = "Expected to find only empty column cell but got " + indexRow.rawCells().length; logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg); - if (onlyVerify) { - return; + if (doNotFail) { + return false; } exceptionMessage = "Index verify failed - " + errorMsg + indexHTable.getName(); throw new IOException(exceptionMessage); @@ -418,8 +424,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { String errorMsg = " Missing cell " + Bytes.toString(family) + ":" + Bytes.toString(qualifier); logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg); - if (onlyVerify) { - return; + if (doNotFail) { + return false; } exceptionMessage = "Index verify failed - Missing cell " + indexHTable.getName(); throw new IOException(exceptionMessage); @@ -430,8 +436,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { Bytes.toString(qualifier); logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell)); - if (onlyVerify) { - return; + if (doNotFail) { + return false; } exceptionMessage = "Index verify failed - Not matching cell value - " + indexHTable.getName(); throw new IOException(exceptionMessage); @@ -443,14 +449,16 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { String errorMsg = "Expected to find " + cellCount + " cells but got " + indexRow.rawCells().length + " cells"; logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg); - if (!onlyVerify) { + if (!doNotFail) { exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName(); throw new IOException(exceptionMessage); } + 
return false; } + return true; } - private void verifyIndexRows(ArrayList<KeyRange> keys) throws IOException { + private void verifyIndexRows(ArrayList<KeyRange> keys, Map<byte[], Put> perTaskDataKeyToDataPutMap) throws IOException { int expectedRowCount = keys.size(); ScanRanges scanRanges = ScanRanges.createPointLookup(keys); Scan indexScan = new Scan(); @@ -463,31 +471,57 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) { Put dataPut = indexKeyToDataPutMap.get(result.getRow()); if (dataPut == null) { - exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName(); String errorMsg = "Missing data row"; logToIndexToolOutputTable(null, result.getRow(), 0, getMaxTimestamp(result), errorMsg); - if (!onlyVerify) { + if (!doNotFail) { + exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName(); throw new IOException(exceptionMessage); } } - verifySingleIndexRow(result, dataPut); + if (verifySingleIndexRow(result, dataPut)) { + perTaskDataKeyToDataPutMap.remove(dataPut.getRow()); + } rowCount++; } } catch (Throwable t) { ServerUtil.throwIOException(indexHTable.getName().toString(), t); } if (rowCount != expectedRowCount) { - String errorMsg = "Missing index rows - Expected: " + expectedRowCount + - " Actual: " + rowCount; + for (Map.Entry<byte[], Put> entry : perTaskDataKeyToDataPutMap.entrySet()) { + String errorMsg = "Missing index row"; + logToIndexToolOutputTable(entry.getKey(), null, getMaxTimestamp(entry.getValue()), + 0, errorMsg); + if (!doNotFail) { exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName(); - logToIndexToolOutputTable(null, null, 0, 0, errorMsg); - if (!onlyVerify) { - throw new IOException(exceptionMessage); + throw new IOException(exceptionMessage); + } + } + } + } + + private void rebuildIndexRows(UngroupedAggregateRegionObserver.MutationList 
mutationList) throws IOException { + byte[] uuidValue = ServerCacheClient.generateId(); + UngroupedAggregateRegionObserver.MutationList currentMutationList = + new UngroupedAggregateRegionObserver.MutationList(maxBatchSize); + for (Mutation mutation : mutationList) { + Put put = (Put) mutation; + currentMutationList.add(mutation); + setMutationAttributes(put, uuidValue); + uuidValue = commitIfReady(uuidValue, currentMutationList); + Delete deleteMarkers = generateDeleteMarkers(put); + if (deleteMarkers != null) { + setMutationAttributes(deleteMarkers, uuidValue); + currentMutationList.add(deleteMarkers); + uuidValue = commitIfReady(uuidValue, currentMutationList); } } + if (!currentMutationList.isEmpty()) { + ungroupedAggregateRegionObserver.checkForRegionClosing(); + ungroupedAggregateRegionObserver.commitBatchWithRetries(region, currentMutationList, blockingMemstoreSize); + } } - private void addVerifyTask(final ArrayList<KeyRange> keys) { + private void addVerifyTask(final ArrayList<KeyRange> keys, final Map<byte[], Put> perTaskDataKeyToDataPutMap) { tasks.add(new Task<Boolean>() { @Override public Boolean call() throws Exception { @@ -496,7 +530,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { exceptionMessage = "Pool closed, not attempting to verify index rows! 
" + indexHTable.getName(); throw new IOException(exceptionMessage); } - verifyIndexRows(keys); + verifyIndexRows(keys, perTaskDataKeyToDataPutMap); + if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) { + synchronized (dataKeyToDataPutMap) { + dataKeyToDataPutMap.putAll(perTaskDataKeyToDataPutMap); + } + } + perTaskDataKeyToDataPutMap.clear(); } catch (Exception e) { throw e; } @@ -505,11 +545,81 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { }); } + private void parallelizeIndexVerify() throws IOException { + addToBeVerifiedIndexRows(); + ArrayList<KeyRange> keys = new ArrayList<>(rowCountPerTask); + Map<byte[], Put> perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + for (Map.Entry<byte[], Put> entry: indexKeyToDataPutMap.entrySet()) { + keys.add(PVarbinary.INSTANCE.getKeyRange(entry.getKey())); + perTaskDataKeyToDataPutMap.put(entry.getValue().getRow(), entry.getValue()); + if (keys.size() == rowCountPerTask) { + addVerifyTask(keys, perTaskDataKeyToDataPutMap); + keys = new ArrayList<>(rowCountPerTask); + perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + } + } + if (keys.size() > 0) { + addVerifyTask(keys, perTaskDataKeyToDataPutMap); + } + List<Boolean> taskResultList = null; + try { + LOGGER.debug("Waiting on index verify tasks to complete..."); + taskResultList = this.pool.submitUninterruptible(tasks); + } catch (ExecutionException e) { + throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e); + } catch (EarlyExitFailure e) { + throw new RuntimeException("Stopped while waiting for batch, quitting!", e); + } finally { + tasks.getTasks().clear(); + } + for (Boolean result : taskResultList) { + if (result == null) { + // there was a failure + throw new IOException(exceptionMessage); + } + } + } + + private void verifyAndOrRebuildIndex() throws IOException { + if (verifyType == 
IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) { + // For these options we start with rebuilding index rows + rebuildIndexRows(mutations); + } + if (verifyType == IndexTool.IndexVerifyType.NONE) { + return; + } + if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH || + verifyType == IndexTool.IndexVerifyType.ONLY) { + // For these options we start with verifying index rows + doNotFail = true; // Don't stop at the first mismatch + parallelizeIndexVerify(); + } + if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) { + // For these options, we have identified the rows to be rebuilt and now need to rebuild them + // At this point, dataKeyToDataPutMap includes mapping only for the rows to be rebuilt + mutations.clear(); + for (Map.Entry<byte[], Put> entry: dataKeyToDataPutMap.entrySet()) { + mutations.add(entry.getValue()); + } + rebuildIndexRows(mutations); + } + + if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) { + // We have rebuilt index row and now we need to verify them + doNotFail = false; // Stop at the first mismatch + indexKeyToDataPutMap.clear(); + parallelizeIndexVerify(); + } + indexKeyToDataPutMap.clear(); + } + @Override public boolean next(List<Cell> results) throws IOException { int rowCount = 0; region.startRegionOperation(); try { + // Partial rebuilds by MetadataRegionObserver use raw scan. 
Inline verification is not supported for them + boolean partialRebuild = scan.isRaw(); byte[] uuidValue = ServerCacheClient.generateId(); synchronized (innerScanner) { do { @@ -522,55 +632,58 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) { if (put == null) { put = new Put(CellUtil.cloneRow(cell)); - setMutationAttributes(put, uuidValue); mutations.add(put); } put.add(cell); } else { if (del == null) { del = new Delete(CellUtil.cloneRow(cell)); - setMutationAttributes(del, uuidValue); mutations.add(del); } del.addDeleteMarker(cell); } } - if (onlyVerify) { - rowCount++; - continue; + if (partialRebuild) { + if (put != null) { + setMutationAttributes(put, uuidValue); + } + if (del != null) { + setMutationAttributes(del, uuidValue); + } + uuidValue = commitIfReady(uuidValue, mutations); } - uuidValue = commitIfReady(uuidValue); - if (!scan.isRaw()) { - Delete deleteMarkers = generateDeleteMarkers(row); + if (indexRowKey != null) { + if (put != null) { + setMutationAttributes(put, uuidValue); + } + Delete deleteMarkers = generateDeleteMarkers(put); if (deleteMarkers != null) { setMutationAttributes(deleteMarkers, uuidValue); mutations.add(deleteMarkers); - uuidValue = commitIfReady(uuidValue); + uuidValue = commitIfReady(uuidValue, mutations); } - } - if (indexRowKey != null) { // GlobalIndexChecker passed the index row key. This is to build a single index row. // Check if the data table row we have just scanned matches with the index row key. // If not, there is no need to build the index row from this data table row, // and just return zero row count. 
if (checkIndexRow(indexRowKey, put)) { rowCount = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue(); - } - else { + } else { rowCount = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue(); } break; } rowCount++; } - } while (hasMore && rowCount < pageSizeInRows); - if (!mutations.isEmpty() && !onlyVerify) { + } + if (!partialRebuild && indexRowKey == null) { + verifyAndOrRebuildIndex(); + } + else { + if (!mutations.isEmpty()) { ungroupedAggregateRegionObserver.checkForRegionClosing(); ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize); - if (verify) { - addToBeVerifiedIndexRows(); - } } } } catch (IOException e) { @@ -579,42 +692,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { throw e; } finally { region.closeRegionOperation(); - } - if (verify) { - if (onlyVerify) { - addToBeVerifiedIndexRows(); - } - ArrayList<KeyRange> keys = new ArrayList<>(rowCountPerTask); - for (byte[] key : indexKeyToDataPutMap.keySet()) { - keys.add(PVarbinary.INSTANCE.getKeyRange(key)); - if (keys.size() == rowCountPerTask) { - addVerifyTask(keys); - keys = new ArrayList<>(rowCountPerTask); - } - } - if (keys.size() > 0) { - addVerifyTask(keys); - } - List<Boolean> taskResultList = null; - try { - LOGGER.debug("Waiting on index verify tasks to complete..."); - taskResultList = this.pool.submitUninterruptible(tasks); - } catch (ExecutionException e) { - throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e); - } catch (EarlyExitFailure e) { - throw new RuntimeException("Stopped while waiting for batch, quitting!", e); - } - finally { - indexKeyToDataPutMap.clear(); - tasks.getTasks().clear(); - } - for (Boolean result : taskResultList) { - if (result == null) { - // there was a failure - throw new IOException(exceptionMessage); - } + mutations.clear(); + if (verify) { + indexKeyToDataPutMap.clear(); + dataKeyToDataPutMap.clear(); } } + byte[] 
rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount)); final Cell aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length); @@ -626,4 +710,4 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { public long getMaxResultSize() { return scan.getMaxResultSize(); } -} \ No newline at end of file +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index 6b77ed8..51cb4b4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -203,11 +203,12 @@ public class IndexTool extends Configured implements Tool { "This parameter is deprecated. Direct mode will be used whether it is set or not. Keeping it for backwards compatibility."); private static final Option VERIFY_OPTION = new Option("v", "verify", true, - "To verify every data row has a corresponding row. The accepted values are NONE, ONLY, BEFORE," + - " AFTER, and BOTH. NONE is for no inline verification, which is also the default for this option. " + - "ONLY is for verifying without rebuilding index rows. The rest for verifying before, after, and " + - "both before and after rebuilding row. If the verification is done before rebuilding rows and " + - "the correct index rows are not rebuilt. Currently supported values are NONE, ONLY and AFTER "); + "To verify every data row has a corresponding row of a global index. For other types of indexes, " + + "this option will be silently ignored. The accepted values are NONE, ONLY, BEFORE, AFTER, and BOTH. " + + "NONE is for no inline verification, which is also the default for this option. ONLY is for " + + "verifying without rebuilding index rows. 
The rest for verifying before, after, and both before " + + "and after rebuilding row. If the verification is done before rebuilding rows and the correct " + + "index rows will not be rebuilt"); private static final double DEFAULT_SPLIT_SAMPLING_RATE = 10.0; @@ -677,10 +678,6 @@ public class IndexTool extends Configured implements Tool { if (cmdLine.hasOption(VERIFY_OPTION.getOpt())) { String value = cmdLine.getOptionValue(VERIFY_OPTION.getOpt()); indexVerifyType = IndexVerifyType.fromValue(value); - if (!(indexVerifyType == IndexVerifyType.NONE || indexVerifyType == IndexVerifyType.AFTER || - indexVerifyType == IndexVerifyType.ONLY)) { - throw new IllegalStateException("Unsupported value for the verify option"); - } } qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) {