Repository: cassandra Updated Branches: refs/heads/cassandra-2.0 e974b6f93 -> 95e09f262 refs/heads/cassandra-2.1 0ad5e3681 -> a6efffe58 refs/heads/trunk 2ba011ed0 -> ca9c4b468
Plug holes in resource release when wiring up StreamSession patch by belliotsmith; reviewed by yukim for CASSANDRA-7073 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/95e09f26 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/95e09f26 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/95e09f26 Branch: refs/heads/cassandra-2.0 Commit: 95e09f262ce8d448e8bbbd17aa9c77f6546d5ed1 Parents: e974b6f Author: belliottsmith <[email protected]> Authored: Fri Apr 25 18:44:15 2014 -0500 Committer: Yuki Morishita <[email protected]> Committed: Fri Apr 25 18:50:51 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/io/sstable/SSTableLoader.java | 23 +++++-- .../apache/cassandra/streaming/StreamPlan.java | 3 +- .../cassandra/streaming/StreamSession.java | 63 +++++++++++++------- 4 files changed, 62 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cd6443e..376ad87 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ * Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949) * Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047) * Fix CFMetaData#getColumnDefinitionFromColumnName() (CASSANDRA-7074) + * Plug holes in resource release when wiring up StreamSession (CASSANDRA-7073) Merged from 1.2: * Fix nodetool display with vnodes (CASSANDRA-7082) * Fix schema concurrency exceptions (CASSANDRA-6841) http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 1ea4c55..4a1604d 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -167,13 +167,26 @@ public class SSTableLoader implements StreamEventHandler if (toIgnore.contains(remote)) continue; - Collection<StreamSession.SSTableStreamingSections> endpointDetails = streamingDetails.get(remote); + List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>(); - // transferSSTables assumes references have been acquired - for (StreamSession.SSTableStreamingSections details : endpointDetails) - details.sstable.acquireReference(); + try + { + // transferSSTables assumes references have been acquired + for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote)) + { + if (!details.sstable.acquireReference()) + throw new IllegalStateException(); + + endpointDetails.add(details); + } - plan.transferFiles(remote, streamingDetails.get(remote)); + plan.transferFiles(remote, endpointDetails); + } + finally + { + for (StreamSession.SSTableStreamingSections details : endpointDetails) + details.sstable.releaseReference(); + } } plan.listeners(this, listeners); return plan.execute(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index 740ad66..b57e097 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -112,7 +112,8 @@ public class StreamPlan * Add transfer task to send given SSTable files. * * @param to endpoint address of receiver - * @param sstableDetails sstables with file positions and estimated key count + * @param sstableDetails sstables with file positions and estimated key count. + * this collection will be modified to remove those files that are successfully handed off * @return this object for chaining */ public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails) http://git-wip-us.apache.org/repos/asf/cassandra/blob/95e09f26/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 7976a40..0ba41fb 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import org.apache.cassandra.io.sstable.SSTableWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -248,42 +247,61 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe flushSSTables(stores); List<Range<Token>> normalizedRanges = Range.normalize(ranges); - List<SSTableReader> sstables = Lists.newLinkedList(); - for (ColumnFamilyStore cfStore : stores) + List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores); + try { - List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList(); - for (Range<Token> range : normalizedRanges) - rowBoundsList.add(range.toRowBounds()); - ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList); - sstables.addAll(view.sstables); + addTransferFiles(sections); + } + finally + { + for (SSTableStreamingSections release : sections) + release.sstable.releaseReference(); } - addTransferFiles(normalizedRanges, sstables); } - /** - * Set up transfer of the specific SSTables. - * {@code sstables} must be marked as referenced so that not get deleted until transfer completes. - * - * @param ranges Transfer ranges - * @param sstables Transfer files - */ - public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables) + private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores) { - List<SSTableStreamingSections> sstableDetails = new ArrayList<>(sstables.size()); - for (SSTableReader sstable : sstables) - sstableDetails.add(new SSTableStreamingSections(sstable, sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges))); + List<SSTableReader> sstables = new ArrayList<>(); + try + { + for (ColumnFamilyStore cfStore : stores) + { + List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size()); + for (Range<Token> range : ranges) + rowBoundsList.add(range.toRowBounds()); + ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList); + sstables.addAll(view.sstables); + } - addTransferFiles(sstableDetails); + List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size()); + for (SSTableReader sstable : sstables) + { + sections.add(new SSTableStreamingSections(sstable, + sstable.getPositionsForRanges(ranges), + sstable.estimatedKeysForRanges(ranges))); + } + return sections; + } + catch (Throwable t) + { + SSTableReader.releaseReferences(sstables); + throw t; + } } + + public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails) { - for (SSTableStreamingSections details : sstableDetails) + Iterator<SSTableStreamingSections> iter = sstableDetails.iterator(); + while (iter.hasNext()) { + SSTableStreamingSections details = iter.next(); if (details.sections.isEmpty()) { // A reference was acquired on the sstable and we won't stream it details.sstable.releaseReference(); + iter.remove(); continue; } @@ -295,6 +313,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe transfers.put(cfId, task); } task.addTransferFile(details.sstable, details.estimatedKeys, details.sections); + iter.remove(); } }
