Plug holes in resource release when wiring up StreamSession patch by belliotsmith; reviewed by yukim for CASSANDRA-7073
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/24e71dbf Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/24e71dbf Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/24e71dbf Branch: refs/heads/trunk Commit: 24e71dbff4e08878c94ad8cadaf9c5c6de8ae658 Parents: 0ad5e36 Author: belliottsmith <[email protected]> Authored: Tue Mar 25 10:09:45 2014 +0000 Committer: Yuki Morishita <[email protected]> Committed: Fri Apr 25 19:45:03 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/io/sstable/SSTableLoader.java | 23 ++++-- .../apache/cassandra/streaming/StreamPlan.java | 3 +- .../cassandra/streaming/StreamSession.java | 77 +++++++++++--------- 4 files changed, 63 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c1c03ea..5baaefd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -113,6 +113,7 @@ Merged from 2.0: * Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949) * Ensure that batchlog and hint timeouts do not produce hints (CASSANDRA-7058) * Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047) + * Plug holes in resource release when wiring up StreamSession (CASSANDRA-7073) Merged from 1.2: * Fix nodetool display with vnodes (CASSANDRA-7082) * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816) http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 7b9d135..b14e203 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -168,13 +168,26 @@ public class SSTableLoader implements StreamEventHandler if (toIgnore.contains(remote)) continue; - Collection<StreamSession.SSTableStreamingSections> endpointDetails = streamingDetails.get(remote); + List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>(); - // transferSSTables assumes references have been acquired - for (StreamSession.SSTableStreamingSections details : endpointDetails) - details.sstable.acquireReference(); + try + { + // transferSSTables assumes references have been acquired + for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote)) + { + if (!details.sstable.acquireReference()) + throw new IllegalStateException(); + + endpointDetails.add(details); + } - plan.transferFiles(remote, streamingDetails.get(remote)); + plan.transferFiles(remote, endpointDetails); + } + finally + { + for (StreamSession.SSTableStreamingSections details : endpointDetails) + details.sstable.releaseReference(); + } } plan.listeners(this, listeners); return plan.execute(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index f9d1ae5..04bd7df 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -121,7 +121,8 @@ public class StreamPlan * Add transfer task to send given SSTable files. * * @param to endpoint address of receiver - * @param sstableDetails sstables with file positions and estimated key count + * @param sstableDetails sstables with file positions and estimated key count. + * this collection will be modified to remove those files that are successfully handed off * @return this object for chaining */ public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails) http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e71dbf/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 1ef24e3..c5f4cf9 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -241,8 +241,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe flushSSTables(stores); List<Range<Token>> normalizedRanges = Range.normalize(ranges); - List<SSTableReader> sstables = getSSTablesForRanges(normalizedRanges, stores); - addTransferFiles(normalizedRanges, sstables, repairedAt); + List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt); + try + { + addTransferFiles(sections); + } + finally + { + for (SSTableStreamingSections release : sections) + release.sstable.releaseReference(); + } } private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies) @@ -261,53 +269,51 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe return stores; } - private List<SSTableReader> getSSTablesForRanges(Collection<Range<Token>> normalizedRanges, Collection<ColumnFamilyStore> stores) + private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt) { - List<SSTableReader> sstables = Lists.newLinkedList(); - for (ColumnFamilyStore cfStore : stores) + List<SSTableReader> sstables = new ArrayList<>(); + try { - List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList(); - for (Range<Token> range : normalizedRanges) - rowBoundsList.add(range.toRowBounds()); - ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList)); - sstables.addAll(view.sstables); - } - return sstables; - } + for (ColumnFamilyStore cfStore : stores) + { + List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size()); + for (Range<Token> range : ranges) + rowBoundsList.add(range.toRowBounds()); + ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList)); + sstables.addAll(view.sstables); + } - /** - * Set up transfer of the specific SSTables. - * {@code sstables} must be marked as referenced so that not get deleted until transfer completes. - * - * @param ranges Transfer ranges - * @param sstables Transfer files - * @param overriddenRepairedAt use this repairedAt time, for use in repair. - */ - public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables, long overriddenRepairedAt) - { - List<SSTableStreamingSections> sstableDetails = new ArrayList<>(sstables.size()); - for (SSTableReader sstable : sstables) + List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size()); + for (SSTableReader sstable : sstables) + { + long repairedAt = overriddenRepairedAt; + if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) + repairedAt = sstable.getSSTableMetadata().repairedAt; + sections.add(new SSTableStreamingSections(sstable, + sstable.getPositionsForRanges(ranges), + sstable.estimatedKeysForRanges(ranges), + repairedAt)); + } + return sections; + } + catch (Throwable t) { - long repairedAt = overriddenRepairedAt; - if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) - repairedAt = sstable.getSSTableMetadata().repairedAt; - sstableDetails.add(new SSTableStreamingSections(sstable, - sstable.getPositionsForRanges(ranges), - sstable.estimatedKeysForRanges(ranges), - repairedAt)); + SSTableReader.releaseReferences(sstables); + throw t; } - - addTransferFiles(sstableDetails); } public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails) { - for (SSTableStreamingSections details : sstableDetails) + Iterator<SSTableStreamingSections> iter = sstableDetails.iterator(); + while (iter.hasNext()) { + SSTableStreamingSections details = iter.next(); if (details.sections.isEmpty()) { // A reference was acquired on the sstable and we won't stream it details.sstable.releaseReference(); + iter.remove(); continue; } @@ -319,6 +325,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe transfers.put(cfId, task); } task.addTransferFile(details.sstable, details.estimatedKeys, details.sections, details.repairedAt); + iter.remove(); } }
