Improve secondary index (re)build failure and concurrency handling patch by Andrés de la Peña; reviewed by Paulo Motta and Sergio Bossa for CASSANDRA-10130
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/679c3171 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/679c3171 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/679c3171 Branch: refs/heads/trunk Commit: 679c31718b709f5619bba80eeb6f388484b94c3c Parents: a1c6a62 Author: Andrés de la Peña <a.penya.gar...@gmail.com> Authored: Fri Jun 23 09:30:21 2017 +0100 Committer: Andrés de la Peña <a.penya.gar...@gmail.com> Committed: Fri Jun 23 09:30:21 2017 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 17 +- .../db/compaction/CompactionManager.java | 2 +- .../apache/cassandra/db/lifecycle/Tracker.java | 8 +- .../cassandra/index/SecondaryIndexManager.java | 597 +++++++++++---- .../index/internal/CassandraIndex.java | 2 - .../notifications/SSTableAddedNotification.java | 35 +- .../cassandra/streaming/StreamReceiveTask.java | 4 +- ...pactionStrategyManagerPendingRepairTest.java | 10 +- .../LeveledCompactionStrategyTest.java | 2 +- .../cassandra/db/lifecycle/TrackerTest.java | 6 +- .../apache/cassandra/index/CustomIndexTest.java | 3 +- .../index/SecondaryIndexManagerTest.java | 721 +++++++++++++++++++ .../index/internal/CassandraIndexTest.java | 1 + .../index/internal/CustomCassandraIndex.java | 2 - .../cassandra/index/sasi/SASIIndexTest.java | 12 +- .../cassandra/stress/CompactionStress.java | 4 + 17 files changed, 1236 insertions(+), 191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 82a0bda..ab06e2b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130) * 
Improve calculation of available disk space for compaction (CASSANDRA-13068) * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579) * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570) http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index dceb41d..893d525 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -410,7 +410,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean minCompactionThreshold = new DefaultValue<>(metadata.get().params.compaction.minCompactionThreshold()); maxCompactionThreshold = new DefaultValue<>(metadata.get().params.compaction.maxCompactionThreshold()); crcCheckChance = new DefaultValue<>(metadata.get().params.crcCheckChance); - indexManager = new SecondaryIndexManager(this); viewManager = keyspace.viewManager.forTable(metadata.id); metric = new TableMetrics(this); fileIndexGenerator.set(generation); @@ -455,6 +454,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } // create the private ColumnFamilyStores for the secondary column indexes + indexManager = new SecondaryIndexManager(this); for (IndexMetadata info : metadata.get().indexes) indexManager.addIndex(info); @@ -567,7 +567,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean data.dropSSTables(); LifecycleTransaction.waitForDeletions(); - indexManager.invalidateAllIndexesBlocking(); + indexManager.dropAllIndexes(); invalidateCaches(); } @@ -800,7 +800,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean try (Refs<SSTableReader> refs = Refs.ref(newSSTables)) { data.addSSTables(newSSTables); - 
indexManager.buildAllIndexesBlocking(newSSTables); } logger.info("Done loading load new SSTables for {}/{}", keyspace.getName(), name); @@ -815,14 +814,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(cfName); - Set<String> indexes = new HashSet<String>(Arrays.asList(idxNames)); - - Iterable<SSTableReader> sstables = cfs.getSSTables(SSTableSet.CANONICAL); - try (Refs<SSTableReader> refs = Refs.ref(sstables)) - { - logger.info("User Requested secondary index re-build for {}/{} indexes: {}", ksName, cfName, Joiner.on(',').join(idxNames)); - cfs.indexManager.rebuildIndexesBlocking(refs, indexes); - } + logger.info("User Requested secondary index re-build for {}/{} indexes: {}", ksName, cfName, Joiner.on(',').join(idxNames)); + cfs.indexManager.rebuildIndexesBlocking(Sets.newHashSet(Arrays.asList(idxNames))); } public AbstractCompactionStrategy createCompactionStrategyInstance(CompactionParams compactionParams) @@ -1451,7 +1444,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public void addSSTable(SSTableReader sstable) { assert sstable.getColumnFamilyName().equals(name); - addSSTables(Arrays.asList(sstable)); + addSSTables(Collections.singletonList(sstable)); } public void addSSTables(Collection<SSTableReader> sstables) http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index bc8b305..d7e00da 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1619,7 +1619,7 @@ public class CompactionManager implements CompactionManagerMBean /** * Is not scheduled, because it is 
performing disjoint work from sstable compaction. */ - public Future<?> submitIndexBuild(final SecondaryIndexBuilder builder) + public ListenableFuture<?> submitIndexBuild(final SecondaryIndexBuilder builder) { Runnable runnable = new Runnable() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/db/lifecycle/Tracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java index e2fcb06..d46ee60 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -359,7 +359,7 @@ public class Tracker notifyDiscarded(memtable); // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both? - fail = notifyAdded(sstables, fail); + fail = notifyAdded(sstables, memtable, fail); if (!isDummy() && !cfstore.isValid()) dropSSTables(); @@ -417,9 +417,9 @@ public class Tracker return accumulate; } - Throwable notifyAdded(Iterable<SSTableReader> added, Throwable accumulate) + Throwable notifyAdded(Iterable<SSTableReader> added, Memtable memtable, Throwable accumulate) { - INotification notification = new SSTableAddedNotification(added); + INotification notification = new SSTableAddedNotification(added, memtable); for (INotificationConsumer subscriber : subscribers) { try @@ -436,7 +436,7 @@ public class Tracker public void notifyAdded(Iterable<SSTableReader> added) { - maybeFail(notifyAdded(added, null)); + maybeFail(notifyAdded(added, null, null)); } public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged) http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index f7b7d13..c2ed134 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -20,20 +20,29 @@ package org.apache.cassandra.index; import java.lang.reflect.Constructor; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; + import org.apache.commons.lang3.StringUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +62,9 @@ import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.index.internal.CassandraIndex; import org.apache.cassandra.index.transactions.*; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.notifications.INotification; +import org.apache.cassandra.notifications.INotificationConsumer; +import org.apache.cassandra.notifications.SSTableAddedNotification; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.IndexMetadata; import 
org.apache.cassandra.schema.Indexes; @@ -60,6 +72,7 @@ import org.apache.cassandra.service.pager.SinglePartitionPager; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.Refs; @@ -67,7 +80,7 @@ import org.apache.cassandra.utils.concurrent.Refs; * Handles the core maintenance functionality associated with indexes: adding/removing them to or from * a table, (re)building during bootstrap or other streaming operations, flushing, reloading metadata * and so on. - * + * <br><br> * The Index interface defines a number of methods which return {@code Callable<?>}. These are primarily the * management tasks for an index implementation. Most of them are currently executed in a blocking * fashion via submission to SIM's blockingExecutor. This provides the desired behaviour in pretty @@ -76,51 +89,76 @@ import org.apache.cassandra.utils.concurrent.Refs; * then be defined with as void and called directly from SIM (rather than being run via the executor service). * Separating the task defintion from execution gives us greater flexibility though, so that in future, for example, * if the flush process allows it we leave open the possibility of executing more of these tasks asynchronously. - * + * <br><br> * The primary exception to the above is the Callable returned from Index#addIndexedColumn. This may * involve a significant effort, building a new index over any existing data. We perform this task asynchronously; * as it is called as part of a schema update, which we do not want to block for a long period. Building non-custom * indexes is performed on the CompactionManager. 
- * + * <br><br> * This class also provides instances of processors which listen to updates to the base table and forward to * registered Indexes the info required to keep those indexes up to date. * There are two variants of these processors, each with a factory method provided by SIM: - * IndexTransaction: deals with updates generated on the regular write path. - * CleanupTransaction: used when partitions are modified during compaction or cleanup operations. + * IndexTransaction: deals with updates generated on the regular write path. + * CleanupTransaction: used when partitions are modified during compaction or cleanup operations. * Further details on their usage and lifecycles can be found in the interface definitions below. - * - * Finally, the bestIndexFor method is used at query time to identify the most selective index of those able + * <br><br> + * The bestIndexFor method is used at query time to identify the most selective index of those able * to satisfy any search predicates defined by a ReadCommand's RowFilter. It returns a thin IndexAccessor object * which enables the ReadCommand to access the appropriate functions of the Index at various stages in its lifecycle. * e.g. the getEstimatedResultRows is required when StorageProxy calculates the initial concurrency factor for * distributing requests to replicas, whereas a Searcher instance is needed when the ReadCommand is executed locally on * a target replica. 
+ * <br><br> + * Finally, this class provides a clear and safe lifecycle to manage index builds, either full rebuilds via + * {@link this#rebuildIndexesBlocking(Set)} or builds of new sstables + * added via {@link org.apache.cassandra.notifications.SSTableAddedNotification}s, guaranteeing + * the following: + * <ul> + * <li>The initialization task and any subsequent successful (re)build mark the index as built.</li> + * <li>If any (re)build operation fails, the index is not marked as built, and only another full rebuild can mark the + * index as built.</li> + * <li>Full rebuilds cannot be run concurrently with other full or sstable (re)builds.</li> + * <li>SSTable builds can always be run concurrently with any other builds.</li> + * </ul> */ -public class SecondaryIndexManager implements IndexRegistry +public class SecondaryIndexManager implements IndexRegistry, INotificationConsumer { private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class); // default page size (in rows) when rebuilding the index for a whole partition public static final int DEFAULT_PAGE_SIZE = 10000; - private Map<String, Index> indexes = Maps.newConcurrentMap(); + /** + * All registered indexes. + */ + private final Map<String, Index> indexes = Maps.newConcurrentMap(); + + /** + * The indexes that had a build failure. + */ + private final Set<String> needsFullRebuild = Sets.newConcurrentHashSet(); + + /** + * The indexes that are available for querying. + */ + private final Set<String> queryableIndexes = Sets.newConcurrentHashSet(); /** - * The indexes that are ready to server requests. + * The count of pending index builds for each index. 
*/ - private Set<String> builtIndexes = Sets.newConcurrentHashSet(); + private final Map<String, AtomicInteger> inProgressBuilds = Maps.newConcurrentMap(); // executes tasks returned by Indexer#addIndexColumn which may require index(es) to be (re)built - private static final ExecutorService asyncExecutor = - new JMXEnabledThreadPoolExecutor(1, - StageManager.KEEPALIVE, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - new NamedThreadFactory("SecondaryIndexManagement"), - "internal"); + private static final ListeningExecutorService asyncExecutor = MoreExecutors.listeningDecorator( + new JMXEnabledThreadPoolExecutor(1, + StageManager.KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("SecondaryIndexManagement"), + "internal")); // executes all blocking tasks produced by Indexers e.g. getFlushTask, getMetadataReloadTask etc - private static final ExecutorService blockingExecutor = MoreExecutors.newDirectExecutorService(); + private static final ListeningExecutorService blockingExecutor = MoreExecutors.newDirectExecutorService(); /** * The underlying column family containing the source data for these indexes @@ -130,9 +168,9 @@ public class SecondaryIndexManager implements IndexRegistry public SecondaryIndexManager(ColumnFamilyStore baseCfs) { this.baseCfs = baseCfs; + baseCfs.getTracker().subscribe(this); } - /** * Drops and adds new indexes associated with the underlying CF */ @@ -160,27 +198,63 @@ public class SecondaryIndexManager implements IndexRegistry : blockingExecutor.submit(reloadTask); } - private Future<?> createIndex(IndexMetadata indexDef) + @SuppressWarnings("unchecked") + private synchronized Future<?> createIndex(IndexMetadata indexDef) { - Index index = createInstance(indexDef); + final Index index = createInstance(indexDef); + String indexName = index.getIndexMetadata().name; index.register(this); + // now mark as building prior to initializing + markIndexesBuilding(ImmutableSet.of(index), true); + + 
Callable<?> initialBuildTask = null; // if the index didn't register itself, we can probably assume that no initialization needs to happen - final Callable<?> initialBuildTask = indexes.containsKey(indexDef.name) - ? index.getInitializationTask() - : null; + if (indexes.containsKey(indexDef.name)) + { + try + { + initialBuildTask = index.getInitializationTask(); + } + catch (Throwable t) + { + logAndMarkIndexesFailed(Collections.singleton(index), t); + throw t; + } + } + + // if there's no initialization, just mark as built and return: if (initialBuildTask == null) { - // We need to make sure that the index is marked as built in the case where the initialBuildTask - // does not need to be run (if the index didn't register itself or if the base table was empty). - markIndexBuilt(indexDef.name); + markIndexBuilt(index, true); return Futures.immediateFuture(null); } - return asyncExecutor.submit(index.getInitializationTask()); + + // otherwise run the initialization task asynchronously with a callback to mark it built or failed + final SettableFuture initialization = SettableFuture.create(); + Futures.addCallback(asyncExecutor.submit(initialBuildTask), new FutureCallback() + { + @Override + public void onFailure(Throwable t) + { + logAndMarkIndexesFailed(Collections.singleton(index), t); + initialization.setException(t); + } + + @Override + public void onSuccess(Object o) + { + markIndexBuilt(index, true); + initialization.set(o); + } + }, MoreExecutors.directExecutor()); + + return initialization; } /** * Adds and builds a index + * * @param indexDef the IndexMetadata describing the index */ public synchronized Future<?> addIndex(IndexMetadata indexDef) @@ -195,11 +269,11 @@ public class SecondaryIndexManager implements IndexRegistry * Checks if the specified index is queryable. 
* * @param index the index - * @return <code>true</code> if the specified index is queryable, <code>false</code> otherwise + * @return <code>true</code> if the specified index is registered, <code>false</code> otherwise */ public boolean isIndexQueryable(Index index) { - return builtIndexes.contains(index.getIndexMetadata().name); + return queryableIndexes.contains(index.getIndexMetadata().name); } public synchronized void removeIndex(String indexName) @@ -208,7 +282,7 @@ public class SecondaryIndexManager implements IndexRegistry if (null != index) { markIndexRemoved(indexName); - executeBlocking(index.getInvalidateTask()); + executeBlocking(index.getInvalidateTask(), null); } } @@ -231,57 +305,35 @@ public class SecondaryIndexManager implements IndexRegistry */ public void markAllIndexesRemoved() { - getBuiltIndexNames().forEach(this::markIndexRemoved); + getBuiltIndexNames().forEach(this::markIndexRemoved); } /** - * Does a full, blocking rebuild of the indexes specified by columns from the sstables. - * Caller must acquire and release references to the sstables used here. 
- * Note also that only this method of (re)building indexes: - * a) takes a set of index *names* rather than Indexers - * b) marks exsiting indexes removed prior to rebuilding - * - * @param sstables the data to build from - * @param indexNames the list of indexes to be rebuilt - */ - public void rebuildIndexesBlocking(Collection<SSTableReader> sstables, Set<String> indexNames) - { - Set<Index> toRebuild = indexes.values().stream() - .filter(index -> indexNames.contains(index.getIndexMetadata().name)) - .filter(Index::shouldBuildBlocking) - .collect(Collectors.toSet()); - if (toRebuild.isEmpty()) - { - logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames)); - return; - } - - toRebuild.forEach(indexer -> markIndexRemoved(indexer.getIndexMetadata().name)); - - buildIndexesBlocking(sstables, toRebuild); - - toRebuild.forEach(indexer -> markIndexBuilt(indexer.getIndexMetadata().name)); - } - - public void buildAllIndexesBlocking(Collection<SSTableReader> sstables) - { - buildIndexesBlocking(sstables, indexes.values() - .stream() - .filter(Index::shouldBuildBlocking) - .collect(Collectors.toSet())); - } - - // For convenience, may be called directly from Index impls - public void buildIndexBlocking(Index index) + * Does a blocking full rebuild of the specifed indexes from all the sstables in the base table. 
+ * Note also that this method of (re)building indexes: + * a) takes a set of index *names* rather than Indexers + * b) marks existing indexes removed prior to rebuilding + * c) fails if such marking operation conflicts with any ongoing index builds, as full rebuilds cannot be run + * concurrently + * + * @param indexNames the list of indexes to be rebuilt + */ + public void rebuildIndexesBlocking(Set<String> indexNames) { - if (index.shouldBuildBlocking()) + try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL)); + Refs<SSTableReader> allSSTables = viewFragment.refs) { - try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL)); - Refs<SSTableReader> sstables = viewFragment.refs) + Set<Index> toRebuild = indexes.values().stream() + .filter(index -> indexNames.contains(index.getIndexMetadata().name)) + .filter(Index::shouldBuildBlocking) + .collect(Collectors.toSet()); + if (toRebuild.isEmpty()) { - buildIndexesBlocking(sstables, Collections.singleton(index)); - markIndexBuilt(index.getIndexMetadata().name); + logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames)); + return; } + + buildIndexesBlocking(allSSTables, toRebuild, true); } } @@ -356,55 +408,255 @@ public class SecondaryIndexManager implements IndexRegistry return StringUtils.substringAfter(cfName, Directories.SECONDARY_INDEX_NAME_SEPARATOR); } - private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index> indexes) + /** + * Performs a blocking (re)indexing of the specified SSTables for the specified indexes. 
+ * + * @param sstables the SSTables to be (re)indexed + * @param indexes the indexes to be (re)built for the specifed SSTables + * @param isFullRebuild True if this method is invoked as a full index rebuild, false otherwise + */ + @SuppressWarnings({ "unchecked" }) + private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index> indexes, boolean isFullRebuild) { if (indexes.isEmpty()) return; - logger.info("Submitting index build of {} for data in {}", - indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")), - sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(","))); + // Mark all indexes as building: this step must happen first, because if any index can't be marked, the whole + // process needs to abort + markIndexesBuilding(indexes, isFullRebuild); + + // Build indexes in a try/catch, so that any index not marked as either built or failed will be marked as failed: + final Set<Index> builtIndexes = new HashSet<>(); + final Set<Index> unbuiltIndexes = new HashSet<>(); - Map<Index.IndexBuildingSupport, Set<Index>> byType = new HashMap<>(); - for (Index index : indexes) + // Any exception thrown during index building that could be suppressed by the finally block + Exception accumulatedFail = null; + + try { - Set<Index> stored = byType.computeIfAbsent(index.getBuildTaskSupport(), i -> new HashSet<>()); - stored.add(index); + logger.info("Submitting index build of {} for data in {}", + indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")), + sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(","))); + + // Group all building tasks + Map<Index.IndexBuildingSupport, Set<Index>> byType = new HashMap<>(); + for (Index index : indexes) + { + Set<Index> stored = byType.computeIfAbsent(index.getBuildTaskSupport(), i -> new HashSet<>()); + stored.add(index); + } + + // Schedule all index building tasks with a callback to mark them as built or 
failed + List<Future<?>> futures = new ArrayList<>(byType.size()); + byType.forEach((buildingSupport, groupedIndexes) -> + { + SecondaryIndexBuilder builder = buildingSupport.getIndexBuildTask(baseCfs, groupedIndexes, sstables); + final SettableFuture build = SettableFuture.create(); + Futures.addCallback(CompactionManager.instance.submitIndexBuild(builder), new FutureCallback() + { + @Override + public void onFailure(Throwable t) + { + logAndMarkIndexesFailed(groupedIndexes, t); + unbuiltIndexes.addAll(groupedIndexes); + build.setException(t); + } + + @Override + public void onSuccess(Object o) + { + groupedIndexes.forEach(i -> markIndexBuilt(i, isFullRebuild)); + logger.info("Index build of {} completed", getIndexNames(groupedIndexes)); + builtIndexes.addAll(groupedIndexes); + build.set(o); + } + }); + futures.add(build); + }); + + // Finally wait for the index builds to finish and flush the indexes that built successfully + FBUtilities.waitOnFutures(futures); } + catch (Exception e) + { + accumulatedFail = e; + throw e; + } + finally + { + try + { + // Fail any indexes that couldn't be marked + Set<Index> failedIndexes = Sets.difference(indexes, Sets.union(builtIndexes, unbuiltIndexes)); + if (!failedIndexes.isEmpty()) + { + logAndMarkIndexesFailed(failedIndexes, accumulatedFail); + } - List<Future<?>> futures = byType.entrySet() - .stream() - .map((e) -> e.getKey().getIndexBuildTask(baseCfs, e.getValue(), sstables)) - .map(CompactionManager.instance::submitIndexBuild) - .collect(Collectors.toList()); + // Flush all built indexes with an aynchronous callback to log the success or failure of the flush + flushIndexesBlocking(builtIndexes, new FutureCallback() + { + String indexNames = StringUtils.join(builtIndexes.stream() + .map(i -> i.getIndexMetadata().name) + .collect(Collectors.toList()), ','); - FBUtilities.waitOnFutures(futures); + @Override + public void onFailure(Throwable ignored) + { + logger.info("Index flush of {} failed", indexNames); + } - 
flushIndexesBlocking(indexes); - logger.info("Index build of {} complete", - indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(","))); + @Override + public void onSuccess(Object ignored) + { + logger.info("Index flush of {} completed", indexNames); + } + }); + } + catch (Exception e) + { + if (accumulatedFail != null) + { + accumulatedFail.addSuppressed(e); + } + else + { + throw e; + } + } + } + } + + private String getIndexNames(Set<Index> indexes) + { + List<String> indexNames = indexes.stream() + .map(i -> i.getIndexMetadata().name) + .collect(Collectors.toList()); + return StringUtils.join(indexNames, ','); } /** - * Marks the specified index as build. - * <p>This method is public as it need to be accessible from the {@link Index} implementations</p> - * @param indexName the index name + * Marks the specified indexes as (re)building if: + * 1) There's no in progress rebuild of any of the given indexes. + * 2) There's an in progress rebuild but the caller is not a full rebuild. + * <p> + * Otherwise, this method invocation fails, as it is not possible to run full rebuilds while other concurrent rebuilds + * are in progress. Please note this is checked atomically against all given indexes; that is, no index will be marked + * if even a single one fails. + * <p> + * Marking an index as "building" practically means: + * 1) The index is removed from the "failed" set if this is a full rebuild. + * 2) The index is removed from the system keyspace built indexes. + * <p> + * Thread safety is guaranteed by having all methods managing index builds synchronized: being synchronized on + * the SecondaryIndexManager instance, it means all invocations for all different indexes will go through the same + * lock, but this is fine as the work done while holding such lock is trivial. 
+ * <p> + * {@link #markIndexBuilt(Index, boolean)} or {@link #markIndexFailed(Index)} should be always called after the + * rebuilding has finished, so that the index build state can be correctly managed and the index rebuilt. + * + * @param indexes the index to be marked as building + * @param isFullRebuild {@code true} if this method is invoked as a full index rebuild, {@code false} otherwise */ - public void markIndexBuilt(String indexName) + private synchronized void markIndexesBuilding(Set<Index> indexes, boolean isFullRebuild) { - builtIndexes.add(indexName); - if (DatabaseDescriptor.isDaemonInitialized()) - SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), indexName); + String keyspaceName = baseCfs.keyspace.getName(); + + // First step is to validate against concurrent rebuilds; it would be more optimized to do everything on a single + // step, but we're not really expecting a very high number of indexes, and this isn't on any hot path, so + // we're favouring readability over performance + indexes.forEach(index -> + { + String indexName = index.getIndexMetadata().name; + AtomicInteger counter = inProgressBuilds.computeIfAbsent(indexName, ignored -> new AtomicInteger(0)); + + if (counter.get() > 0 && isFullRebuild) + throw new IllegalStateException(String.format("Cannot rebuild index %s as another index build for the same index is currently in progress.", indexName)); + }); + + // Second step is the actual marking: + indexes.forEach(index -> + { + String indexName = index.getIndexMetadata().name; + AtomicInteger counter = inProgressBuilds.computeIfAbsent(indexName, ignored -> new AtomicInteger(0)); + + if (isFullRebuild) + needsFullRebuild.remove(indexName); + + if (counter.getAndIncrement() == 0 && DatabaseDescriptor.isDaemonInitialized()) + SystemKeyspace.setIndexRemoved(keyspaceName, indexName); + }); + } + + /** + * Marks the specified index as built if there are no in progress index builds and the index is not failed. 
+ * {@link #markIndexesBuilding(Set, boolean)} should always be invoked before this method. + * + * @param index the index to be marked as built + * @param isFullRebuild {@code true} if this method is invoked as a full index rebuild, {@code false} otherwise + */ + private synchronized void markIndexBuilt(Index index, boolean isFullRebuild) + { + String indexName = index.getIndexMetadata().name; + AtomicInteger counter = inProgressBuilds.get(indexName); + if (counter != null) + { + assert counter.get() > 0; + if (counter.decrementAndGet() == 0) + { + if (isFullRebuild) + queryableIndexes.add(indexName); + inProgressBuilds.remove(indexName); + if (!needsFullRebuild.contains(indexName) && DatabaseDescriptor.isDaemonInitialized()) + SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), indexName); + } + } + } + + /** + * Marks the specified index as failed. + * {@link #markIndexesBuilding(Set, boolean)} should always be invoked before this method. + * + * @param index the index to be marked as built + */ + private synchronized void markIndexFailed(Index index) + { + String indexName = index.getIndexMetadata().name; + AtomicInteger counter = inProgressBuilds.get(indexName); + if (counter != null) + { + assert counter.get() > 0; + + counter.decrementAndGet(); + + if (DatabaseDescriptor.isDaemonInitialized()) + SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName); + + needsFullRebuild.add(indexName); + } + } + + private void logAndMarkIndexesFailed(Set<Index> indexes, Throwable indexBuildFailure) + { + JVMStabilityInspector.inspectThrowable(indexBuildFailure); + if (indexBuildFailure != null) + logger.warn("Index build of {} failed. Please run full index rebuild to fix it.", getIndexNames(indexes), indexBuildFailure); + else + logger.warn("Index build of {} failed. Please run full index rebuild to fix it.", getIndexNames(indexes)); + indexes.forEach(SecondaryIndexManager.this::markIndexFailed); } /** * Marks the specified index as removed. 
- * <p>This method is public as it need to be accessible from the {@link Index} implementations</p> + * * @param indexName the index name */ - public void markIndexRemoved(String indexName) + private synchronized void markIndexRemoved(String indexName) { SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName); + queryableIndexes.remove(indexName); + needsFullRebuild.remove(indexName); + inProgressBuilds.remove(indexName); } public Index getIndexByName(String indexName) @@ -419,7 +671,7 @@ public class SecondaryIndexManager implements IndexRegistry { assert indexDef.options != null; String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME); - assert ! Strings.isNullOrEmpty(className); + assert !Strings.isNullOrEmpty(className); try { Class<? extends Index> indexClass = FBUtilities.classForName(className, "Index"); @@ -443,16 +695,22 @@ public class SecondaryIndexManager implements IndexRegistry */ public void truncateAllIndexesBlocking(final long truncatedAt) { - executeAllBlocking(indexes.values().stream(), (index) -> index.getTruncateTask(truncatedAt)); + executeAllBlocking(indexes.values().stream(), (index) -> index.getTruncateTask(truncatedAt), null); } /** * Remove all indexes */ - public void invalidateAllIndexesBlocking() + public void dropAllIndexes() { markAllIndexesRemoved(); - executeAllBlocking(indexes.values().stream(), Index::getInvalidateTask); + invalidateAllIndexesBlocking(); + } + + @VisibleForTesting + public void invalidateAllIndexesBlocking() + { + executeAllBlocking(indexes.values().stream(), Index::getInvalidateTask, null); } /** @@ -460,7 +718,7 @@ public class SecondaryIndexManager implements IndexRegistry */ public void flushAllIndexesBlocking() { - flushIndexesBlocking(ImmutableSet.copyOf(indexes.values())); + flushIndexesBlocking(ImmutableSet.copyOf(indexes.values())); } /** @@ -468,24 +726,7 @@ public class SecondaryIndexManager implements IndexRegistry */ public void flushIndexesBlocking(Set<Index> 
indexes) { - if (indexes.isEmpty()) - return; - - List<Future<?>> wait = new ArrayList<>(); - List<Index> nonCfsIndexes = new ArrayList<>(); - - // for each CFS backed index, submit a flush task which we'll wait on for completion - // for the non-CFS backed indexes, we'll flush those while we wait. - synchronized (baseCfs.getTracker()) - { - indexes.forEach(index -> - index.getBackingTable() - .map(cfs -> wait.add(cfs.forceFlush())) - .orElseGet(() -> nonCfsIndexes.add(index))); - } - - executeAllBlocking(nonCfsIndexes.stream(), Index::getBlockingFlushTask); - FBUtilities.waitOnFutures(wait); + flushIndexesBlocking(indexes, null); } /** @@ -496,7 +737,7 @@ public class SecondaryIndexManager implements IndexRegistry executeAllBlocking(indexes.values() .stream() .filter(index -> !index.getBackingTable().isPresent()), - Index::getBlockingFlushTask); + Index::getBlockingFlushTask, null); } /** @@ -505,9 +746,32 @@ public class SecondaryIndexManager implements IndexRegistry public void executePreJoinTasksBlocking(boolean hadBootstrap) { logger.info("Executing pre-join{} tasks for: {}", hadBootstrap ? " post-bootstrap" : "", this.baseCfs); - executeAllBlocking(indexes.values().stream(), (index) -> { + executeAllBlocking(indexes.values().stream(), (index) -> + { return index.getPreJoinTask(hadBootstrap); - }); + }, null); + } + + private void flushIndexesBlocking(Set<Index> indexes, FutureCallback<Object> callback) + { + if (indexes.isEmpty()) + return; + + List<Future<?>> wait = new ArrayList<>(); + List<Index> nonCfsIndexes = new ArrayList<>(); + + // for each CFS backed index, submit a flush task which we'll wait on for completion + // for the non-CFS backed indexes, we'll flush those while we wait. 
+ synchronized (baseCfs.getTracker()) + { + indexes.forEach(index -> + index.getBackingTable() + .map(cfs -> wait.add(cfs.forceFlush())) + .orElseGet(() -> nonCfsIndexes.add(index))); + } + + executeAllBlocking(nonCfsIndexes.stream(), Index::getBlockingFlushTask, callback); + FBUtilities.waitOnFutures(wait); } /** @@ -517,8 +781,8 @@ public class SecondaryIndexManager implements IndexRegistry { Set<String> allIndexNames = new HashSet<>(); indexes.values().stream() - .map(i -> i.getIndexMetadata().name) - .forEach(allIndexNames::add); + .map(i -> i.getIndexMetadata().name) + .forEach(allIndexNames::add); return SystemKeyspace.getBuiltIndexes(baseCfs.keyspace.getName(), allIndexNames); } @@ -566,7 +830,8 @@ public class SecondaryIndexManager implements IndexRegistry if (!page.hasNext()) break; - try (UnfilteredRowIterator partition = page.next()) { + try (UnfilteredRowIterator partition = page.next()) + { Set<Index.Indexer> indexers = indexes.stream() .map(index -> index.indexerFor(key, partition.columns(), @@ -665,7 +930,7 @@ public class SecondaryIndexManager implements IndexRegistry /** * Delete all data from all indexes for this partition. * For when cleanup rips a partition out entirely. - * + * <p> * TODO : improve cleanup transaction to batch updates and perform them async */ public void deletePartition(UnfilteredRowIterator partition, int nowInSec) @@ -690,28 +955,28 @@ public class SecondaryIndexManager implements IndexRegistry partition.columns(), nowInSec); indexTransaction.start(); - indexTransaction.onRowDelete((Row)unfiltered); + indexTransaction.onRowDelete((Row) unfiltered); indexTransaction.commit(); } } /** * Called at query time to choose which (if any) of the registered index implementations to use for a given query. - * + * <p> * This is a two step processes, firstly compiling the set of searchable indexes then choosing the one which reduces * the search space the most. 
- * + * <p> * In the first phase, if the command's RowFilter contains any custom index expressions, the indexes that they * specify are automatically included. Following that, the registered indexes are filtered to include only those * which support the standard expressions in the RowFilter. - * + * <p> * The filtered set then sorted by selectivity, as reported by the Index implementations' getEstimatedResultRows * method. - * + * <p> * Implementation specific validation of the target expression, either custom or standard, by the selected * index should be performed in the searcherFor method to ensure that we pick the right index regardless of * the validity of the expression. - * + * <p> * This method is only called once during the lifecycle of a ReadCommand and the result is * cached for future use when obtaining a Searcher, getting the index's underlying CFS for * ReadOrderGroup, or an estimate of the result size from an average index query. @@ -732,7 +997,7 @@ public class SecondaryIndexManager implements IndexRegistry { // Only a single custom expression is allowed per query and, if present, // we want to always favour the index specified in such an expression - RowFilter.CustomExpression customExpression = (RowFilter.CustomExpression)expression; + RowFilter.CustomExpression customExpression = (RowFilter.CustomExpression) expression; logger.trace("Command contains a custom index expression, using target index {}", customExpression.getTargetIndex().name); Tracing.trace("Command contains a custom index expression, using target index {}", customExpression.getTargetIndex().name); return indexes.get(customExpression.getTargetIndex().name); @@ -781,6 +1046,7 @@ public class SecondaryIndexManager implements IndexRegistry * will process it. 
The partition key as well as the clustering and * cell values for each row in the update may be checked by index * implementations + * * @param update PartitionUpdate containing the values to be validated by registered Index implementations * @throws InvalidRequestException */ @@ -808,9 +1074,7 @@ public class SecondaryIndexManager implements IndexRegistry private Index unregisterIndex(String name) { Index removed = indexes.remove(name); - builtIndexes.remove(name); - logger.trace(removed == null ? "Index {} was not registered" : "Removed index {} from registry", - name); + logger.trace(removed == null ? "Index {} was not registered" : "Removed index {} from registry", name); return removed; } @@ -882,7 +1146,7 @@ public class SecondaryIndexManager implements IndexRegistry { private final Index.Indexer[] indexers; - private WriteTimeTransaction(Index.Indexer...indexers) + private WriteTimeTransaction(Index.Indexer... indexers) { // don't allow null indexers, if we don't need any use a NullUpdater object for (Index.Indexer indexer : indexers) assert indexer != null; @@ -945,7 +1209,6 @@ public class SecondaryIndexManager implements IndexRegistry if (merged == null || (original != null && shouldCleanupOldValue(original, merged))) toRemove.addCell(original); - } }; Rows.diff(diffListener, updated, existing); @@ -1011,7 +1274,7 @@ public class SecondaryIndexManager implements IndexRegistry rows = new Row[versions]; } - public void onRowMerge(Row merged, Row...versions) + public void onRowMerge(Row merged, Row... 
versions) { // Diff listener constructs rows representing deltas between the merged and original versions // These delta rows are then passed to registered indexes for removal processing @@ -1051,7 +1314,7 @@ public class SecondaryIndexManager implements IndexRegistry Rows.diff(diffListener, merged, versions); - for(int i = 0; i < builders.length; i++) + for (int i = 0; i < builders.length; i++) if (builders[i] != null) rows[i] = builders[i].build(); } @@ -1147,13 +1410,17 @@ public class SecondaryIndexManager implements IndexRegistry } } - private static void executeBlocking(Callable<?> task) + private void executeBlocking(Callable<?> task, FutureCallback<Object> callback) { if (null != task) - FBUtilities.waitOnFuture(blockingExecutor.submit(task)); + { + ListenableFuture<?> f = blockingExecutor.submit(task); + if (callback != null) Futures.addCallback(f, callback); + FBUtilities.waitOnFuture(f); + } } - private static void executeAllBlocking(Stream<Index> indexers, Function<Index, Callable<?>> function) + private void executeAllBlocking(Stream<Index> indexers, Function<Index, Callable<?>> function, FutureCallback<Object> callback) { if (function == null) { @@ -1162,11 +1429,33 @@ public class SecondaryIndexManager implements IndexRegistry } List<Future<?>> waitFor = new ArrayList<>(); - indexers.forEach(indexer -> { - Callable<?> task = function.apply(indexer); - if (null != task) - waitFor.add(blockingExecutor.submit(task)); - }); + indexers.forEach(indexer -> + { + Callable<?> task = function.apply(indexer); + if (null != task) + { + ListenableFuture<?> f = blockingExecutor.submit(task); + if (callback != null) Futures.addCallback(f, callback); + waitFor.add(f); + } + }); FBUtilities.waitOnFutures(waitFor); } + + public void handleNotification(INotification notification, Object sender) + { + if (!indexes.isEmpty() && notification instanceof SSTableAddedNotification) + { + SSTableAddedNotification notice = (SSTableAddedNotification) notification; + + // 
SSTables associated to a memtable come from a flush, so their contents have already been indexed + if (!notice.memtable().isPresent()) + buildIndexesBlocking(Lists.newArrayList(notice.added), + indexes.values() + .stream() + .filter(Index::shouldBuildBlocking) + .collect(Collectors.toSet()), + false); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/index/internal/CassandraIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java index c6f7d98..c7f3536 100644 --- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java +++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java @@ -699,7 +699,6 @@ public abstract class CassandraIndex implements Index baseCfs.metadata.keyspace, baseCfs.metadata.name, metadata.name); - baseCfs.indexManager.markIndexBuilt(metadata.name); return; } @@ -713,7 +712,6 @@ public abstract class CassandraIndex implements Index Future<?> future = CompactionManager.instance.submitIndexBuild(builder); FBUtilities.waitOnFuture(future); indexCfs.forceBlockingFlush(); - baseCfs.indexManager.markIndexBuilt(metadata.name); } logger.info("Index build of {} complete", metadata.name); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java index 56d6130..9c95a18 100644 --- a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java +++ b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java @@ -17,13 +17,46 @@ */ package org.apache.cassandra.notifications; +import 
java.util.Optional; + +import javax.annotation.Nullable; + +import org.apache.cassandra.db.Memtable; import org.apache.cassandra.io.sstable.format.SSTableReader; +/** + * Notification sent after SSTables are added to their {@link org.apache.cassandra.db.ColumnFamilyStore}. + */ public class SSTableAddedNotification implements INotification { + /** The added SSTables */ public final Iterable<SSTableReader> added; - public SSTableAddedNotification(Iterable<SSTableReader> added) + + /** The memtable from which the tables come when they have been added due to a flush, {@code null} otherwise. */ + @Nullable + private final Memtable memtable; + + /** + * Creates a new {@code SSTableAddedNotification} for the specified SSTables and optional memtable. + * + * @param added the added SSTables + * @param memtable the memtable from which the tables come when they have been added due to a memtable flush, + * or {@code null} if they don't come from a flush + */ + public SSTableAddedNotification(Iterable<SSTableReader> added, @Nullable Memtable memtable) { this.added = added; + this.memtable = memtable; + } + + /** + * Returns the memtable from which the tables come when they have been added due to a memtable flush. If not, an + * empty Optional should be returned. 
+ * + * @return the origin memtable in case of a flush, {@link Optional#empty()} otherwise + */ + public Optional<Memtable> memtable() + { + return Optional.ofNullable(memtable); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index 34e7cc8..925dc85 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -33,7 +33,6 @@ import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; @@ -231,9 +230,8 @@ public class StreamReceiveTask extends StreamTask { task.finishTransaction(); - // add sstables and build secondary indexes + // add sstables (this will build secondary indexes too, see CASSANDRA-10130) cfs.addSSTables(readers); - cfs.indexManager.buildAllIndexesBlocking(readers); //invalidate row and counter cache if (cfs.isRowCacheEnabled() || cfs.metadata().isCounter()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java index 0b27f73..af629e5 100644 --- 
a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java @@ -87,7 +87,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR csm.getForPendingRepair(repairID).forEach(Assert::assertNull); // add the sstable - csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker()); + csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker()); Assert.assertFalse(repairedContains(sstable)); Assert.assertFalse(unrepairedContains(sstable)); csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull); @@ -181,7 +181,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR SSTableReader sstable = makeSSTable(true); mutateRepaired(sstable, repairID); - csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker()); + csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker()); Assert.assertTrue(pendingContains(repairID, sstable)); // delete sstable @@ -210,7 +210,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR SSTableReader sstable = makeSSTable(true); mutateRepaired(sstable, repairID); - csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker()); + csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker()); strategies = csm.getStrategies(); Assert.assertEquals(3, strategies.size()); @@ -228,7 +228,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); mutateRepaired(sstable, repairID); - csm.handleNotification(new 
SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker()); + csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker()); LocalSessionAccessor.finalizeUnsafe(repairID); csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull); Assert.assertNotNull(pendingContains(repairID, sstable)); @@ -265,7 +265,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); SSTableReader sstable = makeSSTable(true); mutateRepaired(sstable, repairID); - csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker()); + csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker()); LocalSessionAccessor.failUnsafe(repairID); csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull); http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index 360a2cd..624f119 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -378,7 +378,7 @@ public class LeveledCompactionStrategyTest assertFalse(unrepaired.manifest.generations[2].contains(sstable1)); unrepaired.removeSSTable(sstable2); - manager.handleNotification(new SSTableAddedNotification(singleton(sstable2)), this); + manager.handleNotification(new SSTableAddedNotification(singleton(sstable2), null), this); assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2)); 
assertFalse(repaired.manifest.getLevel(1).contains(sstable2)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java index a5a1baf..93ee198 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java @@ -21,6 +21,7 @@ package org.apache.cassandra.db.lifecycle; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -305,6 +306,7 @@ public class TrackerTest Assert.assertEquals(2, listener.received.size()); Assert.assertEquals(prev2, ((MemtableDiscardedNotification) listener.received.get(0)).memtable); Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(1)).added); + Assert.assertEquals(Optional.of(prev2), ((SSTableAddedNotification) listener.received.get(1)).memtable()); listener.received.clear(); Assert.assertTrue(reader.isKeyCacheSetup()); Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount()); @@ -326,6 +328,7 @@ public class TrackerTest Assert.assertEquals(prev1, ((MemtableSwitchedNotification) listener.received.get(0)).memtable); Assert.assertEquals(prev1, ((MemtableDiscardedNotification) listener.received.get(1)).memtable); Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(2)).added); + Assert.assertEquals(Optional.of(prev1), ((SSTableAddedNotification) listener.received.get(2)).memtable()); Assert.assertTrue(listener.received.get(3) instanceof SSTableDeletingNotification); Assert.assertEquals(1, ((SSTableListChangedNotification) 
listener.received.get(4)).removed.size()); DatabaseDescriptor.setIncrementalBackupsEnabled(backups); @@ -360,8 +363,9 @@ public class TrackerTest MockListener failListener = new MockListener(true); tracker.subscribe(failListener); tracker.subscribe(listener); - Assert.assertNotNull(tracker.notifyAdded(singleton(r1), null)); + Assert.assertNotNull(tracker.notifyAdded(singleton(r1), null, null)); Assert.assertEquals(singleton(r1), ((SSTableAddedNotification) listener.received.get(0)).added); + Assert.assertFalse(((SSTableAddedNotification) listener.received.get(0)).memtable().isPresent()); listener.received.clear(); Assert.assertNotNull(tracker.notifySSTablesChanged(singleton(r1), singleton(r2), OperationType.COMPACTION, null)); Assert.assertEquals(singleton(r1), ((SSTableListChangedNotification) listener.received.get(0)).removed); http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/test/unit/org/apache/cassandra/index/CustomIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java index ed999fa..d14b50d 100644 --- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java +++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; import org.junit.Test; import com.datastax.driver.core.exceptions.QueryValidationException; @@ -111,7 +112,7 @@ public class CustomIndexTest extends CQLTester excluded.reset(); assertTrue(excluded.rowsInserted.isEmpty()); - indexManager.buildAllIndexesBlocking(getCurrentColumnFamilyStore().getLiveSSTables()); + indexManager.rebuildIndexesBlocking(Sets.newHashSet(toInclude, toExclude)); assertEquals(3, included.rowsInserted.size()); assertTrue(excluded.rowsInserted.isEmpty()); 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org