Make choice of SSTableReader types explicit. All accessors of a collection of SSTableReader must now specify whether they desire the LIVE or CANONICAL set, so that no internal clients are accidentally exposed to a partial sstable they are not capable of safely handling.
patch by benedict; reviewed by marcus for CASSANDRA-9699 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ad8cad7c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ad8cad7c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ad8cad7c Branch: refs/heads/trunk Commit: ad8cad7c4d05fd5dea68fb274c81a102533ebe36 Parents: a8c50b8 Author: Benedict Elliott Smith <[email protected]> Authored: Sun Jun 28 14:49:09 2015 +0100 Committer: Benedict Elliott Smith <[email protected]> Committed: Tue Jul 28 10:28:31 2015 +0100 ---------------------------------------------------------------------- .../apache/cassandra/db/BatchlogManager.java | 4 +- .../apache/cassandra/db/ColumnFamilyStore.java | 188 ++++--------------- src/java/org/apache/cassandra/db/Keyspace.java | 5 +- .../cassandra/db/PartitionRangeReadCommand.java | 4 +- .../db/SinglePartitionNamesCommand.java | 4 +- .../db/SinglePartitionSliceCommand.java | 4 +- .../cassandra/db/SizeEstimatesRecorder.java | 4 +- .../org/apache/cassandra/db/SystemKeyspace.java | 2 +- .../db/commitlog/CommitLogReplayer.java | 3 +- .../compaction/AbstractCompactionStrategy.java | 5 +- .../db/compaction/CompactionController.java | 4 +- .../db/compaction/CompactionManager.java | 8 +- .../compaction/CompactionStrategyManager.java | 4 +- .../DateTieredCompactionStrategy.java | 25 +-- .../SizeTieredCompactionStrategy.java | 4 +- .../cassandra/db/index/SecondaryIndex.java | 6 +- .../db/lifecycle/LifecycleTransaction.java | 2 +- .../cassandra/db/lifecycle/SSTableSet.java | 12 ++ .../apache/cassandra/db/lifecycle/Tracker.java | 11 +- .../org/apache/cassandra/db/lifecycle/View.java | 101 ++++++++-- .../io/sstable/IndexSummaryManager.java | 5 +- .../io/sstable/format/SSTableReader.java | 13 +- .../apache/cassandra/metrics/TableMetrics.java | 46 ++--- .../apache/cassandra/service/CacheService.java | 6 +- .../cassandra/streaming/StreamSession.java | 34 ++-- 
.../db/compaction/LongCompactionsTest.java | 2 +- test/unit/org/apache/cassandra/Util.java | 2 +- .../cassandra/cache/AutoSavingCacheTest.java | 6 +- .../miscellaneous/CrcCheckChanceTest.java | 12 +- .../SSTableMetadataTrackingTest.java | 36 ++-- .../org/apache/cassandra/db/CleanupTest.java | 4 +- .../cassandra/db/ColumnFamilyMetricTest.java | 2 +- .../cassandra/db/ColumnFamilyStoreTest.java | 9 +- .../apache/cassandra/db/HintedHandOffTest.java | 4 +- .../org/apache/cassandra/db/KeyCacheTest.java | 2 +- .../org/apache/cassandra/db/KeyspaceTest.java | 6 +- .../apache/cassandra/db/RangeTombstoneTest.java | 24 +-- .../unit/org/apache/cassandra/db/ScrubTest.java | 16 +- .../apache/cassandra/db/SecondaryIndexTest.java | 2 +- .../org/apache/cassandra/db/VerifyTest.java | 20 +- .../db/compaction/AntiCompactionTest.java | 51 ++--- .../compaction/BlacklistingCompactionsTest.java | 2 +- .../compaction/CompactionAwareWriterTest.java | 24 +-- .../db/compaction/CompactionsPurgeTest.java | 16 +- .../db/compaction/CompactionsTest.java | 81 +++++--- .../DateTieredCompactionStrategyTest.java | 10 +- .../LeveledCompactionStrategyTest.java | 12 +- .../db/compaction/OneCompactionTest.java | 2 +- .../SizeTieredCompactionStrategyTest.java | 2 +- .../cassandra/db/compaction/TTLExpiryTest.java | 12 +- .../db/lifecycle/RealTransactionsTest.java | 4 +- .../apache/cassandra/db/lifecycle/ViewTest.java | 17 +- .../io/sstable/IndexSummaryManagerTest.java | 30 +-- .../cassandra/io/sstable/LegacySSTableTest.java | 4 +- .../io/sstable/SSTableMetadataTest.java | 48 ++--- .../cassandra/io/sstable/SSTableReaderTest.java | 20 +- .../io/sstable/SSTableRewriterTest.java | 54 +++--- .../io/sstable/SSTableScannerTest.java | 12 +- .../org/apache/cassandra/schema/DefsTest.java | 2 +- .../streaming/StreamTransferTaskTest.java | 2 +- .../streaming/StreamingTransferTest.java | 20 +- 61 files changed, 554 insertions(+), 522 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index e8b76be..154a86b 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -39,6 +39,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.WriteFailureException; @@ -455,7 +456,8 @@ public class BatchlogManager implements BatchlogManagerMBean ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG); cfs.forceBlockingFlush(); Collection<Descriptor> descriptors = new ArrayList<>(); - for (SSTableReader sstr : cfs.getSSTables()) + // expects ALL sstables to be available for compaction, so just use live set... + for (SSTableReader sstr : cfs.getSSTables(SSTableSet.LIVE)) descriptors.add(sstr.descriptor); if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact. 
CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 8d14120..e040eaa 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -35,10 +35,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.*; import com.google.common.util.concurrent.*; -import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; -import org.apache.cassandra.db.lifecycle.View; -import org.apache.cassandra.db.lifecycle.Tracker; -import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.lifecycle.*; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.metrics.TableMetrics; import org.json.simple.*; @@ -291,7 +288,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { try { - for (SSTableReader sstable : keyspace.getAllSSTables()) + // TODO: this doesn't affect sstables being written + for (SSTableReader sstable : keyspace.getAllSSTables(SSTableSet.CANONICAL)) if (sstable.compression) sstable.getCompressionMetadata().parameters.setCrcCheckChance(crcCheckChance); } @@ -626,8 +624,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { logger.info("Loading new SSTables for {}/{}...", keyspace.getName(), name); - Set<Descriptor> currentDescriptors = new HashSet<Descriptor>(); - for (SSTableReader sstable : data.getView().sstables) + Set<Descriptor> currentDescriptors = new HashSet<>(); + for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL)) currentDescriptors.add(sstable.descriptor); Set<SSTableReader> newSSTables = new HashSet<>(); @@ -714,13 
+712,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean Set<String> indexes = new HashSet<String>(Arrays.asList(idxNames)); - Collection<SSTableReader> sstables = cfs.getSSTables(); - + Iterable<SSTableReader> sstables = cfs.getSSTables(SSTableSet.CANONICAL); try (Refs<SSTableReader> refs = Refs.ref(sstables)) { cfs.indexManager.setIndexRemoved(indexes); logger.info(String.format("User Requested secondary index re-build for %s/%s indexes", ksName, cfName)); - cfs.indexManager.maybeBuildSecondaryIndexes(sstables, indexes); + cfs.indexManager.maybeBuildSecondaryIndexes(refs, indexes); cfs.indexManager.setIndexBuilt(indexes); } } @@ -1155,7 +1152,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * @return sstables whose key range overlaps with that of the given sstables, not including itself. * (The given sstables may or may not overlap with each other.) */ - public Collection<SSTableReader> getOverlappingSSTables(Iterable<SSTableReader> sstables) + public Collection<SSTableReader> getOverlappingSSTables(SSTableSet sstableSet, Iterable<SSTableReader> sstables) { logger.debug("Checking for sstables overlapping {}", sstables); @@ -1164,12 +1161,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (!sstables.iterator().hasNext()) return ImmutableSet.of(); - SSTableIntervalTree tree = data.getView().intervalTree; + View view = data.getView(); Set<SSTableReader> results = null; for (SSTableReader sstable : sstables) { - Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(Interval.<PartitionPosition, SSTableReader>create(sstable.first, sstable.last))); + Set<SSTableReader> overlaps = ImmutableSet.copyOf(view.sstablesInBounds(sstableSet, AbstractBounds.bounds(sstable.first, true, sstable.last, true))); results = results == null ? 
overlaps : Sets.union(results, overlaps).immutableCopy(); } results = Sets.difference(results, ImmutableSet.copyOf(sstables)); @@ -1180,11 +1177,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean /** * like getOverlappingSSTables, but acquires references before returning */ - public Refs<SSTableReader> getAndReferenceOverlappingSSTables(Iterable<SSTableReader> sstables) + public Refs<SSTableReader> getAndReferenceOverlappingSSTables(SSTableSet sstableSet, Iterable<SSTableReader> sstables) { while (true) { - Iterable<SSTableReader> overlapped = getOverlappingSSTables(sstables); + Iterable<SSTableReader> overlapped = getOverlappingSSTables(sstableSet, sstables); Refs<SSTableReader> refs = Refs.tryRef(overlapped); if (refs != null) return refs; @@ -1361,12 +1358,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return data; } - public Collection<SSTableReader> getSSTables() + public Set<SSTableReader> getLiveSSTables() + { + return data.getView().liveSSTables(); + } + + public Iterable<SSTableReader> getSSTables(SSTableSet sstableSet) { - return data.getSSTables(); + return data.getView().sstables(sstableSet); } - public Set<SSTableReader> getUncompactingSSTables() + public Iterable<SSTableReader> getUncompactingSSTables() { return data.getUncompacting(); } @@ -1397,34 +1399,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return nowInSec - metadata.getGcGraceSeconds(); } - public Set<SSTableReader> getUnrepairedSSTables() - { - Set<SSTableReader> unRepairedSSTables = new HashSet<>(getSSTables()); - Iterator<SSTableReader> sstableIterator = unRepairedSSTables.iterator(); - while(sstableIterator.hasNext()) - { - SSTableReader sstable = sstableIterator.next(); - if (sstable.isRepaired()) - sstableIterator.remove(); - } - return unRepairedSSTables; - } - - public Set<SSTableReader> getRepairedSSTables() - { - Set<SSTableReader> repairedSSTables = new HashSet<>(getSSTables()); - Iterator<SSTableReader> 
sstableIterator = repairedSSTables.iterator(); - while(sstableIterator.hasNext()) - { - SSTableReader sstable = sstableIterator.next(); - if (!sstable.isRepaired()) - sstableIterator.remove(); - } - return repairedSSTables; - } - @SuppressWarnings("resource") - public RefViewFragment selectAndReference(Function<View, List<SSTableReader>> filter) + public RefViewFragment selectAndReference(Function<View, Iterable<SSTableReader>> filter) { long failingSince = -1L; while (true) @@ -1450,80 +1426,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } - public ViewFragment select(Function<View, List<SSTableReader>> filter) + public ViewFragment select(Function<View, Iterable<SSTableReader>> filter) { View view = data.getView(); - List<SSTableReader> sstables = view.intervalTree.isEmpty() - ? Collections.<SSTableReader>emptyList() - : filter.apply(view); + List<SSTableReader> sstables = Lists.newArrayList(filter.apply(view)); return new ViewFragment(sstables, view.getAllMemtables()); } - - /** - * @return a ViewFragment containing the sstables and memtables that may need to be merged - * for the given @param key, according to the interval tree - */ - public Function<View, List<SSTableReader>> viewFilter(final DecoratedKey key) - { - assert !key.isMinimum(); - return new Function<View, List<SSTableReader>>() - { - public List<SSTableReader> apply(View view) - { - return view.intervalTree.search(key); - } - }; - } - - /** - * @return a ViewFragment containing the sstables and memtables that may need to be merged - * for rows within @param rowBounds, inclusive, according to the interval tree. 
- */ - public Function<View, List<SSTableReader>> viewFilter(final AbstractBounds<PartitionPosition> rowBounds) - { - return new Function<View, List<SSTableReader>>() - { - public List<SSTableReader> apply(View view) - { - return view.sstablesInBounds(rowBounds); - } - }; - } - - /** - * @return a ViewFragment containing the sstables and memtables that may need to be merged - * for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree. - */ - public Function<View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<PartitionPosition>> rowBoundsCollection, final boolean includeRepaired) - { - return new Function<View, List<SSTableReader>>() - { - public List<SSTableReader> apply(View view) - { - Set<SSTableReader> sstables = Sets.newHashSet(); - for (AbstractBounds<PartitionPosition> rowBounds : rowBoundsCollection) - { - for (SSTableReader sstable : view.sstablesInBounds(rowBounds)) - { - if (includeRepaired || !sstable.isRepaired()) - sstables.add(sstable); - } - } - - logger.debug("ViewFilter for {}/{} sstables", sstables.size(), getSSTables().size()); - return ImmutableList.copyOf(sstables); - } - }; - } - + // WARNING: this returns the set of LIVE sstables only, which may be only partially written public List<String> getSSTablesForKey(String key) { DecoratedKey dk = partitioner.decorateKey(metadata.getKeyValidator().fromString(key)); try (OpOrder.Group op = readOrdering.start()) { List<String> files = new ArrayList<>(); - for (SSTableReader sstr : select(viewFilter(dk)).sstables) + for (SSTableReader sstr : select(View.select(SSTableSet.LIVE, dk)).sstables) { // check if the key actually exists in this sstable, without updating cache and stats if (sstr.getPosition(dk, SSTableReader.Operator.EQ, false) != null) @@ -1602,13 +1519,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean for (ColumnFamilyStore cfs : concatWithIndexes()) { final JSONArray filesJSONArr = new JSONArray(); - try (RefViewFragment 
currentView = cfs.selectAndReference(CANONICAL_SSTABLES)) + try (RefViewFragment currentView = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, (x) -> predicate == null || predicate.apply(x)))) { for (SSTableReader ssTable : currentView.sstables) { - if (predicate != null && !predicate.apply(ssTable)) - continue; - File snapshotDirectory = Directories.getSnapshotDirectory(ssTable.descriptor, snapshotName); ssTable.createLinks(snapshotDirectory.getPath()); // hard links filesJSONArr.add(ssTable.descriptor.relativeFilenameFor(Component.DATA)); @@ -1681,7 +1595,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public Refs<SSTableReader> getSnapshotSSTableReader(String tag) throws IOException { Map<Integer, SSTableReader> active = new HashMap<>(); - for (SSTableReader sstable : data.getView().sstables) + for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL)) active.put(sstable.descriptor.generation, sstable); Map<Descriptor, Set<Component>> snapshots = directories.sstableLister().snapshots(tag).list(); Refs<SSTableReader> refs = new Refs<>(); @@ -1851,7 +1765,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public Iterable<DecoratedKey> keySamples(Range<Token> range) { - try (RefViewFragment view = selectAndReference(CANONICAL_SSTABLES)) + try (RefViewFragment view = selectAndReference(View.select(SSTableSet.CANONICAL))) { Iterable<DecoratedKey>[] samples = new Iterable[view.sstables.size()]; int i = 0; @@ -1865,7 +1779,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public long estimatedKeysForRange(Range<Token> range) { - try (RefViewFragment view = selectAndReference(CANONICAL_SSTABLES)) + try (RefViewFragment view = selectAndReference(View.select(SSTableSet.CANONICAL))) { long count = 0; for (SSTableReader sstable : view.sstables) @@ -2014,7 +1928,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public LifecycleTransaction call() throws Exception { assert 
data.getCompacting().isEmpty() : data.getCompacting(); - Collection<SSTableReader> sstables = Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables())); + Collection<SSTableReader> sstables = Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables(SSTableSet.LIVE))); LifecycleTransaction modifier = data.tryModify(sstables, operationType); assert modifier != null: "something marked things compacting while compactions are disabled"; return modifier; @@ -2126,7 +2040,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { long sum = 0; long count = 0; - for (SSTableReader sstable : getSSTables()) + for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL)) { long n = sstable.getEstimatedColumnCount().count(); sum += sstable.getEstimatedColumnCount().mean() * n; @@ -2138,7 +2052,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public long estimateKeys() { long n = 0; - for (SSTableReader sstable : getSSTables()) + for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL)) n += sstable.estimatedKeys(); return n; } @@ -2206,8 +2120,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public boolean isEmpty() { - View view = data.getView(); - return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.liveMemtables.size() <= 1 && view.flushingMemtables.size() == 0; + return data.getView().isEmpty(); } public boolean isRowCacheEnabled() @@ -2231,7 +2144,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean List<SSTableReader> truncatedSSTables = new ArrayList<>(); - for (SSTableReader sstable : getSSTables()) + for (SSTableReader sstable : getSSTables(SSTableSet.LIVE)) { if (!sstable.newSince(truncatedAt)) truncatedSSTables.add(sstable); @@ -2250,7 +2163,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean long allColumns = 0; int localTime = (int)(System.currentTimeMillis()/1000); - for 
(SSTableReader sstable : getSSTables()) + for (SSTableReader sstable : getSSTables(SSTableSet.LIVE)) { allDroppable += sstable.getDroppableTombstonesBefore(localTime - sstable.metadata.getGcGraceSeconds()); allColumns += sstable.getEstimatedColumnCount().mean() * sstable.getEstimatedColumnCount().count(); @@ -2269,35 +2182,4 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean fileIndexGenerator.set(0); } - // returns the "canonical" version of any current sstable, i.e. if an sstable is being replaced and is only partially - // visible to reads, this sstable will be returned as its original entirety, and its replacement will not be returned - // (even if it completely replaces it) - public static final Function<View, List<SSTableReader>> CANONICAL_SSTABLES = new Function<View, List<SSTableReader>>() - { - public List<SSTableReader> apply(View view) - { - List<SSTableReader> sstables = new ArrayList<>(); - for (SSTableReader sstable : view.compacting) - if (sstable.openReason != SSTableReader.OpenReason.EARLY) - sstables.add(sstable); - for (SSTableReader sstable : view.sstables) - if (!view.compacting.contains(sstable) && sstable.openReason != SSTableReader.OpenReason.EARLY) - sstables.add(sstable); - return sstables; - } - }; - - public static final Function<View, List<SSTableReader>> UNREPAIRED_SSTABLES = new Function<View, List<SSTableReader>>() - { - public List<SSTableReader> apply(View view) - { - List<SSTableReader> sstables = new ArrayList<>(); - for (SSTableReader sstable : CANONICAL_SSTABLES.apply(view)) - { - if (!sstable.isRepaired()) - sstables.add(sstable); - } - return sstables; - } - }; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 07f3e6f..f37ce66 100644 --- 
a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -38,6 +38,7 @@ import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.schema.KeyspaceMetadata; @@ -272,11 +273,11 @@ public class Keyspace /** * @return A list of open SSTableReaders */ - public List<SSTableReader> getAllSSTables() + public List<SSTableReader> getAllSSTables(SSTableSet sstableSet) { List<SSTableReader> list = new ArrayList<>(columnFamilyStores.size()); for (ColumnFamilyStore cfStore : columnFamilyStores.values()) - list.addAll(cfStore.getSSTables()); + Iterables.addAll(list, cfStore.getSSTables(sstableSet)); return list; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 4a3704f..d48fca5 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -25,6 +25,8 @@ import com.google.common.collect.Iterables; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.partitions.*; @@ -157,7 +159,7 @@ public class PartitionRangeReadCommand 
extends ReadCommand protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup) { - ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(dataRange().keyRange())); + ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, dataRange().keyRange())); Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator())); // fetch data from current memtable, historical memtables, and SSTables in the correct order. http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java index 53ead14..5ffbd55 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java @@ -25,6 +25,8 @@ import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.filter.*; @@ -73,7 +75,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus protected UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap) { Tracing.trace("Acquiring sstable references"); - ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(partitionKey())); + ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey())); ArrayBackedPartition result = 
null; ClusteringIndexNamesFilter filter = clusteringIndexFilter(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java index d74dc4e..b4cbbd6 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java @@ -23,6 +23,8 @@ import com.google.common.collect.Iterables; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.partitions.Partition; @@ -103,7 +105,7 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus protected UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap) { Tracing.trace("Acquiring sstable references"); - ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(partitionKey())); + ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey())); List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size()); ClusteringIndexSliceFilter filter = clusteringIndexFilter(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java index 2b03c08..d281c56 100644 --- 
a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java +++ b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java @@ -23,6 +23,8 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -97,7 +99,7 @@ public class SizeEstimatesRecorder extends MigrationListener implements Runnable { while (refs == null) { - ColumnFamilyStore.ViewFragment view = table.select(table.viewFilter(Range.makeRowRange(range))); + ColumnFamilyStore.ViewFragment view = table.select(View.select(SSTableSet.CANONICAL, Range.makeRowRange(range))); refs = Refs.tryRef(view.sstables); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 6a4a847..7bfd552 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -808,7 +808,7 @@ public final class SystemKeyspace if (result.isEmpty() || !result.one().has("cluster_name")) { // this is a brand new node - if (!cfs.getSSTables().isEmpty()) + if (!cfs.getLiveSSTables().isEmpty()) throw new ConfigurationException("Found system keyspace files, but they couldn't be loaded!"); // no system files. this is a new node. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index e22e6e3..1e12ed6 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -45,6 +45,7 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.SerializationHelper; import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.io.compress.ICompressor; @@ -105,7 +106,7 @@ public class CommitLogReplayer // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call // below: gRP will return NONE if there are no flushed sstables, which is important to have in the // list (otherwise we'll just start replay from the first flush position that we do have, which is not correct). 
- ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables()); + ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables(SSTableSet.CANONICAL)); // but, if we've truncated the cf in question, then we need to need to start replay after the truncation ReplayPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index d8499ea..6598286 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -24,6 +24,9 @@ import com.google.common.collect.ImmutableMap; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.RateLimiter; + +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -370,7 +373,7 @@ public abstract class AbstractCompactionStrategy if (uncheckedTombstoneCompaction) return true; - Collection<SSTableReader> overlaps = cfs.getOverlappingSSTables(Collections.singleton(sstable)); + Collection<SSTableReader> overlaps = cfs.getOverlappingSSTables(SSTableSet.CANONICAL, Collections.singleton(sstable)); if (overlaps.isEmpty()) { // there is no overlap, tombstones are safely droppable http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index df3bc4e..1e91dca 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -19,6 +19,8 @@ package org.apache.cassandra.db.compaction; import java.util.*; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,7 +85,7 @@ public class CompactionController implements AutoCloseable if (compacting == null) overlappingSSTables = Refs.tryRef(Collections.<SSTableReader>emptyList()); else - overlappingSSTables = cfs.getAndReferenceOverlappingSSTables(compacting); + overlappingSSTables = cfs.getAndReferenceOverlappingSSTables(SSTableSet.LIVE, compacting); this.overlapIterator = new OverlapIterator<>(buildIntervals(overlappingSSTables)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 616c310..bf412d8 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -59,6 +59,8 @@ import org.apache.cassandra.db.compaction.CompactionInfo.Holder; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.index.SecondaryIndexBuilder; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.dht.Bounds; import 
org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -484,7 +486,7 @@ public class CompactionManager implements CompactionManagerMBean LifecycleTransaction txn, long repairedAt) throws InterruptedException, IOException { - logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size()); + logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getLiveSSTables().size()); logger.debug("Starting anticompaction for ranges {}", ranges); Set<SSTableReader> sstables = new HashSet<>(validatedForRepair); Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); @@ -647,7 +649,7 @@ public class CompactionManager implements CompactionManagerMBean // This is not efficient, do not use in any critical path private SSTableReader lookupSSTable(final ColumnFamilyStore cfs, Descriptor descriptor) { - for (SSTableReader sstable : cfs.getSSTables()) + for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) { if (sstable.descriptor.equals(descriptor)) return sstable; @@ -1039,7 +1041,7 @@ public class CompactionManager implements CompactionManagerMBean // flush first so everyone is validating data that is as similar as possible StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name); ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId); - ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? 
ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES); + ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, (s) -> !prs.isIncremental || !s.isRepaired())); Set<SSTableReader> sstablesToValidate = new HashSet<>(); for (SSTableReader sstable : sstableCandidates.sstables) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index cfe28e8..8ec7071 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -33,6 +33,8 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Memtable; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -138,7 +140,7 @@ public class CompactionStrategyManager implements INotificationConsumer private void startup() { - for (SSTableReader sstable : cfs.getSSTables()) + for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) { if (sstable.openReason != SSTableReader.OpenReason.EARLY) getCompactionStrategyFor(sstable).addSSTable(sstable); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java index 43f998a..30d38a1 100644 --- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java @@ -22,6 +22,9 @@ import java.util.*; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.collect.*; + +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +35,8 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.utils.Pair; +import static com.google.common.collect.Iterables.filter; + public class DateTieredCompactionStrategy extends AbstractCompactionStrategy { private static final Logger logger = LoggerFactory.getLogger(DateTieredCompactionStrategy.class); @@ -82,13 +87,13 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy */ private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore) { - if (!isEnabled() || cfs.getSSTables().isEmpty()) + if (!isEnabled() || Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE))) return Collections.emptyList(); - Set<SSTableReader> uncompacting = Sets.intersection(sstables, cfs.getUncompactingSSTables()); + Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains)); // Find fully expired SSTables. Those will be included no matter what. 
- Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingSSTables(uncompacting), gcBefore); + Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingSSTables(SSTableSet.CANONICAL, uncompacting), gcBefore); Set<SSTableReader> candidates = Sets.newHashSet(filterSuspectSSTables(uncompacting)); List<SSTableReader> compactionCandidates = new ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), gcBefore)); @@ -148,13 +153,11 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy */ private long getNow() { - return Collections.max(cfs.getSSTables(), new Comparator<SSTableReader>() - { - public int compare(SSTableReader o1, SSTableReader o2) - { - return Long.compare(o1.getMaxTimestamp(), o2.getMaxTimestamp()); - } - }).getMaxTimestamp(); + // no need to convert to collection if had an Iterables.max(), but not present in standard toolkit, and not worth adding + List<SSTableReader> list = new ArrayList<>(); + Iterables.addAll(list, cfs.getSSTables(SSTableSet.LIVE)); + return Collections.max(list, (o1, o2) -> Long.compare(o1.getMaxTimestamp(), o2.getMaxTimestamp())) + .getMaxTimestamp(); } /** @@ -170,7 +173,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy if (maxSSTableAge == 0) return sstables; final long cutoff = now - maxSSTableAge; - return Iterables.filter(sstables, new Predicate<SSTableReader>() + return filter(sstables, new Predicate<SSTableReader>() { @Override public boolean apply(SSTableReader sstable) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 0ece341..c64e119 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -36,6 +36,8 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.utils.Pair; +import static com.google.common.collect.Iterables.filter; + public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy { private static final Logger logger = LoggerFactory.getLogger(SizeTieredCompactionStrategy.class); @@ -82,7 +84,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy int minThreshold = cfs.getMinimumCompactionThreshold(); int maxThreshold = cfs.getMaximumCompactionThreshold(); - Iterable<SSTableReader> candidates = filterSuspectSSTables(Sets.intersection(cfs.getUncompactingSSTables(), sstables)); + Iterable<SSTableReader> candidates = filterSuspectSSTables(filter(cfs.getUncompactingSSTables(), sstables::contains)); List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), sizeTieredOptions.bucketHigh, sizeTieredOptions.bucketLow, sizeTieredOptions.minSSTableSize); logger.debug("Compaction buckets are {}", buckets); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/index/SecondaryIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java index 7552fd5..94031ab 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java @@ -39,6 +39,8 @@ import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.compaction.CompactionManager; import 
org.apache.cassandra.db.index.composites.CompositesIndex; import org.apache.cassandra.db.index.keys.KeysIndex; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.LocalByPartionerType; @@ -204,9 +206,9 @@ public abstract class SecondaryIndex protected void buildIndexBlocking() { logger.info(String.format("Submitting index build of %s for data in %s", - getIndexName(), StringUtils.join(baseCfs.getSSTables(), ", "))); + getIndexName(), StringUtils.join(baseCfs.getSSTables(SSTableSet.CANONICAL), ", "))); - try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(ColumnFamilyStore.CANONICAL_SSTABLES).refs) + try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL)).refs) { SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, Collections.singleton(getIndexName()), http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java index b743633..edfd795 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java @@ -414,7 +414,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional private List<SSTableReader> restoreUpdatedOriginals() { Iterable<SSTableReader> torestore = filterIn(originals, logged.update, logged.obsolete); - return ImmutableList.copyOf(transform(torestore, (reader) -> current(reader).cloneWithNewStart(reader.first, null))); + return ImmutableList.copyOf(transform(torestore, (reader) -> 
current(reader).cloneWithRestoredStart(reader.first))); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/lifecycle/SSTableSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/SSTableSet.java b/src/java/org/apache/cassandra/db/lifecycle/SSTableSet.java new file mode 100644 index 0000000..6cc26d6 --- /dev/null +++ b/src/java/org/apache/cassandra/db/lifecycle/SSTableSet.java @@ -0,0 +1,12 @@ +package org.apache.cassandra.db.lifecycle; + +public enum SSTableSet +{ + // returns the "canonical" version of any current sstable, i.e. if an sstable is being replaced and is only partially + // visible to reads, this sstable will be returned as its original entirety, and its replacement will not be returned + // (even if it completely replaces it) + CANONICAL, + // returns the live versions of all sstables, i.e. including partially written sstables + LIVE, + NONCOMPACTING +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/lifecycle/Tracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java index 241eb4b..f5829ea 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -201,7 +201,7 @@ public class Tracker !isDummy() ? 
ImmutableList.of(new Memtable(cfstore)) : Collections.<Memtable>emptyList(), ImmutableList.<Memtable>of(), Collections.<SSTableReader, SSTableReader>emptyMap(), - Collections.<SSTableReader>emptySet(), + Collections.<SSTableReader, SSTableReader>emptyMap(), SSTableIntervalTree.empty())); } @@ -362,19 +362,14 @@ public class Tracker // MISCELLANEOUS public utility calls - public Set<SSTableReader> getSSTables() - { - return view.get().sstables; - } - public Set<SSTableReader> getCompacting() { return view.get().compacting; } - public Set<SSTableReader> getUncompacting() + public Iterable<SSTableReader> getUncompacting() { - return view.get().nonCompactingSStables(); + return view.get().sstables(SSTableSet.NONCOMPACTING); } public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/db/lifecycle/View.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java index f710dda..324dbc1 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/View.java +++ b/src/java/org/apache/cassandra/db/lifecycle/View.java @@ -24,6 +24,7 @@ import com.google.common.base.Functions; import com.google.common.base.Predicate; import com.google.common.collect.*; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Memtable; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.AbstractBounds; @@ -37,6 +38,7 @@ import static com.google.common.collect.ImmutableList.of; import static com.google.common.collect.Iterables.all; import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.filter; +import static com.google.common.collect.Iterables.transform; import static java.util.Collections.singleton; import static 
org.apache.cassandra.db.lifecycle.Helpers.emptySet; import static org.apache.cassandra.db.lifecycle.Helpers.replace; @@ -64,17 +66,18 @@ public class View * flushed. In chronologically ascending order. */ public final List<Memtable> flushingMemtables; - public final Set<SSTableReader> compacting; - public final Set<SSTableReader> sstables; + final Set<SSTableReader> compacting; + final Set<SSTableReader> sstables; // we use a Map here so that we can easily perform identity checks as well as equality checks. // When marking compacting, we now indicate if we expect the sstables to be present (by default we do), // and we then check that not only are they all present in the live set, but that the exact instance present is // the one we made our decision to compact against. - public final Map<SSTableReader, SSTableReader> sstablesMap; + final Map<SSTableReader, SSTableReader> sstablesMap; + final Map<SSTableReader, SSTableReader> compactingMap; - public final SSTableIntervalTree intervalTree; + final SSTableIntervalTree intervalTree; - View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, SSTableIntervalTree intervalTree) + View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Map<SSTableReader, SSTableReader> compacting, SSTableIntervalTree intervalTree) { assert liveMemtables != null; assert flushingMemtables != null; @@ -87,7 +90,8 @@ public class View this.sstablesMap = sstables; this.sstables = sstablesMap.keySet(); - this.compacting = compacting; + this.compactingMap = compacting; + this.compacting = compactingMap.keySet(); this.intervalTree = intervalTree; } @@ -104,9 +108,37 @@ public class View return concat(flushingMemtables, liveMemtables); } - public Sets.SetView<SSTableReader> nonCompactingSStables() + // shortcut for all live sstables, so can efficiently use it for size, etc + public Set<SSTableReader> 
liveSSTables() { - return Sets.difference(sstables, compacting); + return sstables; + } + + public Iterable<SSTableReader> sstables(SSTableSet sstableSet) + { + return select(sstableSet, sstables); + } + + public Iterable<SSTableReader> sstables(SSTableSet sstableSet, Predicate<SSTableReader> filter) + { + return select(sstableSet, filter(sstables, filter)); + } + + private Iterable<SSTableReader> select(SSTableSet sstableSet, Iterable<SSTableReader> sstables) + { + switch (sstableSet) + { + case LIVE: + return sstables; + case NONCOMPACTING: + return filter(sstables, (s) -> !compacting.contains(s)); + case CANONICAL: + return transform(filter(sstables, + (s) -> s.openReason != SSTableReader.OpenReason.EARLY), + (s) -> s.openReason != SSTableReader.OpenReason.MOVED_START ? s : compactingMap.get(s)); + default: + throw new IllegalStateException(); + } } public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates) @@ -120,18 +152,55 @@ public class View }); } + public boolean isEmpty() + { + return sstables.isEmpty() + && liveMemtables.size() <= 1 + && flushingMemtables.size() == 0 + && (liveMemtables.size() == 0 || liveMemtables.get(0).getOperations() == 0); + } + @Override public String toString() { return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting); } - public List<SSTableReader> sstablesInBounds(AbstractBounds<PartitionPosition> rowBounds) + public Iterable<SSTableReader> sstablesInBounds(SSTableSet sstableSet, AbstractBounds<PartitionPosition> rowBounds) { if (intervalTree.isEmpty()) return Collections.emptyList(); PartitionPosition stopInTree = rowBounds.right.isMinimum() ? 
intervalTree.max() : rowBounds.right; - return intervalTree.search(Interval.<PartitionPosition, SSTableReader>create(rowBounds.left, stopInTree)); + return select(sstableSet, intervalTree.search(Interval.create(rowBounds.left, stopInTree))); + } + + public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet) + { + return (view) -> view.sstables(sstableSet); + } + + public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, Predicate<SSTableReader> filter) + { + return (view) -> view.sstables(sstableSet, filter); + } + + /** + * @return a function that, applied to a View, yields the sstables found by searching its interval tree + * for the given @param key; only SSTableSet.LIVE is accepted (asserted) + */ + public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, DecoratedKey key) + { + assert sstableSet == SSTableSet.LIVE; + return (view) -> view.intervalTree.search(key); + } + + /** + * @return a function that, applied to a View, yields the sstables of the requested set that intersect + * rows within @param rowBounds, inclusive, according to the interval tree. 
+ */ + public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, AbstractBounds<PartitionPosition> rowBounds) + { + return (view) -> view.sstablesInBounds(sstableSet, rowBounds); } // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW: @@ -147,7 +216,7 @@ public class View { assert all(mark, Helpers.idIn(view.sstablesMap)); return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap, - replace(view.compacting, unmark, mark), + replace(view.compactingMap, unmark, mark), view.intervalTree); } }; @@ -179,7 +248,7 @@ public class View public View apply(View view) { Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, remove, add); - return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compacting, + return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compactingMap, SSTableIntervalTree.build(sstableMap.keySet())); } }; @@ -194,7 +263,7 @@ public class View { List<Memtable> newLive = ImmutableList.<Memtable>builder().addAll(view.liveMemtables).add(newMemtable).build(); assert newLive.size() == view.liveMemtables.size() + 1; - return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compacting, view.intervalTree); + return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compactingMap, view.intervalTree); } }; } @@ -213,7 +282,7 @@ public class View filter(flushing, not(lessThan(toFlush))))); assert newLive.size() == live.size() - 1; assert newFlushing.size() == flushing.size() + 1; - return new View(newLive, newFlushing, view.sstablesMap, view.compacting, view.intervalTree); + return new View(newLive, newFlushing, view.sstablesMap, view.compactingMap, view.intervalTree); } }; } @@ -230,10 +299,10 @@ public class View if (flushed == null) return new View(view.liveMemtables, flushingMemtables, view.sstablesMap, - view.compacting, view.intervalTree); + view.compactingMap, view.intervalTree); Map<SSTableReader, SSTableReader> 
sstableMap = replace(view.sstablesMap, emptySet(), singleton(flushed)); - return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compacting, + return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compactingMap, SSTableIntervalTree.build(sstableMap.keySet())); } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index e6558eb..9b6ab6b 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@ -28,6 +28,7 @@ import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.*; +import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -192,7 +193,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean for (Keyspace ks : Keyspace.all()) { for (ColumnFamilyStore cfStore: ks.getColumnFamilyStores()) - result.addAll(cfStore.getSSTables()); + result.addAll(cfStore.getLiveSSTables()); } return result; @@ -216,7 +217,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean do { View view = cfStore.getTracker().getView(); - allSSTables = view.sstables; + allSSTables = ImmutableSet.copyOf(view.sstables(SSTableSet.CANONICAL)); nonCompacting = ImmutableSet.copyOf(view.getUncompacting(allSSTables)); } while (null == (txn = cfStore.getTracker().tryModify(nonCompacting, OperationType.UNKNOWN))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 6a09d5a..a8aedc7 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Ordering; import com.google.common.primitives.Longs; @@ -218,12 +219,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS * @param sstables SSTables to calculate key count * @return estimated key count */ - public static long getApproximateKeyCount(Collection<SSTableReader> sstables) + public static long getApproximateKeyCount(Iterable<SSTableReader> sstables) { long count = -1; // check if cardinality estimator is available for all SSTables - boolean cardinalityAvailable = !sstables.isEmpty() && Iterators.all(sstables.iterator(), new Predicate<SSTableReader>() + boolean cardinalityAvailable = !Iterables.isEmpty(sstables) && Iterables.all(sstables, new Predicate<SSTableReader>() { public boolean apply(SSTableReader sstable) { @@ -1071,6 +1072,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS return replacement; } + public SSTableReader cloneWithRestoredStart(DecoratedKey restoredStart) + { + synchronized (tidy.global) + { + return cloneAndReplace(restoredStart, OpenReason.NORMAL); + } + } + // runOnClose must NOT be an anonymous or non-static inner class, nor must it retain a reference chain to this reader public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose) { 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/metrics/TableMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index 1b4293f..124b8ca 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -28,6 +28,8 @@ import com.codahale.metrics.Timer; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.utils.EstimatedHistogram; @@ -278,7 +280,7 @@ public class TableMetrics { public long[] getValue() { - return combineHistograms(cfs.getSSTables(), new GetHistogram() + return combineHistograms(cfs.getSSTables(SSTableSet.CANONICAL), new GetHistogram() { public EstimatedHistogram getHistogram(SSTableReader reader) { @@ -296,7 +298,7 @@ public class TableMetrics long memtablePartitions = 0; for (Memtable memtable : cfs.getTracker().getView().getAllMemtables()) memtablePartitions += memtable.partitionCount(); - return SSTableReader.getApproximateKeyCount(cfs.getSSTables()) + memtablePartitions; + return SSTableReader.getApproximateKeyCount(cfs.getSSTables(SSTableSet.CANONICAL)) + memtablePartitions; } }); estimatedColumnCountHistogram = Metrics.register(factory.createMetricName("EstimatedColumnCountHistogram"), @@ -305,7 +307,7 @@ public class TableMetrics { public long[] getValue() { - return combineHistograms(cfs.getSSTables(), new GetHistogram() + return combineHistograms(cfs.getSSTables(SSTableSet.CANONICAL), new GetHistogram() { public EstimatedHistogram 
getHistogram(SSTableReader reader) { @@ -321,7 +323,7 @@ public class TableMetrics { double sum = 0; int total = 0; - for (SSTableReader sstable : cfs.getSSTables()) + for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) { if (sstable.getCompressionRatio() != MetadataCollector.NO_COMPRESSION_RATIO) { @@ -339,7 +341,7 @@ public class TableMetrics int total = 0; for (Keyspace keyspace : Keyspace.all()) { - for (SSTableReader sstable : keyspace.getAllSSTables()) + for (SSTableReader sstable : keyspace.getAllSSTables(SSTableSet.CANONICAL)) { if (sstable.getCompressionRatio() != MetadataCollector.NO_COMPRESSION_RATIO) { @@ -366,7 +368,7 @@ public class TableMetrics { public Integer getValue() { - return cfs.getTracker().getSSTables().size(); + return cfs.getTracker().getView().liveSSTables().size(); } }); liveDiskSpaceUsed = createTableCounter("LiveDiskSpaceUsed"); @@ -376,7 +378,7 @@ public class TableMetrics public Long getValue() { long min = 0; - for (SSTableReader sstable : cfs.getSSTables()) + for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) { if (min == 0 || sstable.getEstimatedPartitionSize().min() < min) min = sstable.getEstimatedPartitionSize().min(); @@ -400,7 +402,7 @@ public class TableMetrics public Long getValue() { long max = 0; - for (SSTableReader sstable : cfs.getSSTables()) + for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) { if (sstable.getEstimatedPartitionSize().max() > max) max = sstable.getEstimatedPartitionSize().max(); @@ -425,7 +427,7 @@ public class TableMetrics { long sum = 0; long count = 0; - for (SSTableReader sstable : cfs.getSSTables()) + for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) { long n = sstable.getEstimatedPartitionSize().count(); sum += sstable.getEstimatedPartitionSize().mean() * n; @@ -441,7 +443,7 @@ public class TableMetrics long count = 0; for (Keyspace keyspace : Keyspace.all()) { - for (SSTableReader sstable : keyspace.getAllSSTables()) + for 
(SSTableReader sstable : keyspace.getAllSSTables(SSTableSet.CANONICAL)) { long n = sstable.getEstimatedPartitionSize().count(); sum += sstable.getEstimatedPartitionSize().mean() * n; @@ -456,7 +458,7 @@ public class TableMetrics public Long getValue() { long count = 0L; - for (SSTableReader sstable : cfs.getSSTables()) + for (SSTableReader sstable: cfs.getSSTables(SSTableSet.LIVE)) count += sstable.getBloomFilterFalsePositiveCount(); return count; } @@ -466,7 +468,7 @@ public class TableMetrics public Long getValue() { long count = 0L; - for (SSTableReader sstable : cfs.getSSTables()) + for (SSTableReader sstable : cfs.getSSTables(SSTableSet.LIVE)) count += sstable.getRecentBloomFilterFalsePositiveCount(); return count; } @@ -477,7 +479,7 @@ public class TableMetrics { long falseCount = 0L; long trueCount = 0L; - for (SSTableReader sstable : cfs.getSSTables()) + for (SSTableReader sstable : cfs.getSSTables(SSTableSet.LIVE)) { falseCount += sstable.getBloomFilterFalsePositiveCount(); trueCount += sstable.getBloomFilterTruePositiveCount(); @@ -494,7 +496,7 @@ public class TableMetrics long trueCount = 0L; for (Keyspace keyspace : Keyspace.all()) { - for (SSTableReader sstable : keyspace.getAllSSTables()) + for (SSTableReader sstable : keyspace.getAllSSTables(SSTableSet.LIVE)) { falseCount += sstable.getBloomFilterFalsePositiveCount(); trueCount += sstable.getBloomFilterTruePositiveCount(); @@ -511,7 +513,7 @@ public class TableMetrics { long falseCount = 0L; long trueCount = 0L; - for (SSTableReader sstable : cfs.getSSTables()) + for (SSTableReader sstable: cfs.getSSTables(SSTableSet.LIVE)) { falseCount += sstable.getRecentBloomFilterFalsePositiveCount(); trueCount += sstable.getRecentBloomFilterTruePositiveCount(); @@ -528,7 +530,7 @@ public class TableMetrics long trueCount = 0L; for (Keyspace keyspace : Keyspace.all()) { - for (SSTableReader sstable : keyspace.getAllSSTables()) + for (SSTableReader sstable : keyspace.getAllSSTables(SSTableSet.LIVE)) { falseCount 
+= sstable.getRecentBloomFilterFalsePositiveCount(); trueCount += sstable.getRecentBloomFilterTruePositiveCount(); @@ -544,7 +546,7 @@ public class TableMetrics public Long getValue() { long total = 0; - for (SSTableReader sst : cfs.getSSTables()) + for (SSTableReader sst : cfs.getSSTables(SSTableSet.CANONICAL)) total += sst.getBloomFilterSerializedSize(); return total; } @@ -554,7 +556,7 @@ public class TableMetrics public Long getValue() { long total = 0; - for (SSTableReader sst : cfs.getSSTables()) + for (SSTableReader sst : cfs.getSSTables(SSTableSet.LIVE)) total += sst.getBloomFilterOffHeapSize(); return total; } @@ -564,7 +566,7 @@ public class TableMetrics public Long getValue() { long total = 0; - for (SSTableReader sst : cfs.getSSTables()) + for (SSTableReader sst : cfs.getSSTables(SSTableSet.LIVE)) total += sst.getIndexSummaryOffHeapSize(); return total; } @@ -574,7 +576,7 @@ public class TableMetrics public Long getValue() { long total = 0; - for (SSTableReader sst : cfs.getSSTables()) + for (SSTableReader sst : cfs.getSSTables(SSTableSet.LIVE)) total += sst.getCompressionMetadataOffHeapSize(); return total; } @@ -593,7 +595,7 @@ public class TableMetrics protected double getNumerator() { long hits = 0L; - for (SSTableReader sstable : cfs.getSSTables()) + for (SSTableReader sstable : cfs.getSSTables(SSTableSet.LIVE)) hits += sstable.getKeyCacheHit(); return hits; } @@ -601,7 +603,7 @@ public class TableMetrics protected double getDenominator() { long requests = 0L; - for (SSTableReader sstable : cfs.getSSTables()) + for (SSTableReader sstable : cfs.getSSTables(SSTableSet.LIVE)) requests += sstable.getKeyCacheRequest(); return Math.max(requests, 1); // to avoid NaN. 
} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index 6c25793..b1554e3 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -34,6 +34,8 @@ import javax.management.ObjectName; import com.google.common.util.concurrent.Futures; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -465,7 +467,7 @@ public class CacheService implements CacheServiceMBean } ByteBuffer key = ByteBufferUtil.read(input, keyLength); int generation = input.readInt(); - SSTableReader reader = findDesc(generation, cfs.getSSTables()); + SSTableReader reader = findDesc(generation, cfs.getSSTables(SSTableSet.CANONICAL)); input.readBoolean(); // backwards compatibility for "promoted indexes" boolean if (reader == null) { @@ -479,7 +481,7 @@ public class CacheService implements CacheServiceMBean return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.cfId, reader.descriptor, key), entry)); } - private SSTableReader findDesc(int generation, Collection<SSTableReader> collection) + private SSTableReader findDesc(int generation, Iterable<SSTableReader> collection) { for (SSTableReader sstable : collection) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 744a03a..b0acd3a 100644 --- 
a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -24,11 +24,10 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; -import com.google.common.base.Function; import com.google.common.collect.*; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; -import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -324,31 +323,20 @@ public class StreamSession implements IEndpointStateChangeSubscriber final List<AbstractBounds<PartitionPosition>> rowBoundsList = new ArrayList<>(ranges.size()); for (Range<Token> range : ranges) rowBoundsList.add(Range.makeRowRange(range)); - refs.addAll(cfStore.selectAndReference(new Function<View, List<SSTableReader>>() - { - public List<SSTableReader> apply(View view) + refs.addAll(cfStore.selectAndReference(view -> { + Set<SSTableReader> sstables = Sets.newHashSet(); + for (AbstractBounds<PartitionPosition> rowBounds : rowBoundsList) { - Map<SSTableReader, SSTableReader> permittedInstances = new HashMap<>(); - for (SSTableReader reader : ColumnFamilyStore.CANONICAL_SSTABLES.apply(view)) - permittedInstances.put(reader, reader); - - Set<SSTableReader> sstables = Sets.newHashSet(); - for (AbstractBounds<PartitionPosition> rowBounds : rowBoundsList) + for (SSTableReader sstable : view.sstablesInBounds(SSTableSet.CANONICAL, rowBounds)) { - // sstableInBounds may contain early opened sstables - for (SSTableReader sstable : view.sstablesInBounds(rowBounds)) - { - if (isIncremental && sstable.isRepaired()) - continue; - sstable = permittedInstances.get(sstable); - if (sstable != null) - sstables.add(sstable); - } + if (!isIncremental || !sstable.isRepaired()) + sstables.add(sstable); } - - logger.debug("ViewFilter for {}/{} sstables", 
sstables.size(), view.sstables.size()); - return ImmutableList.copyOf(sstables); } + + if (logger.isDebugEnabled()) + logger.debug("ViewFilter for {}/{} sstables", sstables.size(), Iterables.size(view.sstables(SSTableSet.CANONICAL))); + return sstables; }).refs); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java index 56a7e86..9383410 100644 --- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java @@ -195,7 +195,7 @@ public class LongCompactionsTest FBUtilities.waitOnFutures(compactions); } while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0); - if (cfs.getSSTables().size() > 1) + if (cfs.getLiveSSTables().size() > 1) { CompactionManager.instance.performMaximal(cfs, false); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index c828de9..e97af68 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -214,7 +214,7 @@ public class Util public static Future<?> compactAll(ColumnFamilyStore cfs, int gcBefore) { List<Descriptor> descriptors = new ArrayList<>(); - for (SSTableReader sstable : cfs.getSSTables()) + for (SSTableReader sstable : cfs.getLiveSSTables()) descriptors.add(sstable.descriptor); return CompactionManager.instance.submitUserDefined(cfs, descriptors, gcBefore); } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad8cad7c/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java index 71b87f9..c4157ea 100644 --- a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java +++ b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java @@ -63,10 +63,10 @@ public class AutoSavingCacheTest cfs.forceBlockingFlush(); } - Assert.assertEquals(2, cfs.getSSTables().size()); + Assert.assertEquals(2, cfs.getLiveSSTables().size()); // preheat key cache - for (SSTableReader sstable : cfs.getSSTables()) + for (SSTableReader sstable : cfs.getLiveSSTables()) sstable.getPosition(Util.dk("key1"), SSTableReader.Operator.EQ); AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = CacheService.instance.keyCache; @@ -80,7 +80,7 @@ public class AutoSavingCacheTest // then load saved keyCache.loadSaved(cfs); Assert.assertEquals(2, keyCache.size()); - for (SSTableReader sstable : cfs.getSSTables()) + for (SSTableReader sstable : cfs.getLiveSSTables()) Assert.assertNotNull(keyCache.get(new KeyCacheKey(cfs.metadata.cfId, sstable.descriptor, ByteBufferUtil.bytes("key1")))); } }
