This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push: new 689ac95 PHOENIX-5565 Unify index update structures in IndexRegionObserver and IndexCommitter 689ac95 is described below commit 689ac9577f564fb02f2d6e886ff6f5e226d2f81e Author: Kadir <kozde...@salesforce.com> AuthorDate: Thu Nov 7 15:50:40 2019 -0800 PHOENIX-5565 Unify index update structures in IndexRegionObserver and IndexCommitter --- .../phoenix/hbase/index/IndexRegionObserver.java | 155 +++++++++++---------- .../hbase/index/builder/IndexBuildManager.java | 15 +- 2 files changed, 84 insertions(+), 86 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index b058b33..340832f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -32,6 +32,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -67,6 +69,7 @@ import org.apache.phoenix.hbase.index.builder.IndexBuilder; import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource; import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory; +import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.hbase.index.write.IndexWriter; @@ -145,16 +148,16 @@ public class IndexRegionObserver extends BaseRegionObserver { private final int clientVersion; // The 
collection of index mutations that will be applied before the data table mutations. The empty column (i.e., // the verified column) will have the value false ("unverified") on these mutations - private Collection<Pair<Mutation, byte[]>> preIndexUpdates = Collections.emptyList(); + private ListMultimap<HTableInterfaceReference, Mutation> preIndexUpdates; // The collection of index mutations that will be applied after the data table mutations. The empty column (i.e., // the verified column) will have the value true ("verified") on the put mutations - private Collection<Pair<Mutation, byte[]>> postIndexUpdates = Collections.emptyList(); + private ListMultimap<HTableInterfaceReference, Mutation> postIndexUpdates; // The collection of candidate index mutations that will be applied after the data table mutations - private Collection<Pair<Pair<Mutation, byte[]>, byte[]>> intermediatePostIndexUpdates; + private ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> intermediatePostIndexUpdates; private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>(); - long dataWriteStartTime; - + private long dataWriteStartTime; + private boolean rebuild; private BatchMutateContext(int clientVersion) { this.clientVersion = clientVersion; } @@ -506,6 +509,27 @@ public class IndexRegionObserver extends BaseRegionObserver { } } + private void handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp, + ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates) { + byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName(); + HTableInterfaceReference hTableInterfaceReference = + new HTableInterfaceReference(new ImmutableBytesPtr(tableName)); + List<Pair<Mutation, byte[]>> localIndexUpdates = indexUpdates.removeAll(hTableInterfaceReference); + if 
(localIndexUpdates == null || localIndexUpdates.isEmpty()) { + return; + } + List<Mutation> localUpdates = new ArrayList<Mutation>(); + Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = localIndexUpdates.iterator(); + while (indexUpdatesItr.hasNext()) { + Pair<Mutation, byte[]> next = indexUpdatesItr.next(); + localUpdates.add(next.getFirst()); + } + if (!localUpdates.isEmpty()) { + miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()])); + } + } + private void prepareIndexMutations( ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp, @@ -513,79 +537,56 @@ public class IndexRegionObserver extends BaseRegionObserver { Collection<? extends Mutation> mutations, long now, PhoenixIndexMetaData indexMetaData) throws Throwable { - List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers(); - // get the current span, or just use a null-span to avoid a bunch of if statements try (TraceScope scope = Trace.startSpan("Starting to build index updates")) { Span current = scope.getSpan(); if (current == null) { current = NullSpan.INSTANCE; } - // get the index updates for all elements in this batch - Collection<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdates = - this.builder.getIndexUpdates(miniBatchOp, mutations, indexMetaData); - + context.intermediatePostIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create(); + this.builder.getIndexUpdates(context.intermediatePostIndexUpdates, miniBatchOp, mutations, indexMetaData); current.addTimelineAnnotation("Built index updates, doing preStep"); - TracingUtils.addAnnotation(current, "index update count", indexUpdates.size()); - byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName(); - Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdatesItr = indexUpdates.iterator(); - List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size()); - 
context.preIndexUpdates = new ArrayList<>(indexUpdates.size()); - context.intermediatePostIndexUpdates = new ArrayList<>(indexUpdates.size()); - while(indexUpdatesItr.hasNext()) { - Pair<Pair<Mutation, byte[]>, byte[]> next = indexUpdatesItr.next(); - if (Bytes.compareTo(next.getFirst().getSecond(), tableName) == 0) { - localUpdates.add(next.getFirst().getFirst()); - indexUpdatesItr.remove(); - } - else { - // get index maintainer for this index table - IndexMaintainer indexMaintainer = getIndexMaintainer(maintainers, next.getFirst().getSecond()); - if (indexMaintainer == null) { - throw new DoNotRetryIOException( - "preBatchMutateWithExceptions: indexMaintainer is null " + - c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString()); - } - byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(); - byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); + handleLocalIndexUpdates(c, miniBatchOp, context.intermediatePostIndexUpdates); + context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create(); + int updateCount = 0; + for (IndexMaintainer indexMaintainer : maintainers) { + updateCount++; + byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(); + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); + HTableInterfaceReference hTableInterfaceReference = + new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); + Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = + context.intermediatePostIndexUpdates.get(hTableInterfaceReference).iterator(); + while (indexUpdatesItr.hasNext()) { + Pair<Mutation, byte[]> next = indexUpdatesItr.next(); // add the VERIFIED cell, which is the empty cell - Mutation m = next.getFirst().getFirst(); - boolean rebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap()); - if (rebuild) { + Mutation m = next.getFirst(); + if (context.rebuild) { + indexUpdatesItr.remove(); if (m instanceof 
Put) { long ts = getMaxTimestamp(m); // Remove the empty column prepared by Index codec as we need to change its value removeEmptyColumn(m, emptyCF, emptyCQ); - ((Put)m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES); + ((Put) m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES); } + context.preIndexUpdates.put(hTableInterfaceReference, m); } else { - indexUpdatesItr.remove(); // For this mutation whether it is put or delete, set the status of the index row "unverified" // This will be done before the data table row is updated (i.e., in the first write phase) Put unverifiedPut = new Put(m.getRow()); unverifiedPut.addColumn(emptyCF, emptyCQ, now - 1, UNVERIFIED_BYTES); - context.preIndexUpdates.add(new Pair <Mutation, byte[]>(unverifiedPut, next.getFirst().getSecond())); + context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut); if (m instanceof Put) { // Remove the empty column prepared by Index codec as we need to change its value removeEmptyColumn(m, emptyCF, emptyCQ); ((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES); } - context.intermediatePostIndexUpdates.add(next); } } } - if (!localUpdates.isEmpty()) { - miniBatchOp.addOperationsFromCP(0, - localUpdates.toArray(new Mutation[localUpdates.size()])); - } - if (!indexUpdates.isEmpty() && context.preIndexUpdates.isEmpty()) { - context.preIndexUpdates = new ArrayList<>(indexUpdates.size()); - } - for (Pair<Pair<Mutation, byte[]>, byte[]> update : indexUpdates) { - context.preIndexUpdates.add(update.getFirst()); - } + TracingUtils.addAnnotation(current, "index update count", updateCount); } } @@ -610,20 +611,23 @@ public class IndexRegionObserver extends BaseRegionObserver { setBatchMutateContext(c, context); Mutation firstMutation = miniBatchOp.getOperation(0); ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation); + context.rebuild = replayWrite != null; /* * Exclusively lock all rows so we get a consistent read * while determining the index updates */ - if (replayWrite == 
null) { + long now; + if (!context.rebuild) { populateRowsToLock(miniBatchOp, context); lockRows(context); - } - long now = EnvironmentEdgeManager.currentTimeMillis(); - // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect - // concurrent updates - if (replayWrite == null) { + now = EnvironmentEdgeManager.currentTimeMillis(); + // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect + // concurrent updates populatePendingRows(context); } + else { + now = EnvironmentEdgeManager.currentTimeMillis(); + } // First group all the updates for a single row into a single update to be processed Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now, replayWrite); // early exit if it turns out we don't have any edits @@ -646,9 +650,11 @@ public class IndexRegionObserver extends BaseRegionObserver { for (RowLock rowLock : context.rowLocks) { rowLock.release(); } - // Do the index updates + // Do the first phase index updates doPre(c, context, miniBatchOp); - if (replayWrite == null) { + context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create(); + if (!context.rebuild) { + List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers(); // Acquire the locks again before letting the region proceed with data table updates List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size()); for (RowLock rowLock : context.rowLocks) { @@ -658,29 +664,26 @@ public class IndexRegionObserver extends BaseRegionObserver { context.rowLocks.clear(); context.rowLocks = rowLocks; // Check if we need to skip post index update for any of the row - Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> iterator = context.intermediatePostIndexUpdates.iterator(); - while (iterator.hasNext()) { - // Check if this row is going through another mutation which has a newer timestamp. 
If so, - // ignore the pending updates for this row - Pair<Pair<Mutation, byte[]>, byte[]> update = iterator.next(); - ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond()); - PendingRow pendingRow = pendingRows.get(rowKey); - // Are there concurrent updates on the data table row? if so, skip post index updates - // and let read repair resolve conflicts - if (pendingRow.isConcurrent()) { - iterator.remove(); + for (IndexMaintainer indexMaintainer : maintainers) { + HTableInterfaceReference hTableInterfaceReference = + new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); + Iterator<Pair<Mutation, byte[]>> iterator = + context.intermediatePostIndexUpdates.get(hTableInterfaceReference).iterator(); + while (iterator.hasNext()) { + // Are there concurrent updates on the data table row? if so, skip post index updates + // and let read repair resolve conflicts + Pair<Mutation, byte[]> update = iterator.next(); + ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond()); + PendingRow pendingRow = pendingRows.get(rowKey); + if (!pendingRow.isConcurrent()) { + context.postIndexUpdates.put(hTableInterfaceReference, update.getFirst()); + } } } // We are done with handling concurrent mutations. 
So we can remove the rows of this batch from // the collection of pending rows removePendingRows(context); } - if (context.postIndexUpdates.isEmpty() && !context.intermediatePostIndexUpdates.isEmpty()) { - context.postIndexUpdates = new ArrayList<>(context.intermediatePostIndexUpdates.size()); - } - for (Pair<Pair<Mutation, byte[]>, byte[]> update : context.intermediatePostIndexUpdates) { - context.postIndexUpdates.add(update.getFirst()); - } if (failDataTableUpdatesForTesting) { throw new DoNotRetryIOException("Simulating the data table write failure"); } @@ -758,7 +761,7 @@ public class IndexRegionObserver extends BaseRegionObserver { private void doIndexWritesWithExceptions(BatchMutateContext context, boolean post) throws IOException { - Collection<Pair<Mutation, byte[]>> indexUpdates = post ? context.postIndexUpdates : context.preIndexUpdates; + ListMultimap<HTableInterfaceReference, Mutation> indexUpdates = post ? context.postIndexUpdates : context.preIndexUpdates; //short circuit, if we don't need to do any work if (context == null || indexUpdates.isEmpty()) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java index 7639a49..90d28b8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import com.google.common.collect.ListMultimap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Stoppable; @@ -34,6 +35,8 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; import org.apache.phoenix.hbase.index.Indexer; import 
org.apache.phoenix.hbase.index.covered.IndexMetaData; +import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.PhoenixIndexMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,7 +82,7 @@ public class IndexBuildManager implements Stoppable { return this.delegate.getIndexMetaData(miniBatchOp); } - public Collection<Pair<Pair<Mutation, byte[]>, byte[]>> getIndexUpdates( + public void getIndexUpdates(ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates, MiniBatchOperationInProgress<Mutation> miniBatchOp, Collection<? extends Mutation> mutations, IndexMetaData indexMetaData) throws Throwable { @@ -87,20 +90,12 @@ public class IndexBuildManager implements Stoppable { this.delegate.batchStarted(miniBatchOp, indexMetaData); // Avoid the Object overhead of the executor when it's not actually parallelizing anything. - ArrayList<Pair<Pair<Mutation, byte[]>, byte[]>> results = new ArrayList<>(mutations.size()); for (Mutation m : mutations) { Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData); - if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) { - for (Pair<Mutation, byte[]> update : updates) { - update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, - BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES); - } - } for (Pair<Mutation, byte[]> update : updates) { - results.add(new Pair<>(update, m.getRow())); + indexUpdates.put(new HTableInterfaceReference(new ImmutableBytesPtr(update.getSecond())), new Pair<>(update.getFirst(), m.getRow())); } } - return results; } public Collection<Pair<Mutation, byte[]>> getIndexUpdate(