Repository: cassandra Updated Branches: refs/heads/trunk 902925716 -> c7d604bdf
Use OpOrder to guard sstable references for reads. Patch by benedict; reviewed by marcuse for CASSANDRA-6919 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/13910dc4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/13910dc4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/13910dc4 Branch: refs/heads/trunk Commit: 13910dc40077d5d0dadb541c043047f1b7a37be2 Parents: ad57cb0 Author: belliottsmith <[email protected]> Authored: Tue Mar 25 10:09:45 2014 +0000 Committer: Marcus Eriksson <[email protected]> Committed: Wed Apr 23 14:16:04 2014 +0200 ---------------------------------------------------------------------- CHANGES.txt | 3 +- .../org/apache/cassandra/config/Schema.java | 11 ++ .../cassandra/db/CollationController.java | 6 +- .../apache/cassandra/db/ColumnFamilyStore.java | 117 ++++++++----------- .../cassandra/io/sstable/SSTableReader.java | 79 +++++++++---- .../cassandra/streaming/StreamSession.java | 2 +- 6 files changed, 119 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 844df95..07fc3f9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -48,7 +48,8 @@ * Clean up IndexInfo on keyspace/table drops (CASSANDRA-6924) * Only snapshot relative SSTables when sequential repair (CASSANDRA-7024) * Require nodetool rebuild_index to specify index names (CASSANDRA-7038) - * fix cassandra stress errors on reads with native protocol (CASANDRA-7033) + * fix cassandra stress errors on reads with native protocol (CASSANDRA-7033) + * Use OpOrder to guard sstable references for reads (CASSANDRA-6919) Merged from 2.0: * Use LOCAL_QUORUM for data reads at LOCAL_SERIAL (CASSANDRA-6939) * Log a warning for large batches (CASSANDRA-6487) http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/src/java/org/apache/cassandra/config/Schema.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java index c606388..b1e0f2f 100644 --- a/src/java/org/apache/cassandra/config/Schema.java +++ b/src/java/org/apache/cassandra/config/Schema.java @@ -129,6 +129,17 @@ public class Schema return keyspaceInstances.get(keyspaceName); } + public ColumnFamilyStore getColumnFamilyStoreInstance(UUID cfId) + { + Pair<String, String> pair = cfIdMap.inverse().get(cfId); + if (pair == null) + return null; + Keyspace instance = getKeyspaceInstance(pair.left); + if (instance == null) + return null; + return instance.getColumnFamilyStore(cfId); + } + /** * Store given Keyspace instance to the schema * http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/src/java/org/apache/cassandra/db/CollationController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java index 151a7c5..36a9ebf 100644 --- a/src/java/org/apache/cassandra/db/CollationController.java +++ b/src/java/org/apache/cassandra/db/CollationController.java @@ -69,7 +69,7 @@ public class CollationController final ColumnFamily container = ArrayBackedSortedColumns.factory.create(cfs.metadata, filter.filter.isReversed()); List<OnDiskAtomIterator> iterators = new ArrayList<>(); Tracing.trace("Acquiring sstable references"); - ColumnFamilyStore.ViewFragment view = cfs.markReferenced(filter.key); + ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(filter.key)); try { @@ -159,7 +159,6 @@ public class CollationController { for (OnDiskAtomIterator iter : iterators) FileUtils.closeQuietly(iter); - SSTableReader.releaseReferences(view.sstables); } } @@ -187,7 +186,7 @@ public class CollationController private ColumnFamily collectAllData(boolean copyOnHeap) { Tracing.trace("Acquiring sstable references"); - ColumnFamilyStore.ViewFragment view = cfs.markReferenced(filter.key); + ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(filter.key)); List<OnDiskAtomIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size()); ColumnFamily returnCF = ArrayBackedSortedColumns.factory.create(cfs.metadata, filter.filter.isReversed()); DeletionInfo returnDeletionInfo = returnCF.deletionInfo(); @@ -311,7 +310,6 @@ public class CollationController { for (OnDiskAtomIterator iter : iterators) FileUtils.closeQuietly(iter); - SSTableReader.releaseReferences(view.sstables); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 8f96765..ea49250 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1780,68 +1780,64 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return repairedSSTables; } - private ViewFragment markReferenced(Function<DataTracker.View, List<SSTableReader>> filter) + public ViewFragment selectAndReference(Function<DataTracker.View, List<SSTableReader>> filter) { - List<SSTableReader> sstables; - DataTracker.View view; - while (true) { - view = data.getView(); - - if (view.intervalTree.isEmpty()) - { - sstables = Collections.emptyList(); - break; - } - - sstables = filter.apply(view); - if (SSTableReader.acquireReferences(sstables)) - break; - // retry w/ new view + ViewFragment view = select(filter); + if (view.sstables.isEmpty() || SSTableReader.acquireReferences(view.sstables)) + return view; } + } + public ViewFragment select(Function<DataTracker.View, List<SSTableReader>> filter) + { + DataTracker.View view = data.getView(); + List<SSTableReader> sstables = view.intervalTree.isEmpty() + ? Collections.<SSTableReader>emptyList() + : filter.apply(view); return new ViewFragment(sstables, view.getAllMemtables()); } + /** * @return a ViewFragment containing the sstables and memtables that may need to be merged * for the given @param key, according to the interval tree */ - public ViewFragment markReferenced(final DecoratedKey key) + public Function<DataTracker.View, List<SSTableReader>> viewFilter(final DecoratedKey key) { assert !key.isMinimum(partitioner); - return markReferenced(new Function<DataTracker.View, List<SSTableReader>>() + return new Function<DataTracker.View, List<SSTableReader>>() { public List<SSTableReader> apply(DataTracker.View view) { return compactionStrategy.filterSSTablesForReads(view.intervalTree.search(key)); } - }); + }; } /** * @return a ViewFragment containing the sstables and memtables that may need to be merged * for rows within @param rowBounds, inclusive, according to the interval tree. */ - public ViewFragment markReferenced(final AbstractBounds<RowPosition> rowBounds) + public Function<DataTracker.View, List<SSTableReader>> viewFilter(final AbstractBounds<RowPosition> rowBounds) { - return markReferenced(new Function<DataTracker.View, List<SSTableReader>>() + return new Function<DataTracker.View, List<SSTableReader>>() { public List<SSTableReader> apply(DataTracker.View view) { return compactionStrategy.filterSSTablesForReads(view.sstablesInBounds(rowBounds)); } - }); + }; } /** * @return a ViewFragment containing the sstables and memtables that may need to be merged * for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree. */ - public ViewFragment markReferenced(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection) + public Function<DataTracker.View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection) { - return markReferenced(new Function<DataTracker.View, List<SSTableReader>>() + return new Function<DataTracker.View, List<SSTableReader>>() { public List<SSTableReader> apply(DataTracker.View view) { @@ -1851,17 +1847,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return ImmutableList.copyOf(sstables); } - }); + }; } public List<String> getSSTablesForKey(String key) { DecoratedKey dk = partitioner.decorateKey(metadata.getKeyValidator().fromString(key)); - ViewFragment view = markReferenced(dk); - try + try (OpOrder.Group op = readOrdering.start()) { - List<String> files = new ArrayList<String>(); - for (SSTableReader sstr : view.sstables) + List<String> files = new ArrayList<>(); + for (SSTableReader sstr : select(viewFilter(dk)).sstables) { // check if the key actually exists in this sstable, without updating cache and stats if (sstr.getPosition(dk, SSTableReader.Operator.EQ, false) != null) @@ -1869,10 +1864,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } return files; } - finally - { - SSTableReader.releaseReferences(view.sstables); - } } public ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore) @@ -1927,51 +1918,41 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { assert !(range.keyRange() instanceof Range) || !((Range)range.keyRange()).isWrapAround() || range.keyRange().right.isMinimum(partitioner) : range.keyRange(); - final ViewFragment view = markReferenced(range.keyRange()); + final ViewFragment view = select(viewFilter(range.keyRange())); Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.keyRange().getString(metadata.getKeyValidator())); - try - { - final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, range, this, now); + final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, range, this, now); - // todo this could be pushed into SSTableScanner - return new AbstractScanIterator() + // todo this could be pushed into SSTableScanner + return new AbstractScanIterator() + { + protected Row computeNext() { - protected Row computeNext() - { - // pull a row out of the iterator - if (!iterator.hasNext()) - return endOfData(); + // pull a row out of the iterator + if (!iterator.hasNext()) + return endOfData(); - Row current = iterator.next(); - DecoratedKey key = current.key; + Row current = iterator.next(); + DecoratedKey key = current.key; - if (!range.stopKey().isMinimum(partitioner) && range.stopKey().compareTo(key) < 0) - return endOfData(); + if (!range.stopKey().isMinimum(partitioner) && range.stopKey().compareTo(key) < 0) + return endOfData(); - // skipping outside of assigned range - if (!range.contains(key)) - return computeNext(); + // skipping outside of assigned range + if (!range.contains(key)) + return computeNext(); - if (logger.isTraceEnabled()) - logger.trace("scanned {}", metadata.getKeyValidator().getString(key.key)); + if (logger.isTraceEnabled()) + logger.trace("scanned {}", metadata.getKeyValidator().getString(key.key)); - return current; - } + return current; + } - public void close() throws IOException - { - SSTableReader.releaseReferences(view.sstables); - iterator.close(); - } - }; - } - catch (RuntimeException e) - { - // In case getIterator() throws, otherwise the iteror close method releases the references. - SSTableReader.releaseReferences(view.sstables); - throw e; - } + public void close() throws IOException + { + iterator.close(); + } + }; } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index e70fd60..8e359bd 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -111,6 +111,7 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.FilterFactory; import org.apache.cassandra.utils.IFilter; import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.concurrent.OpOrder; import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR; @@ -554,13 +555,15 @@ public class SSTableReader extends SSTable synchronized (replaceLock) { - boolean closeBf = true, closeSummary = true, closeFiles = true; + boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFile = false; if (replacedBy != null) { closeBf = replacedBy.bf != bf; closeSummary = replacedBy.indexSummary != indexSummary; closeFiles = replacedBy.dfile != dfile; + // if the replacement sstablereader uses a different path, clean up our paths + deleteFile = !dfile.path.equals(replacedBy.dfile.path); } if (replaces != null) @@ -568,6 +571,7 @@ public class SSTableReader extends SSTable closeBf &= replaces.bf != bf; closeSummary &= replaces.indexSummary != indexSummary; closeFiles &= replaces.dfile != dfile; + deleteFile &= !dfile.path.equals(replaces.dfile.path); } boolean deleteAll = false; @@ -593,32 +597,57 @@ public class SSTableReader extends SSTable replacedBy.replaces = replaces; } - if (references.get() != 0) - { - throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)"); - } - if (closeBf) - bf.close(); - if (closeSummary) - indexSummary.close(); - if (closeFiles) - { - ifile.cleanup(); - dfile.cleanup(); - } - if (deleteAll) + scheduleTidy(closeBf, closeSummary, closeFiles, deleteFile, deleteAll); + } + } + + private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll) + { + final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId); + final OpOrder.Barrier barrier; + if (cfs != null) + { + barrier = cfs.readOrdering.newBarrier(); + barrier.issue(); + } + else + barrier = null; + + StorageService.tasks.execute(new Runnable() + { + public void run() { - /** - * Do the OS a favour and suggest (using fadvice call) that we - * don't want to see pages of this SSTable in memory anymore. - * - * NOTE: We can't use madvice in java because it requires the address of - * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it - */ - dropPageCache(); - deletingTask.schedule(); + if (barrier != null) + barrier.await(); + assert references.get() == 0; + if (closeBf) + bf.close(); + if (closeSummary) + indexSummary.close(); + if (closeFiles) + { + ifile.cleanup(); + dfile.cleanup(); + } + if (deleteAll) + { + /** + * Do the OS a favour and suggest (using fadvice call) that we + * don't want to see pages of this SSTable in memory anymore. + * + * NOTE: We can't use madvice in java because it requires the address of + * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it + */ + dropPageCache(); + deletingTask.run(); + } + else if (deleteFiles) + { + FileUtils.deleteWithConfirm(new File(dfile.path)); + FileUtils.deleteWithConfirm(new File(ifile.path)); + } } - } + }); } public String getFilename() http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index e8879f8..1ef24e3 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -269,7 +269,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList(); for (Range<Token> range : normalizedRanges) rowBoundsList.add(range.toRowBounds()); - ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList); + ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList)); sstables.addAll(view.sstables); } return sstables;
