This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
     new 4b2ff49  PHOENIX-6160 Simplifying concurrent mutation handling for global Indexes
4b2ff49 is described below

commit 4b2ff49fde9b52741e6ec4b7cb1e90724f4ed63a
Author: Kadir Ozdemir <kozde...@salesforce.com>
AuthorDate: Sun Sep 27 15:59:20 2020 -0700

    PHOENIX-6160 Simplifying concurrent mutation handling for global Indexes
---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 329 +++++++++++----------
 1 file changed, 169 insertions(+), 160 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 0457481..4d804e0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -76,7 +76,6 @@ import org.apache.phoenix.hbase.index.builder.FatalIndexBuildingFailureException
 import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
 import org.apache.phoenix.hbase.index.builder.IndexBuilder;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
@@ -98,6 +97,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew;
 import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.prepareIndexMutationsForRebuild;
@@ -125,24 +126,29 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
      * Class to represent pending data table rows
      */
     private static class PendingRow {
-        private boolean concurrent = false;
-        private long count = 1;
+        private int count;
+        private BatchMutateContext lastContext;
 
-        public void add() {
+        PendingRow(BatchMutateContext context) {
+            count = 1;
+            lastContext = context;
+        }
+
+        public void add(BatchMutateContext context) {
             count++;
-            concurrent = true;
+            lastContext = context;
         }
 
         public void remove() {
             count--;
         }
 
-        public long getCount() {
+        public int getCount() {
             return count;
         }
 
-        public boolean isConcurrent() {
-            return concurrent;
+        public BatchMutateContext getLastContext() {
+            return lastContext;
         }
     }
 
@@ -161,9 +167,25 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
         failDataTableUpdatesForTesting = fail;
     }
 
+    public enum BatchMutatePhase {
+        PRE, POST, FAILED
+    }
+
     // Hack to get around not being able to save any state between
     // coprocessor calls. TODO: remove after HBASE-18127 when available
+
+    /**
+     * The concurrent batches of mutations form a set such that every pair of batches in this set has at least one
+     * common row. Since the BatchMutateContext object of a batch is modified only after the row locks for all the
+     * rows mutated by that batch are acquired, only the single thread holding those locks can safely access the
+     * batch contexts in the set of concurrent batches. Because of this, we do not need atomic variables or
+     * additional locks to serialize access to the BatchMutateContext objects.
+     */
     private static class BatchMutateContext {
+        private BatchMutatePhase currentPhase = BatchMutatePhase.PRE;
+        // The max of the reference counts on the pending rows of this batch at the time this batch arrives
+        private int maxPendingRowCount = 0;
         private final int clientVersion;
         // The collection of index mutations that will be applied before the data table mutations. The empty column (i.e.,
         // the verified column) will have the value false ("unverified") on these mutations
@@ -175,15 +197,42 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
         private ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates;
         private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
         private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
-        private boolean rebuild;
         // The current and next states of the data rows corresponding to the pending mutations
         private HashMap<ImmutableBytesPtr, Pair<Put, Put>> dataRowStates;
-        // Data table pending mutations
+        // The previous concurrent batch contexts
+        private HashMap<ImmutableBytesPtr, BatchMutateContext> lastConcurrentBatchContext = null;
+        // The latches of the threads waiting for this batch to complete
+        private List<CountDownLatch> waitList = null;
         private Map<ImmutableBytesPtr, MultiMutation> multiMutationMap;
 
         private BatchMutateContext(int clientVersion) {
             this.clientVersion = clientVersion;
         }
+
+        public BatchMutatePhase getCurrentPhase() {
+            return currentPhase;
+        }
+
+        public Put getNextDataRowState(ImmutableBytesPtr rowKeyPtr) {
+            Pair<Put, Put> rowState = dataRowStates.get(rowKeyPtr);
+            if (rowState != null) {
+                return rowState.getSecond();
+            }
+            return null;
+        }
+
+        public CountDownLatch getCountDownLatch() {
+            if (waitList == null) {
+                waitList = new ArrayList<>();
+            }
+            CountDownLatch countDownLatch = new CountDownLatch(1);
+            waitList.add(countDownLatch);
+            return countDownLatch;
+        }
+
+        public int getMaxPendingRowCount() {
+            return maxPendingRowCount;
+        }
     }
 
     private ThreadLocal<BatchMutateContext> batchMutateContext =
@@ -223,9 +272,11 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
     private long slowIndexPrepareThreshold;
     private long slowPreIncrementThreshold;
     private int rowLockWaitDuration;
+    private int concurrentMutationWaitDuration;
     private String dataTableName;
 
     private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
+    private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 100;
 
     @Override
     public Optional<RegionObserver> getRegionObserver() {
@@ -259,8 +310,9 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
         this.rowLockWaitDuration = env.getConfiguration().getInt("hbase.rowlock.wait.duration",
                 DEFAULT_ROWLOCK_WAIT_DURATION);
-        this.lockManager = new LockManager();
-
+        this.lockManager = new LockManager();
+        this.concurrentMutationWaitDuration = env.getConfiguration().getInt("phoenix.index.concurrent.wait.duration.ms",
+                DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS);
         // Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat
         this.metricSource = MetricsIndexerSourceFactory.getInstance().getIndexerSource();
         setSlowThresholds(e.getConfiguration());
@@ -409,15 +461,22 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
         }
     }
 
+    private void unlockRows(BatchMutateContext context) throws IOException {
+        for (RowLock rowLock : context.rowLocks) {
+            rowLock.release();
+        }
+        context.rowLocks.clear();
+    }
+
     private void populatePendingRows(BatchMutateContext context) {
         for (RowLock rowLock : context.rowLocks) {
             ImmutableBytesPtr rowKey = rowLock.getRowKey();
             PendingRow pendingRow = pendingRows.get(rowKey);
             if (pendingRow == null) {
-                pendingRows.put(rowKey, new PendingRow());
+                pendingRows.put(rowKey, new PendingRow(context));
             } else {
                 // m is a mutation on a row that already has a pending mutation in progress from another batch
-                pendingRow.add();
+                pendingRow.add(context);
             }
         }
     }
@@ -608,15 +667,34 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
     private void getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
                                      BatchMutateContext context) throws IOException {
         Set<KeyRange> keys = new HashSet<KeyRange>(context.rowsToLock.size());
+        context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put, Put>>(context.rowsToLock.size());
         for (ImmutableBytesPtr rowKeyPtr : context.rowsToLock) {
-            keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get()));
+            PendingRow pendingRow = pendingRows.get(rowKeyPtr);
+            if (pendingRow != null && pendingRow.getLastContext().getCurrentPhase() == BatchMutatePhase.PRE) {
+                if (context.lastConcurrentBatchContext == null) {
+                    context.lastConcurrentBatchContext = new HashMap<>();
+                }
+                context.lastConcurrentBatchContext.put(rowKeyPtr, pendingRow.getLastContext());
+                if (context.maxPendingRowCount < pendingRow.getCount()) {
+                    context.maxPendingRowCount = pendingRow.getCount();
+                }
+                Put put = pendingRow.getLastContext().getNextDataRowState(rowKeyPtr);
+                if (put != null) {
+                    context.dataRowStates.put(rowKeyPtr, new Pair<Put, Put>(put, new Put(put)));
+                }
+            }
+            else {
+                keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get()));
+            }
+        }
+        if (keys.isEmpty()) {
+            return;
         }
         Scan scan = new Scan();
         ScanRanges scanRanges = ScanRanges.createPointLookup(new ArrayList<KeyRange>(keys));
         scanRanges.initializeScan(scan);
         SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
         scan.setFilter(skipScanFilter);
-        context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put, Put>>(context.rowsToLock.size());
         try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
             boolean more = true;
             while(more) {
@@ -765,43 +843,11 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
         return (PhoenixIndexMetaData)indexMetaData;
     }
 
-    /**
-     * IndexMaintainer.getIndexedColumns() returns the data column references for indexed columns. The data columns are
-     * grouped into three classes, pk columns (data table pk columns), the indexed columns (the columns for which
-     * we want to have indexing; they form the prefix for the primary key for the index table (after salt and tenant id))
-     * and covered columns. The purpose of this method is to find out if all the indexed columns are included in the
-     * pending data table mutation pointed by multiMutation.
-     */
-    private boolean hasAllIndexedColumns(IndexMaintainer indexMaintainer, MultiMutation multiMutation) {
-        Map<byte[], List<Cell>> familyMap = multiMutation.getFamilyCellMap();
-        for (ColumnReference columnReference : indexMaintainer.getIndexedColumns()) {
-            byte[] family = columnReference.getFamily();
-            List<Cell> cellList = familyMap.get(family);
-            if (cellList == null) {
-                return false;
-            }
-            boolean has = false;
-            for (Cell cell : cellList) {
-                if (CellUtil.matchingColumn(cell, family, columnReference.getQualifier())) {
-                    has = true;
-                    break;
-                }
-            }
-            if (!has) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private void preparePostIndexMutations(TableName table,
-                                           BatchMutateContext context,
+    private void preparePostIndexMutations(BatchMutateContext context,
                                            long now,
-                                           PhoenixIndexMetaData indexMetaData)
-            throws Throwable {
+                                           PhoenixIndexMetaData indexMetaData) {
         context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
         List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
-        // Check if we need to skip post index update for any of the rows
         for (IndexMaintainer indexMaintainer : maintainers) {
             byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
             byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
@@ -809,90 +855,21 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
                     new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
             List<Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
             for (Pair<Mutation, byte[]> update : updates) {
-                // Are there concurrent updates on the data table row? if so, skip post index updates
-                // and let read repair resolve conflicts
-                ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
-                PendingRow pendingRow = pendingRows.get(rowKey);
-                if (!pendingRow.isConcurrent()) {
-                    Mutation m = update.getFirst();
-                    if (m instanceof Put) {
-                        Put verifiedPut = new Put(m.getRow());
-                        // Set the status of the index row to "verified"
-                        verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
-                        context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
-                    } else {
-                        context.postIndexUpdates.put(hTableInterfaceReference, m);
-                    }
+                Mutation m = update.getFirst();
+                if (m instanceof Put) {
+                    Put verifiedPut = new Put(m.getRow());
+                    // Set the status of the index row to "verified"
+                    verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
+                    context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
                 } else {
-                    if (!hasAllIndexedColumns(indexMaintainer, context.multiMutationMap.get(rowKey))) {
-                        // This batch needs to be retried since one of the concurrent mutations does not have the value
-                        // for an indexed column. Not including an index column may lead to incorrect index row key
-                        // generation for concurrent mutations since concurrent mutations are not serialized entirely
-                        // and do not see each other's effect on data table. Throwing an IOException will result in
-                        // retries of this batch. Before throwing exception, we need to remove reference counts and
-                        // locks for the rows of this batch
-                        removePendingRows(context);
-                        context.indexUpdates.clear();
-                        for (RowLock rowLock : context.rowLocks) {
-                            rowLock.release();
-                        }
-                        context.rowLocks.clear();
-                        throw new IOException("One of the concurrent mutations does not have all indexed columns. " +
" + - "The batch needs to be retried " + table.getNameAsString()); - } + context.postIndexUpdates.put(hTableInterfaceReference, m); } } } - - // We are done with handling concurrent mutations. So we can remove the rows of this batch from - // the collection of pending rows removePendingRows(context); context.indexUpdates.clear(); } - /** - * There are at most two rebuild mutation for every row, one put and one delete. They are listed in indexMutations - * next to each other such that put comes before delete by {@link IndexRebuildRegionScanner}. This method is called - * only for global indexes. - */ - private void preBatchMutateWithExceptionsForRebuild(ObserverContext<RegionCoprocessorEnvironment> c, - MiniBatchOperationInProgress<Mutation> miniBatchOp, - BatchMutateContext context, - IndexMaintainer indexMaintainer) throws Throwable { - Put put = null; - List <Mutation> indexMutations = new ArrayList<>(); - for (int i = 0; i < miniBatchOp.size(); i++) { - if (miniBatchOp.getOperationStatus(i) == IGNORE) { - continue; - } - Mutation m = miniBatchOp.getOperation(i); - if (!this.builder.isEnabled(m)) { - continue; - } - if (m instanceof Put) { - if (put != null) { - indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, null)); - } - put = (Put)m; - } else { - indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, (Delete)m)); - put = null; - } - miniBatchOp.setOperationStatus(i, NOWRITE); - } - if (put != null) { - indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, null)); - } - HTableInterfaceReference hTableInterfaceReference = - new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); - context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create(); - for (Mutation m : indexMutations) { - context.preIndexUpdates.put(hTableInterfaceReference, m); - } - doPre(c, context, miniBatchOp); - // For rebuild updates, no post index update is prepared. Just create an empty list. - context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create(); - } private static boolean hasGlobalIndex(PhoenixIndexMetaData indexMetaData) { for (IndexMaintainer indexMaintainer : indexMetaData.getIndexMaintainers()) { @@ -912,46 +889,75 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor { return false; } + private void waitForPreviousConcurrentBatch(TableName table, BatchMutateContext context) + throws Throwable { + boolean done; + BatchMutatePhase phase; + done = true; + for (BatchMutateContext lastContext : context.lastConcurrentBatchContext.values()) { + phase = lastContext.getCurrentPhase(); + if (phase == BatchMutatePhase.FAILED) { + done = false; + break; + } + if (phase == BatchMutatePhase.PRE) { + CountDownLatch countDownLatch = lastContext.getCountDownLatch(); + // Release the locks so that the previous concurrent mutation can go into the post phase + unlockRows(context); + // Wait for at most one concurrentMutationWaitDuration for each level in the dependency tree of batches. 
+                // lastContext.getMaxPendingRowCount() is the depth of the subtree rooted at the batch pointed to by lastContext
+                if (!countDownLatch.await((lastContext.getMaxPendingRowCount() + 1) * concurrentMutationWaitDuration,
+                        TimeUnit.MILLISECONDS)) {
+                    done = false;
+                    break;
+                }
+                // Acquire the locks again before letting the region proceed with data table updates
+                lockRows(context);
+            }
+        }
+        if (!done) {
+            // This batch needs to be retried since one of the previous concurrent batches has not completed yet.
+            // Throwing an IOException will result in retries of this batch. Before throwing exception,
+            // we need to remove reference counts and locks for the rows of this batch
+            removePendingRows(context);
+            context.indexUpdates.clear();
+            for (RowLock rowLock : context.rowLocks) {
+                rowLock.release();
+            }
+            context.rowLocks.clear();
+            throw new IOException("One of the previous concurrent mutations has not completed. " +
+                    "The batch needs to be retried " + table.getNameAsString());
+        }
+    }
+
     public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
                                              MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
         ignoreAtomicOperations(miniBatchOp);
         PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
         BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
         setBatchMutateContext(c, context);
-        Mutation firstMutation = miniBatchOp.getOperation(0);
-        ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
-        context.rebuild = replayWrite != null;
-        if (context.rebuild) {
-            preBatchMutateWithExceptionsForRebuild(c, miniBatchOp, context, indexMetaData.getIndexMaintainers().get(0));
-            return;
-        }
         /*
          * Exclusively lock all rows so we get a consistent read
          * while determining the index updates
         */
         populateRowsToLock(miniBatchOp, context);
+        // early exit if it turns out we don't have any update for indexes
+        if (context.rowsToLock.isEmpty()) {
+            return;
+        }
        lockRows(context);
-
        long now = EnvironmentEdgeManager.currentTimeMillis();
-        // Unless we're replaying edits to rebuild the index, we update the time stamp
-        // of the data table to prevent overlapping time stamps (which prevents index
+        // Update the timestamps of the data table mutations to prevent overlapping timestamps (which prevents index
        // inconsistencies as this case isn't handled correctly currently).
        setTimestamps(miniBatchOp, builder, now);
-        // Group all the updates for a single row into a single update to be processed (for local indexes, and global index retries)
-        Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, context);
-        // early exit if it turns out we don't have any edits
-        if (mutations == null || mutations.isEmpty()) {
-            return;
-        }
-
        TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
        if (hasGlobalIndex(indexMetaData)) {
+            // Prepare current and next data rows states for pending mutations (for global indexes)
+            prepareDataRowStates(c, miniBatchOp, context, now);
            // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
            // concurrent updates
            populatePendingRows(context);
-            // Prepare current and next data rows states for pending mutations (for global indexes)
-            prepareDataRowStates(c, miniBatchOp, context, now);
            // early exit if it turns out we don't have any edits
            long start = EnvironmentEdgeManager.currentTimeMillis();
            preparePreIndexMutations(context, now, indexMetaData);
@@ -965,17 +971,19 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
                LOG.debug("slept 1ms for " + table.getNameAsString());
            }
            // Release the locks before making RPC calls for index updates
-            for (RowLock rowLock : context.rowLocks) {
-                rowLock.release();
-            }
+            unlockRows(context);
            // Do the first phase index updates
            doPre(c, context, miniBatchOp);
            // Acquire the locks again before letting the region proceed with data table updates
-            context.rowLocks.clear();
            lockRows(context);
-            preparePostIndexMutations(table, context, now, indexMetaData);
+            if (context.lastConcurrentBatchContext != null) {
+                waitForPreviousConcurrentBatch(table, context);
+            }
+            preparePostIndexMutations(context, now, indexMetaData);
        }
        if (hasLocalIndex(indexMetaData)) {
+            // Group all the updates for a single row into a single update to be processed (for local indexes)
+            Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, context);
            handleLocalIndexUpdates(table, miniBatchOp, mutations, indexMetaData);
        }
        if (failDataTableUpdatesForTesting) {
@@ -1006,9 +1014,17 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
            return;
        }
        try {
-            for (RowLock rowLock : context.rowLocks) {
-                rowLock.release();
+            if (success) {
+                context.currentPhase = BatchMutatePhase.POST;
+            } else {
+                context.currentPhase = BatchMutatePhase.FAILED;
            }
+            if (context.waitList != null) {
+                for (CountDownLatch countDownLatch : context.waitList) {
+                    countDownLatch.countDown();
+                }
+            }
+            unlockRows(context);
            this.builder.batchCompleted(miniBatchOp);
            if (success) {
                // The pre-index and data table updates are successful, and now, do post index updates
@@ -1077,9 +1093,6 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
 
    private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context,
                       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-        if (ignoreIndexRebuildForTesting && context.rebuild) {
-            return;
-        }
        long start = EnvironmentEdgeManager.currentTimeMillis();
        try {
            if (failPreIndexUpdatesForTesting) {
@@ -1097,11 +1110,7 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
            // postBatchMutateIndispensably() is called
            removePendingRows(context);
            context.rowLocks.clear();
-            if (context.rebuild) {
-                throw new IOException(String.format("%s for rebuild", e.getMessage()), e);
-            } else {
-                rethrowIndexingException(e);
-            }
+            rethrowIndexingException(e);
        }
        throw new RuntimeException(
                "Somehow didn't complete the index update, but didn't return succesfully either!");
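
The heart of this change is the handshake between concurrent batches that touch the same rows: a batch that finds an earlier overlapping batch still in its PRE phase registers a CountDownLatch with that batch, drops its own row locks, and waits; the earlier batch counts every registered latch down from postBatchMutateIndispensably() once it reaches POST or FAILED. The await timeout is scaled by getMaxPendingRowCount() because a batch may transitively wait on a chain of earlier batches, one wait duration per level. The standalone sketch below is an illustration only, not part of the commit: BatchContext, newLatch, complete, and waitForPrevious are invented names, and volatile/synchronized stand in for the HBase row-lock serialization the real coprocessor relies on.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class ConcurrentBatchSketch {

        enum BatchMutatePhase { PRE, POST, FAILED }

        // Stand-in for BatchMutateContext; the real class is guarded by HBase row locks,
        // so this sketch approximates that with volatile/synchronized.
        static class BatchContext {
            volatile BatchMutatePhase phase = BatchMutatePhase.PRE;
            final List<CountDownLatch> waitList = new ArrayList<>();
            int maxPendingRowCount = 0; // depth of the chain of batches this one may wait on

            synchronized CountDownLatch newLatch() {
                CountDownLatch latch = new CountDownLatch(1);
                waitList.add(latch);
                return latch;
            }

            synchronized void complete(boolean success) {
                phase = success ? BatchMutatePhase.POST : BatchMutatePhase.FAILED;
                for (CountDownLatch latch : waitList) {
                    latch.countDown(); // wake every batch waiting on this one
                }
            }
        }

        static final long WAIT_MS = 100; // analogue of phoenix.index.concurrent.wait.duration.ms

        // Returns true if every earlier concurrent batch finished in time; false means
        // "retry the batch", which the patch signals by throwing an IOException.
        static boolean waitForPrevious(List<BatchContext> lastConcurrent) throws InterruptedException {
            for (BatchContext last : lastConcurrent) {
                if (last.phase == BatchMutatePhase.FAILED) {
                    return false;
                }
                if (last.phase == BatchMutatePhase.PRE) {
                    CountDownLatch latch = last.newLatch();
                    // One wait duration per level of the dependency chain, as in the patch
                    long timeoutMs = (last.maxPendingRowCount + 1) * WAIT_MS;
                    if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                        return false;
                    }
                }
            }
            return true;
        }

        public static void main(String[] args) throws Exception {
            BatchContext first = new BatchContext();
            Thread firstBatch = new Thread(() -> {
                try {
                    Thread.sleep(20); // the first batch is still writing its data table rows
                } catch (InterruptedException ignored) {
                }
                first.complete(true); // reaches POST and releases all waiters
            });
            firstBatch.start();
            // The second batch sees the first one in the PRE phase and waits on its latch
            System.out.println("second batch may proceed: " + waitForPrevious(List.of(first)));
            firstBatch.join();
        }
    }

Run as-is, the sketch prints "second batch may proceed: true"; flipping complete(true) to complete(false) models the FAILED path, where the later batch gives up and retries, just as the patch does by releasing its pending-row references and rethrowing.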