This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 52e5e2a  PHOENIX-5674 IndexTool to not write already correct index rows/CFs
52e5e2a is described below

commit 52e5e2ae320be9be649188b0208d3b05cbe7bedc
Author: Kadir <kozde...@salesforce.com>
AuthorDate: Tue Jan 14 15:47:33 2020 -0800

    PHOENIX-5674 IndexTool to not write already correct index rows/CFs
---
 .../org/apache/phoenix/end2end/IndexToolIT.java    |  78 +++++-
 .../coprocessor/IndexRebuildRegionScanner.java     | 278 ++++++++++++++-------
 .../apache/phoenix/mapreduce/index/IndexTool.java  |  15 +-
 3 files changed, 263 insertions(+), 108 deletions(-)
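
For context, the verify behavior exercised by this patch is driven by IndexTool's -v/--verify option. Below is a minimal invocation sketch; only the -v/--verify values (NONE, ONLY, BEFORE, AFTER, BOTH) come from this commit, while the long option names --schema, --data-table, --index-table and --output-path, the table names, and the driver class are illustrative assumptions.

    // Sketch only: launches IndexTool with "-v BEFORE" so that index rows which already
    // verify as correct are not rewritten. Option spellings other than -v/--verify are
    // assumptions; check IndexTool's option definitions before relying on them.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.index.IndexTool;

    public class RunIndexToolSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            String[] toolArgs = new String[] {
                    "--schema", "MY_SCHEMA",         // assumed option names and table names
                    "--data-table", "MY_TABLE",
                    "--index-table", "MY_INDEX",
                    "--verify", "BEFORE",            // new behavior: verify first, rebuild only failing rows
                    "--output-path", "/tmp/MY_INDEX" // assumed; output location for the tool
            };
            System.exit(ToolRunner.run(conf, new IndexTool(), toolArgs));
        }
    }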

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index d5713b6..8d42020 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
@@ -37,17 +38,25 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
@@ -371,6 +380,24 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    public static class MutationCountingRegionObserver extends SimpleRegionObserver {
+        public static AtomicInteger mutationCount = new AtomicInteger(0);
+
+        public static void setMutationCount(int value) {
+            mutationCount.set(value);
+        }
+
+        public static int getMutationCount() {
+            return mutationCount.get();
+        }
+
+        @Override
+        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+                                   MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+            mutationCount.addAndGet(miniBatchOp.size());
+        }
+    }
+
     private Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName)
             throws Exception {
         byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
@@ -409,6 +436,53 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
     }
 
     @Test
+    public void testIndexToolVerifyBeforeAndBothOptions() throws Exception {
+        // This test is for building non-transactional global indexes with direct api
+        if (localIndex || transactional || !directApi || useSnapshot) {
+            return;
+        }
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String schemaName = generateUniqueName();
+            String dataTableName = generateUniqueName();
+            String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+            String indexTableName = generateUniqueName();
+            String viewName = generateUniqueName();
+            String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+                    + tableDDLOptions);
+            conn.commit();
+            conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
+            conn.commit();
+            // Insert a row
+            conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
+            conn.commit();
+            conn.createStatement().execute(String.format(
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
+            TestUtil.addCoprocessor(conn, "_IDX_" + dataTableFullName, MutationCountingRegionObserver.class);
+            // Run the index MR job and verify that the index table rebuild succeeds
+            runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.AFTER);
+            assertEquals(1, MutationCountingRegionObserver.getMutationCount());
+            MutationCountingRegionObserver.setMutationCount(0);
+            // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option
+            // should not write any index rows
+            runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.BEFORE);
+            assertEquals(0, MutationCountingRegionObserver.getMutationCount());
+            // The "-v BOTH" option should not write any index rows either
+            runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.BOTH);
+            assertEquals(0, MutationCountingRegionObserver.getMutationCount());
+            Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+            TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+            admin.disableTable(indexToolOutputTable);
+            admin.deleteTable(indexToolOutputTable);
+        }
+    }
+
+    @Test
     public void testIndexToolVerifyAfterOption() throws Exception {
        // This test is for building non-transactional global indexes with direct api
         if (localIndex || transactional || !directApi || useSnapshot) {
@@ -441,7 +515,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
                     null, -1, IndexTool.IndexVerifyType.AFTER);
            // The index tool output table should report that there is a missing index row
            Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, "_IDX_" + dataTableFullName);
-            byte[] expectedValueBytes = Bytes.toBytes("Missing index rows - Expected: 1 Actual: 0");
+            byte[] expectedValueBytes = Bytes.toBytes("Missing index row");
            assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
                     expectedValueBytes, 0, expectedValueBytes.length) == 0);
             IndexRegionObserver.setIgnoreIndexRebuildForTesting(false);
@@ -478,7 +552,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.ONLY);
             Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
-            byte[] expectedValueBytes = Bytes.toBytes("Missing index rows - Expected: 1 Actual: 0");
+            byte[] expectedValueBytes = Bytes.toBytes("Missing index row");
             assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
                     expectedValueBytes, 0, expectedValueBytes.length) == 0);
             // Delete the output table for the next test
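
The assertions above read verification failures back from IndexTool's output table (see getErrorMessageFromIndexToolOutputTable and the "Missing index row" message). A rough, non-authoritative sketch of that read path follows; it assumes an already open HBase client Connection and relies only on the IndexTool.OUTPUT_TABLE_NAME_BYTES constant referenced in the test, without assuming anything about the output table's column layout.

    // Sketch: scan IndexTool's output table and print every cell value (the error text,
    // e.g. "Missing index row"). The HBase Connection is assumed to be provided by the caller.
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.mapreduce.index.IndexTool;

    public class DumpIndexToolOutputSketch {
        public static void dump(Connection hbaseConn) throws Exception {
            TableName outputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
            try (Table table = hbaseConn.getTable(outputTable);
                 ResultScanner scanner = table.getScanner(new Scan())) {
                for (Result result : scanner) {
                    for (Cell cell : result.rawCells()) {
                        System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + " -> "
                                + Bytes.toString(CellUtil.cloneValue(cell)));
                    }
                }
            }
        }
    }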
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 60271ef..526748e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -109,8 +109,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     private Table outputHTable = null;
     private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
     private boolean verify = false;
-    private boolean onlyVerify = false;
+    private boolean doNotFail = false;
     private Map<byte[], Put> indexKeyToDataPutMap;
+    private Map<byte[], Put> dataKeyToDataPutMap;
     private TaskRunner pool;
     private TaskBatch<Boolean> tasks;
     private String exceptionMessage;
@@ -138,6 +139,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
         }
         if (!scan.isRaw()) {
+            // No need to deserialize index maintainers when the scan is raw. Raw scan is used by partial rebuilds
             List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
             indexMaintainer = maintainers.get(0);
         }
@@ -150,16 +152,15 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
         if (valueBytes != null) {
             verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
-            if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.ONLY) {
+            if (verifyType != IndexTool.IndexVerifyType.NONE) {
                 verify = true;
-                if (verifyType == IndexTool.IndexVerifyType.ONLY) {
-                    onlyVerify = true;
-                }
+                // Create the following objects only for rebuilds by IndexTool
                 indexHTable = ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION,
                         env).getTable(TableName.valueOf(indexMaintainer.getIndexTableName()));
                 outputHTable = ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION,
                         env).getTable(TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES));
                 indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+                dataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
                 pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
                         new ThreadPoolBuilder("IndexVerify",
                                 env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
@@ -200,28 +201,31 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         m.setDurability(Durability.SKIP_WAL);
     }
 
-    private Delete generateDeleteMarkers(List<Cell> row) {
+    private Delete generateDeleteMarkers(Put put) {
         Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
-        if (row.size() == allColumns.size() + 1) {
+        int cellCount = put.size();
+        if (cellCount == allColumns.size() + 1) {
             // We have all the columns for the index table plus the empty column. So, no delete marker is needed
             return null;
         }
-        Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(row.size());
+        Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(cellCount);
         long ts = 0;
-        for (Cell cell : row) {
-            includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)));
-            if (ts < cell.getTimestamp()) {
-                ts = cell.getTimestamp();
+        for (List<Cell> cells : put.getFamilyCellMap().values()) {
+            if (cells == null) {
+                break;
+            }
+            for (Cell cell : cells) {
+                includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)));
+                if (ts < cell.getTimestamp()) {
+                    ts = cell.getTimestamp();
+                }
             }
         }
-        byte[] rowKey;
         Delete del = null;
         for (ColumnReference column : allColumns) {
             if (!includedColumns.contains(column)) {
                 if (del == null) {
-                    Cell cell = row.get(0);
-                    rowKey = CellUtil.cloneRow(cell);
-                    del = new Delete(rowKey);
+                    del = new Delete(put.getRow());
                 }
                 del.addColumns(column.getFamily(), column.getQualifier(), ts);
             }
@@ -237,15 +241,12 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         }
     }
 
-    private byte[] commitIfReady(byte[] uuidValue) throws IOException {
-        if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+    private byte[] commitIfReady(byte[] uuidValue, UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
+        if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
             ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
-            ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+            ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutationList, blockingMemstoreSize);
             uuidValue = ServerCacheClient.generateId();
-            if (verify) {
-                addToBeVerifiedIndexRows();
-            }
-            mutations.clear();
+            mutationList.clear();
         }
         return uuidValue;
     }
@@ -366,10 +367,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return ts;
     }
 
-    private void verifySingleIndexRow(Result indexRow, final Put dataRow) throws IOException {
-        ValueGetter valueGetter = new SimpleValueGetter(dataRow);
+    private long getMaxTimestamp(Put put) {
         long ts = 0;
-        for (List<Cell> cells : dataRow.getFamilyCellMap().values()) {
+        for (List<Cell> cells : put.getFamilyCellMap().values()) {
             if (cells == null) {
                 break;
             }
@@ -379,19 +379,25 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 }
             }
         }
+        return ts;
+    }
+
+    private boolean verifySingleIndexRow(Result indexRow, final Put dataRow) throws IOException {
+        ValueGetter valueGetter = new SimpleValueGetter(dataRow);
+        long ts = getMaxTimestamp(dataRow);
         Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
                 valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null);
         if (indexPut == null) {
             // This means the index row does not have any covered columns. We just need to check if the index row
             // has only one cell (which is the empty column cell)
             if (indexRow.rawCells().length == 1) {
-                return;
+                return true;
             }
             String errorMsg = "Expected to find only empty column cell but got 
"
                     + indexRow.rawCells().length;
            logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
-            if (onlyVerify) {
-                return;
+            if (doNotFail) {
+                return false;
             }
            exceptionMessage = "Index verify failed - " + errorMsg + indexHTable.getName();
             throw new IOException(exceptionMessage);
@@ -417,8 +423,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                     String errorMsg = " Missing cell " + Bytes.toString(family) + ":" +
                             Bytes.toString(qualifier);
                     logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
-                    if (onlyVerify) {
-                        return;
+                    if (doNotFail) {
+                        return false;
                     }
                    exceptionMessage = "Index verify failed - Missing cell " + indexHTable.getName();
                     throw new IOException(exceptionMessage);
@@ -429,8 +435,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                             Bytes.toString(qualifier);
                     logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
                             errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell));
-                    if (onlyVerify) {
-                        return;
+                    if (doNotFail) {
+                        return false;
                     }
                    exceptionMessage = "Index verify failed - Not matching cell value - " + indexHTable.getName();
                     throw new IOException(exceptionMessage);
@@ -442,14 +448,16 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             String errorMsg = "Expected to find " + cellCount + " cells but got "
                    + indexRow.rawCells().length + " cells";
             logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
-            if (!onlyVerify) {
+            if (!doNotFail) {
                exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName();
                 throw new IOException(exceptionMessage);
             }
+            return false;
         }
+        return true;
     }
 
-    private void verifyIndexRows(ArrayList<KeyRange> keys) throws IOException {
+    private void verifyIndexRows(ArrayList<KeyRange> keys, Map<byte[], Put> perTaskDataKeyToDataPutMap) throws IOException {
         int expectedRowCount = keys.size();
         ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
         Scan indexScan = new Scan();
@@ -462,31 +470,57 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
                 Put dataPut = indexKeyToDataPutMap.get(result.getRow());
                 if (dataPut == null) {
-                    exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName();
                     String errorMsg = "Missing data row";
                    logToIndexToolOutputTable(null, result.getRow(), 0, getMaxTimestamp(result), errorMsg);
-                    if (!onlyVerify) {
+                    if (!doNotFail) {
+                        exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName();
                         throw new IOException(exceptionMessage);
                     }
                 }
-                verifySingleIndexRow(result, dataPut);
+                if (verifySingleIndexRow(result, dataPut)) {
+                    perTaskDataKeyToDataPutMap.remove(dataPut.getRow());
+                }
                 rowCount++;
             }
         } catch (Throwable t) {
             ServerUtil.throwIOException(indexHTable.getName().toString(), t);
         }
         if (rowCount != expectedRowCount) {
-            String errorMsg = "Missing index rows - Expected: " + expectedRowCount +
-                    " Actual: " + rowCount;
+            for (Map.Entry<byte[], Put> entry : perTaskDataKeyToDataPutMap.entrySet()) {
+                String errorMsg = "Missing index row";
+                logToIndexToolOutputTable(entry.getKey(), null, getMaxTimestamp(entry.getValue()),
+                        0, errorMsg);
+                if (!doNotFail) {
                     exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName();
-            logToIndexToolOutputTable(null, null, 0, 0, errorMsg);
-            if (!onlyVerify) {
-                throw new IOException(exceptionMessage);
+                    throw new IOException(exceptionMessage);
+                }
+            }
+        }
+    }
+
+    private void rebuildIndexRows(UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
+        byte[] uuidValue = ServerCacheClient.generateId();
+        UngroupedAggregateRegionObserver.MutationList currentMutationList =
+                new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
+        for (Mutation mutation : mutationList) {
+            Put put = (Put) mutation;
+            currentMutationList.add(mutation);
+            setMutationAttributes(put, uuidValue);
+            uuidValue = commitIfReady(uuidValue, currentMutationList);
+            Delete deleteMarkers = generateDeleteMarkers(put);
+            if (deleteMarkers != null) {
+                setMutationAttributes(deleteMarkers, uuidValue);
+                currentMutationList.add(deleteMarkers);
+                uuidValue = commitIfReady(uuidValue, currentMutationList);
             }
         }
+        if (!currentMutationList.isEmpty()) {
+            ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
+            ungroupedAggregateRegionObserver.commitBatchWithRetries(region, currentMutationList, blockingMemstoreSize);
+        }
     }
 
-    private void addVerifyTask(final ArrayList<KeyRange> keys) {
+    private void addVerifyTask(final ArrayList<KeyRange> keys, final Map<byte[], Put> perTaskDataKeyToDataPutMap) {
         tasks.add(new Task<Boolean>() {
             @Override
             public Boolean call() throws Exception {
@@ -495,7 +529,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                        exceptionMessage = "Pool closed, not attempting to verify index rows! " + indexHTable.getName();
                         throw new IOException(exceptionMessage);
                     }
-                    verifyIndexRows(keys);
+                    verifyIndexRows(keys, perTaskDataKeyToDataPutMap);
+                    if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
+                        synchronized (dataKeyToDataPutMap) {
+                            dataKeyToDataPutMap.putAll(perTaskDataKeyToDataPutMap);
+                        }
+                    }
+                    perTaskDataKeyToDataPutMap.clear();
                 } catch (Exception e) {
                     throw e;
                 }
@@ -504,11 +544,81 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         });
     }
 
+    private void parallelizeIndexVerify() throws IOException {
+        addToBeVerifiedIndexRows();
+        ArrayList<KeyRange> keys = new ArrayList<>(rowCountPerTask);
+        Map<byte[], Put> perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+        for (Map.Entry<byte[], Put> entry: indexKeyToDataPutMap.entrySet()) {
+            keys.add(PVarbinary.INSTANCE.getKeyRange(entry.getKey()));
+            perTaskDataKeyToDataPutMap.put(entry.getValue().getRow(), entry.getValue());
+            if (keys.size() == rowCountPerTask) {
+                addVerifyTask(keys, perTaskDataKeyToDataPutMap);
+                keys = new ArrayList<>(rowCountPerTask);
+                perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+            }
+        }
+        if (keys.size() > 0) {
+            addVerifyTask(keys, perTaskDataKeyToDataPutMap);
+        }
+        List<Boolean> taskResultList = null;
+        try {
+            LOGGER.debug("Waiting on index verify tasks to complete...");
+            taskResultList = this.pool.submitUninterruptible(tasks);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
+        } catch (EarlyExitFailure e) {
+            throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
+        } finally {
+            tasks.getTasks().clear();
+        }
+        for (Boolean result : taskResultList) {
+            if (result == null) {
+                // there was a failure
+                throw new IOException(exceptionMessage);
+            }
+        }
+    }
+
+    private void verifyAndOrRebuildIndex() throws IOException {
+        if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) {
+            // For these options we start with rebuilding index rows
+            rebuildIndexRows(mutations);
+        }
+        if (verifyType == IndexTool.IndexVerifyType.NONE) {
+            return;
+        }
+        if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
+                verifyType == IndexTool.IndexVerifyType.ONLY) {
+            // For these options we start with verifying index rows
+            doNotFail = true; // Don't stop at the first mismatch
+            parallelizeIndexVerify();
+        }
+        if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
+            // For these options, we have identified the rows to be rebuilt and now need to rebuild them
+            // At this point, dataKeyToDataPutMap includes mapping only for the rows to be rebuilt
+            mutations.clear();
+            for (Map.Entry<byte[], Put> entry: dataKeyToDataPutMap.entrySet()) {
+                mutations.add(entry.getValue());
+            }
+            rebuildIndexRows(mutations);
+        }
+
+        if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
+            // We have rebuilt the index rows and now need to verify them
+            doNotFail = false; // Stop at the first mismatch
+            indexKeyToDataPutMap.clear();
+            parallelizeIndexVerify();
+        }
+        indexKeyToDataPutMap.clear();
+    }
+
     @Override
     public boolean next(List<Cell> results) throws IOException {
         int rowCount = 0;
         region.startRegionOperation();
         try {
+            // Partial rebuilds by MetadataRegionObserver use raw scan. Inline verification is not supported for them
+            boolean partialRebuild = scan.isRaw();
             byte[] uuidValue = ServerCacheClient.generateId();
             synchronized (innerScanner) {
                 do {
@@ -521,55 +631,58 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                             if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
                                 if (put == null) {
                                     put = new Put(CellUtil.cloneRow(cell));
-                                    setMutationAttributes(put, uuidValue);
                                     mutations.add(put);
                                 }
                                 put.add(cell);
                             } else {
                                 if (del == null) {
                                     del = new Delete(CellUtil.cloneRow(cell));
-                                    setMutationAttributes(del, uuidValue);
                                     mutations.add(del);
                                 }
                                 del.addDeleteMarker(cell);
                             }
                         }
-                        if (onlyVerify) {
-                            rowCount++;
-                            continue;
+                        if (partialRebuild) {
+                            if (put != null) {
+                                setMutationAttributes(put, uuidValue);
+                            }
+                            if (del != null) {
+                                setMutationAttributes(del, uuidValue);
+                            }
+                            uuidValue = commitIfReady(uuidValue, mutations);
                         }
-                        uuidValue = commitIfReady(uuidValue);
-                        if (!scan.isRaw()) {
-                            Delete deleteMarkers = generateDeleteMarkers(row);
+                        if (indexRowKey != null) {
+                            if (put != null) {
+                                setMutationAttributes(put, uuidValue);
+                            }
+                            Delete deleteMarkers = generateDeleteMarkers(put);
                             if (deleteMarkers != null) {
                                setMutationAttributes(deleteMarkers, uuidValue);
                                 mutations.add(deleteMarkers);
-                                uuidValue = commitIfReady(uuidValue);
+                                uuidValue = commitIfReady(uuidValue, mutations);
                             }
-                        }
-                        if (indexRowKey != null) {
                            // GlobalIndexChecker passed the index row key. This is to build a single index row.
                            // Check if the data table row we have just scanned matches with the index row key.
                            // If not, there is no need to build the index row from this data table row,
                             // and just return zero row count.
                             if (checkIndexRow(indexRowKey, put)) {
                                rowCount = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
-                            }
-                            else {
+                            } else {
                                rowCount = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
                             }
                             break;
                         }
                         rowCount++;
                     }
-
                 } while (hasMore && rowCount < pageSizeInRows);
-                if (!mutations.isEmpty() && !onlyVerify) {
+            }
+            if (!partialRebuild && indexRowKey == null) {
+                verifyAndOrRebuildIndex();
+            }
+            else {
+                if (!mutations.isEmpty()) {
                    ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
                    ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
-                    if (verify) {
-                        addToBeVerifiedIndexRows();
-                    }
                 }
             }
         } catch (IOException e) {
@@ -578,42 +691,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             throw e;
         } finally {
             region.closeRegionOperation();
-        }
-        if (verify) {
-            if (onlyVerify) {
-                addToBeVerifiedIndexRows();
-            }
-            ArrayList<KeyRange> keys = new ArrayList<>(rowCountPerTask);
-            for (byte[] key : indexKeyToDataPutMap.keySet()) {
-                keys.add(PVarbinary.INSTANCE.getKeyRange(key));
-                if (keys.size() == rowCountPerTask) {
-                    addVerifyTask(keys);
-                    keys = new ArrayList<>(rowCountPerTask);
-                }
-            }
-            if (keys.size() > 0) {
-                addVerifyTask(keys);
-            }
-            List<Boolean> taskResultList = null;
-            try {
-                LOGGER.debug("Waiting on index verify tasks to complete...");
-                taskResultList = this.pool.submitUninterruptible(tasks);
-            } catch (ExecutionException e) {
-                throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
-            } catch (EarlyExitFailure e) {
-                throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
-            }
-            finally {
-                indexKeyToDataPutMap.clear();
-                tasks.getTasks().clear();
-            }
-            for (Boolean result : taskResultList) {
-                if (result == null) {
-                    // there was a failure
-                    throw new IOException(exceptionMessage);
-                }
+            mutations.clear();
+            if (verify) {
+              indexKeyToDataPutMap.clear();
+              dataKeyToDataPutMap.clear();
             }
         }
+
         byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
        final Cell aggKeyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
                SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
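
To summarize the control flow that the new verifyAndOrRebuildIndex method introduces above, here is a simplified sketch. The branching by IndexVerifyType mirrors the diff; the four private helpers are hypothetical placeholders standing in for rebuildIndexRows() and parallelizeIndexVerify() in the real coprocessor, so this illustrates the decision logic rather than the actual implementation.

    import java.io.IOException;
    import org.apache.phoenix.mapreduce.index.IndexTool;

    public class VerifyFlowSketch {
        public void verifyAndOrRebuild(IndexTool.IndexVerifyType verifyType) throws IOException {
            if (verifyType == IndexTool.IndexVerifyType.NONE || verifyType == IndexTool.IndexVerifyType.AFTER) {
                rebuildAllRows();             // NONE and AFTER rebuild every scanned row up front
            }
            if (verifyType == IndexTool.IndexVerifyType.NONE) {
                return;                       // no inline verification requested
            }
            if (verifyType == IndexTool.IndexVerifyType.ONLY
                    || verifyType == IndexTool.IndexVerifyType.BEFORE
                    || verifyType == IndexTool.IndexVerifyType.BOTH) {
                verifyAndCollectFailedRows(); // do not fail fast; remember rows that did not verify
            }
            if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
                rebuildFailedRowsOnly();      // the "do not write already correct index rows" part
            }
            if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
                verifyRebuiltRows();          // fail fast on any remaining mismatch
            }
        }

        private void rebuildAllRows() { }
        private void verifyAndCollectFailedRows() { }
        private void rebuildFailedRowsOnly() { }
        private void verifyRebuiltRows() { }
    }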
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 0155602..a6d2e5a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -205,11 +205,12 @@ public class IndexTool extends Configured implements Tool {
            "This parameter is deprecated. Direct mode will be used whether it is set or not. Keeping it for backwards compatibility.");
 
     private static final Option VERIFY_OPTION = new Option("v", "verify", true,
-            "To verify every data row has a corresponding row. The accepted values are NONE, ONLY, BEFORE," +
-                    " AFTER, and BOTH. NONE is for no inline verification, which is also the default for this option. " +
-                    "ONLY is for verifying without rebuilding index rows. The rest for verifying before, after, and " +
-                    "both before and after rebuilding row. If the verification is done before rebuilding rows and " +
-                    "the correct index rows are not rebuilt. Currently supported values are NONE, ONLY and AFTER ");
+            "To verify every data row has a corresponding row of a global index. For other types of indexes, " +
+                    "this option will be silently ignored. The accepted values are NONE, ONLY, BEFORE, AFTER, and BOTH. " +
+                    "NONE is for no inline verification, which is also the default for this option. ONLY is for " +
+                    "verifying without rebuilding index rows. The rest are for verifying before, after, and both before " +
+                    "and after rebuilding index rows. If the verification is done before rebuilding, the already " +
+                    "correct index rows will not be rebuilt");
 
     private static final double DEFAULT_SPLIT_SAMPLING_RATE = 10.0;
 
@@ -683,10 +684,6 @@ public class IndexTool extends Configured implements Tool {
             if (cmdLine.hasOption(VERIFY_OPTION.getOpt())) {
                 String value = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
                 indexVerifyType = IndexVerifyType.fromValue(value);
-                if (!(indexVerifyType == IndexVerifyType.NONE || indexVerifyType == IndexVerifyType.AFTER ||
-                        indexVerifyType == IndexVerifyType.ONLY)) {
-                    throw new IllegalStateException("Unsupported value for the verify option");
-                }
             }
            qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
            try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) {
