PHOENIX-4004 Remove unnecessary allocations in server-side mutable secondary-index path
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ce71efc9 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ce71efc9 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ce71efc9 Branch: refs/heads/4.x-HBase-1.1 Commit: ce71efc9fe19f03524b24e69ecaa19b0f03846de Parents: c2a7389 Author: Josh Elser <[email protected]> Authored: Fri Jul 7 16:40:27 2017 -0400 Committer: Josh Elser <[email protected]> Committed: Fri Jul 21 17:35:11 2017 -0400 ---------------------------------------------------------------------- .../org/apache/phoenix/hbase/index/Indexer.java | 4 +- .../hbase/index/builder/IndexBuildManager.java | 78 ++------------------ .../hbase/index/covered/LocalTableState.java | 24 +++++- .../example/CoveredColumnIndexCodec.java | 21 ++++-- 4 files changed, 47 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce71efc9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index 5a78c94..38401d4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -610,7 +610,9 @@ public class Indexer extends BaseRegionObserver { * @return the mutations to apply to the index tables */ private Collection<Pair<Mutation, byte[]>> extractIndexUpdate(WALEdit edit) { - Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(); + // Avoid multiple internal array resizings. 
Initial size of 64, unless we have fewer cells in the edit + int initialSize = Math.min(edit.size(), 64); + Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(initialSize); for (Cell kv : edit.getCells()) { if (kv instanceof IndexedKeyValue) { IndexedKeyValue ikv = (IndexedKeyValue) kv; http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce71efc9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java index 325904d..c015a77 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java @@ -45,35 +45,14 @@ import com.google.common.util.concurrent.MoreExecutors; /** * Manage the building of index updates from primary table updates. - * <p> - * Internally, parallelizes updates through a thread-pool to a delegate index builder. Underlying - * {@link IndexBuilder} <b>must be thread safe</b> for each index update. */ public class IndexBuildManager implements Stoppable { private static final Log LOG = LogFactory.getLog(IndexBuildManager.class); private final IndexBuilder delegate; - private QuickFailingTaskRunner pool; private boolean stopped; /** - * Set the number of threads with which we can concurrently build index updates. Unused threads - * will be released, but setting the number of threads too high could cause frequent swapping and - * resource contention on the server - <i>tune with care</i>. However, if you are spending a lot - * of time building index updates, it could be worthwhile to spend the time to tune this parameter - * as it could lead to dramatic increases in speed. 
- */ - public static final String NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY = "index.builder.threads.max"; - /** Default to a single thread. This is the safest course of action, but the slowest as well */ - private static final int DEFAULT_CONCURRENT_INDEX_BUILDER_THREADS = 10; - /** - * Amount of time to keep idle threads in the pool. After this time (seconds) we expire the - * threads and will re-create them as needed, up to the configured max - */ - private static final String INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY = - "index.builder.threads.keepalivetime"; - - /** * @param env environment in which <tt>this</tt> is running. Used to setup the * {@link IndexBuilder} and executor * @throws IOException if an {@link IndexBuilder} cannot be correctly steup @@ -81,7 +60,7 @@ public class IndexBuildManager implements Stoppable { public IndexBuildManager(RegionCoprocessorEnvironment env) throws IOException { // Prevent deadlock by using single thread for all reads so that we know // we can get the ReentrantRWLock. See PHOENIX-2671 for more details. - this(getIndexBuilder(env), new QuickFailingTaskRunner(MoreExecutors.sameThreadExecutor())); + this.delegate = getIndexBuilder(env); } private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException { @@ -101,20 +80,6 @@ public class IndexBuildManager implements Stoppable { } } - private static ThreadPoolBuilder getPoolBuilder(RegionCoprocessorEnvironment env) { - String serverName = env.getRegionServerServices().getServerName().getServerName(); - return new ThreadPoolBuilder(serverName + "-index-builder", env.getConfiguration()). - setCoreTimeout(INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY). 
- setMaxThread(NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY, - DEFAULT_CONCURRENT_INDEX_BUILDER_THREADS); - } - - public IndexBuildManager(IndexBuilder builder, QuickFailingTaskRunner pool) { - this.delegate = builder; - this.pool = pool; - } - - public Collection<Pair<Mutation, byte[]>> getIndexUpdate( MiniBatchOperationInProgress<Mutation> miniBatchOp, Collection<? extends Mutation> mutations) throws Throwable { @@ -122,41 +87,11 @@ public class IndexBuildManager implements Stoppable { final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp); this.delegate.batchStarted(miniBatchOp, indexMetaData); - // parallelize each mutation into its own task - // each task is cancelable via two mechanisms: (1) underlying HRegion is closing (which would - // fail lookups/scanning) and (2) by stopping this via the #stop method. Interrupts will only be - // acknowledged on each thread before doing the actual lookup, but after that depends on the - // underlying builder to look for the closed flag. - TaskBatch<Collection<Pair<Mutation, byte[]>>> tasks = - new TaskBatch<Collection<Pair<Mutation, byte[]>>>(mutations.size()); - for (final Mutation m : mutations) { - tasks.add(new Task<Collection<Pair<Mutation, byte[]>>>() { - - @Override - public Collection<Pair<Mutation, byte[]>> call() throws IOException { - return delegate.getIndexUpdate(m, indexMetaData); - } - - }); - } - List<Collection<Pair<Mutation, byte[]>>> allResults = null; - try { - allResults = pool.submitUninterruptible(tasks); - } catch (CancellationException e) { - throw e; - } catch (ExecutionException e) { - LOG.error("Found a failed index update!"); - throw e.getCause(); + // Avoid the Object overhead of the executor when it's not actually parallelizing anything. 
+ ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size()); + for (Mutation m : mutations) { + results.addAll(delegate.getIndexUpdate(m, indexMetaData)); } - - // we can only get here if we get successes from each of the tasks, so each of these must have a - // correct result - Collection<Pair<Mutation, byte[]>> results = new ArrayList<Pair<Mutation, byte[]>>(); - for (Collection<Pair<Mutation, byte[]>> result : allResults) { - assert result != null : "Found an unsuccessful result, but didn't propagate a failure earlier"; - results.addAll(result); - } - return results; } @@ -194,7 +129,6 @@ public class IndexBuildManager implements Stoppable { } this.stopped = true; this.delegate.stop(why); - this.pool.stop(why); } @Override @@ -206,4 +140,4 @@ public class IndexBuildManager implements Stoppable { return this.delegate; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce71efc9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java index 245bd66..acbf1ab 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java @@ -18,7 +18,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Pair; @@ -84,6 +86,24 @@ public class LocalTableState implements TableState { } } + private void 
addUpdateCells(List<Cell> list, boolean overwrite) { + if (list == null) return; + // Avoid a copy of the Cell into a KeyValue if it's already a KeyValue + for (Cell c : list) { + this.memstore.add(maybeCopyCell(c), overwrite); + } + } + + private KeyValue maybeCopyCell(Cell c) { + // Same as KeyValueUtil.ensureKeyValue(Cell), but HBase has deprecated that method. Avoid depending on something + // that will likely be removed at some point in time. + if (c == null) return null; + if (c instanceof KeyValue) { + return (KeyValue) c; + } + return KeyValueUtil.copyToNewKeyValue(c); + } + @Override public RegionCoprocessorEnvironment getEnvironment() { return this.env; @@ -176,8 +196,8 @@ public class LocalTableState implements TableState { // no need to perform scan to find prior row values when the indexed columns are immutable, as // by definition, there won't be any. if (!indexMetaData.isImmutableRows()) { - // add the current state of the row - this.addUpdate(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).list(), false); + // add the current state of the row. Uses listCells() to avoid a new array creation.
+ this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).listCells(), false); } // add the covered columns to the set http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce71efc9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java index 5963f2e..1392906 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java @@ -61,7 +61,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec { @Override public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData indexMetaData) { - List<IndexUpdate> updates = new ArrayList<IndexUpdate>(); + List<IndexUpdate> updates = new ArrayList<IndexUpdate>(groups.size()); for (ColumnGroup group : groups) { IndexUpdate update = getIndexUpdateForGroup(group, state, indexMetaData); updates.add(update); @@ -115,7 +115,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec { @Override public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) { - List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(); + List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(groups.size()); for (ColumnGroup group : groups) { deletes.add(getDeleteForGroup(group, state, context)); } @@ -238,9 +238,12 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec { * to use when building the key */ static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) { + final int numColumnEntries = values.size() * Bytes.SIZEOF_INT; // now build up expected row 
key, each of the values, in order, followed by the PK and then some // info about lengths so we can deserialize each value - byte[] output = new byte[length + pk.length]; + // + // output = values (length bytes) + primary key + one int length per column entry (numColumnEntries bytes in total) + one int holding the number of column entries + byte[] output = new byte[length + pk.length + numColumnEntries + Bytes.SIZEOF_INT]; int pos = 0; int[] lengths = new int[values.size()]; int i = 0; @@ -256,14 +259,22 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec { // add the primary key to the end of the row key System.arraycopy(pk, 0, output, pos, pk.length); + pos += pk.length; // add the lengths as suffixes so we can deserialize the elements again for (int l : lengths) { - output = ArrayUtils.addAll(output, Bytes.toBytes(l)); + byte[] serializedLength = Bytes.toBytes(l); + System.arraycopy(serializedLength, 0, output, pos, Bytes.SIZEOF_INT); + pos += Bytes.SIZEOF_INT; } // and the last integer is the number of values - return ArrayUtils.addAll(output, Bytes.toBytes(values.size())); + byte[] serializedNumValues = Bytes.toBytes(values.size()); + System.arraycopy(serializedNumValues, 0, output, pos, Bytes.SIZEOF_INT); + // Advance pos anyway so it stays correct if more fields are appended to the row key later. + pos += Bytes.SIZEOF_INT; + + return output; } /**
