Repository: cassandra Updated Branches: refs/heads/trunk b1f73d4b0 -> 440366edd
Make index building pluggable Patch and review by Pavel Yaskevich and Sam Tunnicliffe for CASSANDRA-10681 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/440366ed Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/440366ed Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/440366ed Branch: refs/heads/trunk Commit: 440366edd0ef0e1c6c1af69230dabc996d967626 Parents: b1f73d4 Author: Pavel Yaskevich <[email protected]> Authored: Thu Nov 19 15:56:42 2015 -0800 Committer: Sam Tunnicliffe <[email protected]> Committed: Mon Nov 23 18:47:55 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/index/Index.java | 51 +++++++++++++ .../cassandra/index/SecondaryIndexBuilder.java | 53 +------------ .../cassandra/index/SecondaryIndexManager.java | 20 +++-- .../index/internal/CassandraIndex.java | 6 +- .../internal/CollatedViewIndexBuilder.java | 78 ++++++++++++++++++++ .../index/internal/CustomCassandraIndex.java | 6 +- 7 files changed, 152 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a6646f9..13b2d05 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.2 + * Make index building pluggable (CASSANDRA-10681) * Add sstable flush observer (CASSANDRA-10678) * Improve NTS endpoints calculation (CASSANDRA-10200) * Improve performance of the folderSize function (CASSANDRA-10677) http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/src/java/org/apache/cassandra/index/Index.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java 
index 7bca924..64d621f 100644 --- a/src/java/org/apache/cassandra/index/Index.java +++ b/src/java/org/apache/cassandra/index/Index.java @@ -1,6 +1,8 @@ package org.apache.cassandra.index; +import java.util.Collection; import java.util.Optional; +import java.util.Set; import java.util.concurrent.Callable; import java.util.function.BiFunction; @@ -15,9 +17,12 @@ import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.index.internal.CollatedViewIndexBuilder; import org.apache.cassandra.index.transactions.IndexTransaction; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.ReducingKeyIterator; import org.apache.cassandra.io.sstable.format.SSTableFlushObserver; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -108,10 +113,56 @@ public interface Index { /* + * Helpers for building indexes from SSTable data + */ + + /** + * Provider of {@code SecondaryIndexBuilder} instances. See {@code getBuildTaskSupport} and + * {@code SecondaryIndexManager.buildIndexesBlocking} for more detail. + */ + interface IndexBuildingSupport + { + SecondaryIndexBuilder getIndexBuildTask(ColumnFamilyStore cfs, Set<Index> indexes, Collection<SSTableReader> sstables); + } + + /** + * Default implementation of {@code IndexBuildingSupport} which uses a {@code ReducingKeyIterator} to obtain a + * collated view of the data in the SSTables. 
+ */ + public static class CollatedViewIndexBuildingSupport implements IndexBuildingSupport + { + public SecondaryIndexBuilder getIndexBuildTask(ColumnFamilyStore cfs, Set<Index> indexes, Collection<SSTableReader> sstables) + { + return new CollatedViewIndexBuilder(cfs, indexes, new ReducingKeyIterator(sstables)); + } + } + + /** + * Singleton instance of {@code CollatedViewIndexBuildingSupport}, which may be used by any {@code Index} + * implementation. + */ + public static final CollatedViewIndexBuildingSupport INDEX_BUILDER_SUPPORT = new CollatedViewIndexBuildingSupport(); + + /* * Management functions */ /** + * Get an instance of a helper to provide tasks for building the index from a set of SSTable data. + * When processing a number of indexes to be rebuilt, {@code SecondaryIndexManager.buildIndexesBlocking} groups + * those with the same {@code IndexBuildingSupport} instance, allowing multiple indexes to be built with a + * single pass through the data. The singleton instance returned from the default method implementation builds + * indexes using a {@code ReducingKeyIterator} to provide a collated view of the SSTable data. + * + * @return an instance of the index build task helper. Index implementations which return <b>the same instance</b> + * will be built using a single task. + */ + default IndexBuildingSupport getBuildTaskSupport() + { + return INDEX_BUILDER_SUPPORT; + } + + /** * Return a task to perform any initialization work when a new index instance is created.
* This may involve costly operations such as (re)building the index, and is performed asynchronously * by SecondaryIndexManager http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java index e66f0a3..9ec8a4e 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java @@ -17,61 +17,12 @@ */ package org.apache.cassandra.index; -import java.io.IOException; -import java.util.Set; -import java.util.UUID; - -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.compaction.CompactionInfo; -import org.apache.cassandra.db.compaction.CompactionInterruptedException; -import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.io.sstable.ReducingKeyIterator; -import org.apache.cassandra.utils.UUIDGen; /** * Manages building an entire index from column family data. Runs on to compaction manager. 
*/ -public class SecondaryIndexBuilder extends CompactionInfo.Holder +public abstract class SecondaryIndexBuilder extends CompactionInfo.Holder { - private final ColumnFamilyStore cfs; - private final Set<Index> indexers; - private final ReducingKeyIterator iter; - private final UUID compactionId; - - public SecondaryIndexBuilder(ColumnFamilyStore cfs, Set<Index> indexers, ReducingKeyIterator iter) - { - this.cfs = cfs; - this.indexers = indexers; - this.iter = iter; - this.compactionId = UUIDGen.getTimeUUID(); - } - - public CompactionInfo getCompactionInfo() - { - return new CompactionInfo(cfs.metadata, - OperationType.INDEX_BUILD, - iter.getBytesRead(), - iter.getTotalBytes(), - compactionId); - } - - public void build() - { - try - { - while (iter.hasNext()) - { - if (isStopRequested()) - throw new CompactionInterruptedException(getCompactionInfo()); - DecoratedKey key = iter.next(); - Keyspace.indexPartition(key, cfs, indexers); - } - } - finally - { - iter.close(); - } - } + public abstract void build(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index df8e38d..3eb72d3 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -51,7 +51,6 @@ import org.apache.cassandra.db.rows.*; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.index.internal.CassandraIndex; import org.apache.cassandra.index.transactions.*; -import org.apache.cassandra.io.sstable.ReducingKeyIterator; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.Indexes; @@ -338,11 +337,20 
@@ public class SecondaryIndexManager implements IndexRegistry indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")), sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(","))); - SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, - indexes, - new ReducingKeyIterator(sstables)); - Future<?> future = CompactionManager.instance.submitIndexBuild(builder); - FBUtilities.waitOnFuture(future); + Map<Index.IndexBuildingSupport, Set<Index>> byType = new HashMap<>(); + for (Index index : indexes) + { + Set<Index> stored = byType.computeIfAbsent(index.getBuildTaskSupport(), i -> new HashSet<>()); + stored.add(index); + } + + List<Future<?>> futures = byType.entrySet() + .stream() + .map((e) -> e.getKey().getIndexBuildTask(baseCfs, e.getValue(), sstables)) + .map(CompactionManager.instance::submitIndexBuild) + .collect(Collectors.toList()); + + FBUtilities.waitOnFutures(futures); flushIndexesBlocking(indexes); logger.info("Index build of {} complete", http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/src/java/org/apache/cassandra/index/internal/CassandraIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java index 674cd20..3128152 100644 --- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java +++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java @@ -664,9 +664,9 @@ public abstract class CassandraIndex implements Index metadata.name, getSSTableNames(sstables)); - SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, - Collections.singleton(this), - new ReducingKeyIterator(sstables)); + SecondaryIndexBuilder builder = new CollatedViewIndexBuilder(baseCfs, + Collections.singleton(this), + new ReducingKeyIterator(sstables)); Future<?> future = 
CompactionManager.instance.submitIndexBuild(builder); FBUtilities.waitOnFuture(future); indexCfs.forceBlockingFlush(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java new file mode 100644 index 0000000..8ea7a68 --- /dev/null +++ b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.index.internal; + +import java.util.Set; +import java.util.UUID; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compaction.CompactionInfo; +import org.apache.cassandra.db.compaction.CompactionInterruptedException; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.index.SecondaryIndexBuilder; +import org.apache.cassandra.io.sstable.ReducingKeyIterator; +import org.apache.cassandra.utils.UUIDGen; + +/** + * Manages building an entire index from column family data. Runs on to compaction manager. + */ +public class CollatedViewIndexBuilder extends SecondaryIndexBuilder +{ + private final ColumnFamilyStore cfs; + private final Set<Index> indexers; + private final ReducingKeyIterator iter; + private final UUID compactionId; + + public CollatedViewIndexBuilder(ColumnFamilyStore cfs, Set<Index> indexers, ReducingKeyIterator iter) + { + this.cfs = cfs; + this.indexers = indexers; + this.iter = iter; + this.compactionId = UUIDGen.getTimeUUID(); + } + + public CompactionInfo getCompactionInfo() + { + return new CompactionInfo(cfs.metadata, + OperationType.INDEX_BUILD, + iter.getBytesRead(), + iter.getTotalBytes(), + compactionId); + } + + public void build() + { + try + { + while (iter.hasNext()) + { + if (isStopRequested()) + throw new CompactionInterruptedException(getCompactionInfo()); + DecoratedKey key = iter.next(); + Keyspace.indexPartition(key, cfs, indexers); + } + } + finally + { + iter.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java 
b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java index 0957f74..2b17849 100644 --- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java +++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java @@ -622,9 +622,9 @@ public class CustomCassandraIndex implements Index metadata.name, getSSTableNames(sstables)); - SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, - Collections.singleton(this), - new ReducingKeyIterator(sstables)); + SecondaryIndexBuilder builder = new CollatedViewIndexBuilder(baseCfs, + Collections.singleton(this), + new ReducingKeyIterator(sstables)); Future<?> future = CompactionManager.instance.submitIndexBuild(builder); FBUtilities.waitOnFuture(future); indexCfs.forceBlockingFlush();
