Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 c5c0585b4 -> 0e8310077
  refs/heads/trunk 3e305f809 -> 0956a8a71


Fix resource leak in event of corrupt sstable

patch by benedict; review by yukim for CASSANDRA-7932


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e831007
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e831007
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e831007

Branch: refs/heads/cassandra-2.1
Commit: 0e831007760bffced8687f51b99525b650d7e193
Parents: c5c0585
Author: Benedict Elliott Smith <[email protected]>
Authored: Fri Sep 19 18:17:19 2014 +0100
Committer: Benedict Elliott Smith <[email protected]>
Committed: Fri Sep 19 18:17:19 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/DataTracker.java    |   5 +-
 .../compaction/AbstractCompactionStrategy.java  |  56 ++++-
 .../db/compaction/CompactionManager.java        | 193 +++++++++---------
 .../cassandra/db/compaction/CompactionTask.java | 203 +++++++++----------
 .../compaction/LeveledCompactionStrategy.java   |  43 ++--
 .../cassandra/db/compaction/Upgrader.java       |   3 +-
 .../cassandra/utils/CloseableIterator.java      |   2 +-
 .../LeveledCompactionStrategyTest.java          |   2 +-
 9 files changed, 286 insertions(+), 222 deletions(-)
----------------------------------------------------------------------

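The leak this patch addresses: compaction scanners were opened one at a time in a loop, so an exception partway through (for example, while opening a corrupt sstable) abandoned the scanners that had already been opened, leaking their file handles. The fix closes whatever was opened before propagating the failure. A minimal sketch of the before/after pattern, using a hypothetical Resource type rather than Cassandra's ICompactionScanner:

    import java.util.ArrayList;
    import java.util.List;

    class Resource implements AutoCloseable
    {
        public void close() { /* release a file handle */ }
    }

    class OpenAll
    {
        // Leaky: if open(2) throws, resources 0 and 1 are never closed.
        static List<Resource> leaky(int n) throws Exception
        {
            List<Resource> opened = new ArrayList<>();
            for (int i = 0; i < n; i++)
                opened.add(open(i));
            return opened;
        }

        // Fixed: close the partial list before rethrowing, keeping the
        // original exception primary and attaching close failures to it.
        static List<Resource> safe(int n) throws Exception
        {
            List<Resource> opened = new ArrayList<>();
            try
            {
                for (int i = 0; i < n; i++)
                    opened.add(open(i));
            }
            catch (Exception e)
            {
                for (Resource r : opened)
                {
                    try { r.close(); }
                    catch (Exception e2) { e.addSuppressed(e2); }
                }
                throw e;
            }
            return opened;
        }

        static Resource open(int i) throws Exception { return new Resource(); }
    }
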

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d3ee7d9..f55e5d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.1
+ * Fix resource leak in event of corrupt sstable (CASSANDRA-7932)
  * (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131)
  * Provide visibility into prepared statements churn (CASSANDRA-7921, CASSANDRA-7930)
  * Invalidate prepared statements when their keyspace or table is

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 857e8bd..24ea9dd 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -320,7 +320,7 @@ public class DataTracker
     void removeUnreadableSSTables(File directory)
     {
         View currentView, newView;
-        List<SSTableReader> remaining = new ArrayList<>();
+        Set<SSTableReader> remaining = new HashSet<>();
         do
         {
             currentView = view.get();
@@ -334,6 +334,9 @@ public class DataTracker
             newView = currentView.replace(currentView.sstables, remaining);
         }
         while (!view.compareAndSet(currentView, newView));
+        for (SSTableReader sstable : currentView.sstables)
+            if (!remaining.contains(sstable))
+                sstable.releaseReference();
        notifySSTablesChanged(remaining, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
     }
 

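For context: removeUnreadableSSTables swaps a new View in via compare-and-set, but previously never released the reference held on the readers it dropped, so their file handles lingered. The added loop releases exactly the sstables that did not survive the swap. A bare-bones sketch of the reference-counting idiom involved (hypothetical RefCounted type; SSTableReader's real accounting has more moving parts):

    import java.util.concurrent.atomic.AtomicInteger;

    // Last release frees the underlying files; acquire fails once freed.
    class RefCounted
    {
        private final AtomicInteger refs = new AtomicInteger(1); // owner's ref

        boolean tryAcquire()
        {
            int n;
            do
            {
                n = refs.get();
                if (n <= 0)
                    return false; // already freed; caller must not use it
            }
            while (!refs.compareAndSet(n, n + 1));
            return true;
        }

        void release()
        {
            if (refs.decrementAndGet() == 0)
                free(); // close file handles, unmap buffers
        }

        private void free() { /* ... */ }
    }
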
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 1bbc93d..97696a8 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.*;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
@@ -264,16 +265,61 @@ public abstract class AbstractCompactionStrategy
     * allow for a more memory efficient solution if we know the sstables don't overlap (see
      * LeveledCompactionStrategy for instance).
      */
-    public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+    public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
     {
         RateLimiter limiter = CompactionManager.instance.getRateLimiter();
        ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>();
-        for (SSTableReader sstable : sstables)
-            scanners.add(sstable.getScanner(range, limiter));
-        return scanners;
+        try
+        {
+            for (SSTableReader sstable : sstables)
+                scanners.add(sstable.getScanner(range, limiter));
+        }
+        catch (Throwable t)
+        {
+            try
+            {
+                new ScannerList(scanners).close();
+            }
+            catch (Throwable t2)
+            {
+                t.addSuppressed(t2);
+            }
+            throw t;
+        }
+        return new ScannerList(scanners);
+    }
+
+    public static class ScannerList implements AutoCloseable
+    {
+        public final List<ICompactionScanner> scanners;
+        public ScannerList(List<ICompactionScanner> scanners)
+        {
+            this.scanners = scanners;
+        }
+
+        public void close()
+        {
+            Throwable t = null;
+            for (ICompactionScanner scanner : scanners)
+            {
+                try
+                {
+                    scanner.close();
+                }
+                catch (Throwable t2)
+                {
+                    if (t == null)
+                        t = t2;
+                    else
+                        t.addSuppressed(t2);
+                }
+            }
+            if (t != null)
+                throw Throwables.propagate(t);
+        }
     }
 
-    public List<ICompactionScanner> getScanners(Collection<SSTableReader> toCompact)
+    public ScannerList getScanners(Collection<SSTableReader> toCompact)
     {
         return getScanners(toCompact, null);
     }

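The effect of ScannerList is to turn a group of scanners into one AutoCloseable, so a single try-with-resources statement guarantees every scanner is closed even when iteration fails; close() keeps the first failure as primary and attaches later ones via addSuppressed. The intended call shape, using the names from the diff above (strategy and sstables stand for any compaction strategy and sstable collection):

    // Sketch of a caller (compare CompactionTask further down):
    try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(sstables))
    {
        for (ICompactionScanner scanner : scanners.scanners)
        {
            // merge and compact rows from this scanner ...
        }
    } // every scanner closed here, even if the loop body threw
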
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 296fe45..e309cfb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -870,89 +870,98 @@ public class CompactionManager implements CompactionManagerMBean
         if (!cfs.isValid())
             return;
 
-        Collection<SSTableReader> sstables;
-        String snapshotName = validator.desc.sessionId.toString();
-        int gcBefore;
-        boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
-        if (isSnapshotValidation)
-        {
-            // If there is a snapshot created for the session then read from there.
-            sstables = cfs.getSnapshotSSTableReader(snapshotName);
-
-            // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
-            // this at a different time (that's the whole purpose of repair with snapshot). So instead we take the creation
-            // time of the snapshot, which should give us roughly the same time on each replica (roughly being in that case
-            // 'as good as in the non-snapshot' case)
-            gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
-        }
-        else
+        Collection<SSTableReader> sstables = null;
+        try
         {
-            // flush first so everyone is validating data that is as similar as possible
-            StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
-            // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
-            // instead so they won't be cleaned up if they do get compacted during the validation
-            if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
-                sstables = cfs.markCurrentSSTablesReferenced();
-            else
-                sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
 
-            if (validator.gcBefore > 0)
-                gcBefore = validator.gcBefore;
+            String snapshotName = validator.desc.sessionId.toString();
+            int gcBefore;
+            boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
+            if (isSnapshotValidation)
+            {
+                // If there is a snapshot created for the session then read from there.
+                sstables = cfs.getSnapshotSSTableReader(snapshotName);
+
+                // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
+                // this at a different time (that's the whole purpose of repair with snapshot). So instead we take the creation
+                // time of the snapshot, which should give us roughly the same time on each replica (roughly being in that case
+                // 'as good as in the non-snapshot' case)
+                gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
+            }
             else
-                gcBefore = getDefaultGcBefore(cfs);
-        }
-
-        // Create Merkle tree suitable to hold estimated partitions for given range.
-        // We blindly assume that partition is evenly distributed on all sstables for now.
-        long numPartitions = 0;
-        for (SSTableReader sstable : sstables)
-        {
-            numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
-        }
-        // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
-        int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
-        MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+            {
+                // flush first so everyone is validating data that is as similar as possible
+                StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
+                // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
+                // instead so they won't be cleaned up if they do get compacted during the validation
+                if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
+                    sstables = cfs.markCurrentSSTablesReferenced();
+                else
+                    sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
 
-        CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.desc.range, gcBefore);
-        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
+                if (validator.gcBefore > 0)
+                    gcBefore = validator.gcBefore;
+                else
+                    gcBefore = getDefaultGcBefore(cfs);
+            }
 
-        long start = System.nanoTime();
-        metrics.beginCompaction(ci);
-        try
-        {
-            // validate the CF as we iterate over it
-            validator.prepare(cfs, tree);
-            while (iter.hasNext())
+            // Create Merkle tree suitable to hold estimated partitions for given range.
+            // We blindly assume that partition is evenly distributed on all sstables for now.
+            long numPartitions = 0;
+            for (SSTableReader sstable : sstables)
             {
-                if (ci.isStopRequested())
-                    throw new CompactionInterruptedException(ci.getCompactionInfo());
-                AbstractCompactedRow row = iter.next();
-                validator.add(row);
+                numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
             }
-            validator.complete();
-        }
-        finally
-        {
-            iter.close();
-            SSTableReader.releaseReferences(sstables);
-            if (isSnapshotValidation)
+            // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
+            int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+            MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+
+            long start = System.nanoTime();
+            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
             {
-                cfs.clearSnapshot(snapshotName);
+                CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
+                Iterator<AbstractCompactedRow> iter = ci.iterator();
+                metrics.beginCompaction(ci);
+                try
+                {
+                    // validate the CF as we iterate over it
+                    validator.prepare(cfs, tree);
+                    while (iter.hasNext())
+                    {
+                        if (ci.isStopRequested())
+                            throw new CompactionInterruptedException(ci.getCompactionInfo());
+                        AbstractCompactedRow row = iter.next();
+                        validator.add(row);
+                    }
+                    validator.complete();
+                }
+                finally
+                {
+                    if (isSnapshotValidation)
+                    {
+                        cfs.clearSnapshot(snapshotName);
+                    }
+
+                    metrics.finishCompaction(ci);
+                }
             }
 
-            metrics.finishCompaction(ci);
+            if (logger.isDebugEnabled())
+            {
+                // MT serialize may take time
+                long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
+                             duration,
+                             depth,
+                             numPartitions,
+                             MerkleTree.serializer.serializedSize(tree, 0),
+                             validator.desc);
+            }
         }
-
-        if (logger.isDebugEnabled())
+        finally
         {
-            // MT serialize may take time
-            long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-            logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
-                         duration,
-                         depth,
-                         numPartitions,
-                         MerkleTree.serializer.serializedSize(tree, 0),
-                         validator.desc);
+            if (sstables != null)
+                SSTableReader.releaseReferences(sstables);
         }
     }
 
@@ -993,32 +1002,28 @@ public class CompactionManager implements CompactionManagerMBean
             SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
 
             AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-            List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
-
-            try (CompactionController controller = new CompactionController(cfs, new HashSet<>(Collections.singleton(sstable)), CFMetaData.DEFAULT_GC_GRACE_SECONDS))
+            try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
+                 CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
             {
                 repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
                 unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
 
-                CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
-
-                try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
+                CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
+                Iterator<AbstractCompactedRow> iter = ci.iterator();
+                while(iter.hasNext())
                 {
-                    while(iter.hasNext())
+                    AbstractCompactedRow row = iter.next();
+                    // if current range from sstable is repaired, save it into the new repaired sstable
+                    if (Range.isInRanges(row.key.getToken(), ranges))
                     {
-                        AbstractCompactedRow row = iter.next();
-                        // if current range from sstable is repaired, save it into the new repaired sstable
-                        if (Range.isInRanges(row.key.getToken(), ranges))
-                        {
-                            repairedSSTableWriter.append(row);
-                            repairedKeyCount++;
-                        }
-                        // otherwise save into the new 'non-repaired' table
-                        else
-                        {
-                            unRepairedSSTableWriter.append(row);
-                            unrepairedKeyCount++;
-                        }
+                        repairedSSTableWriter.append(row);
+                        repairedKeyCount++;
+                    }
+                    // otherwise save into the new 'non-repaired' table
+                    else
+                    {
+                        unRepairedSSTableWriter.append(row);
+                        unrepairedKeyCount++;
                     }
                 }
                 // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
@@ -1109,11 +1114,9 @@ public class CompactionManager implements CompactionManagerMBean
 
     private static class ValidationCompactionIterable extends CompactionIterable
     {
-        public ValidationCompactionIterable(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Range<Token> range, int gcBefore)
+        public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ICompactionScanner> scanners, int gcBefore)
         {
-            super(OperationType.VALIDATION,
-                  cfs.getCompactionStrategy().getScanners(sstables, range),
-                  new ValidationCompactionController(cfs, gcBefore));
+            super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore));
         }
     }
 

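The restructuring of doValidationCompaction above follows one rule: whichever branch acquired references to the sstables, a single outer finally releases them exactly once, with the scanners closed by an inner try-with-resources before that happens. Stripped to its shape (acquireReferences and validate are placeholders, not Cassandra methods):

    Collection<SSTableReader> sstables = null;
    try
    {
        sstables = acquireReferences();   // snapshot readers or referenced live set
        try (AbstractCompactionStrategy.ScannerList scanners =
                 cfs.getCompactionStrategy().getScanners(sstables, range))
        {
            validate(scanners.scanners);  // may throw on corrupt data
        }                                 // scanners closed first
    }
    finally
    {
        if (sstables != null)
            SSTableReader.releaseReferences(sstables);  // references released last
    }
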
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index c1c5504..6217348 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -129,9 +130,6 @@ public class CompactionTask extends AbstractCompactionTask
 
         UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
 
-        CompactionController controller = getCompactionController(sstables);
-        Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
-
        // new sstables from flush can be added during a compaction, but only the compaction can remove them,
        // so in our single-threaded compaction world this is a valid way of determining if we're compacting
         // all the sstables (that existed when we started)
@@ -139,120 +137,117 @@ public class CompactionTask extends AbstractCompactionTask
 
         long start = System.nanoTime();
         long totalKeysWritten = 0;
-        long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
-        long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
-        long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
-        logger.debug("Expected bloom filter size : {}", keysPerSSTable);
-
-        // TODO: errors when creating the scanners can result in untidied resources
-        AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
-        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-
-        // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
-        // replace the old entries.  Track entries to preheat here until then.
-        long minRepairedAt = getMinRepairedAt(actuallyCompact);
-        // we only need the age of the data that we're actually retaining
-        long maxAge = getMaxDataAge(actuallyCompact);
-        if (collector != null)
-            collector.beginCompaction(ci);
-        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
-        try
+
+        try (CompactionController controller = getCompactionController(sstables);)
         {
-            if (!iter.hasNext())
-            {
-                // don't mark compacted in the finally block, since if there _is_ nondeleted data,
-                // we need to sync it (via closeAndOpen) first, so there is no period during which
-                // a crash could cause data loss.
-                cfs.markObsolete(sstables, compactionType);
-                return;
-            }
 
-            writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
-            while (iter.hasNext())
-            {
-                if (ci.isStopRequested())
-                    throw new CompactionInterruptedException(ci.getCompactionInfo());
+            Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
+
+            long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
+            long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+            long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+            logger.debug("Expected bloom filter size : {}", keysPerSSTable);
 
-                AbstractCompactedRow row = iter.next();
-                if (writer.append(row) != null)
+            try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
+            {
+                AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+                Iterator<AbstractCompactedRow> iter = ci.iterator();
+
+                // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+                // replace the old entries.  Track entries to preheat here until then.
+                long minRepairedAt = getMinRepairedAt(actuallyCompact);
+                // we only need the age of the data that we're actually retaining
+                long maxAge = getMaxDataAge(actuallyCompact);
+                if (collector != null)
+                    collector.beginCompaction(ci);
+                SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
+                try
                 {
-                    totalKeysWritten++;
-                    if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+                    if (!iter.hasNext())
                     {
-                        writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                        // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+                        // we need to sync it (via closeAndOpen) first, so there is no period during which
+                        // a crash could cause data loss.
+                        cfs.markObsolete(sstables, compactionType);
+                        return;
                     }
+
+                    writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                    while (iter.hasNext())
+                    {
+                        if (ci.isStopRequested())
+                            throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+                        AbstractCompactedRow row = iter.next();
+                        if (writer.append(row) != null)
+                        {
+                            totalKeysWritten++;
+                            if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+                            {
+                                writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                            }
+                        }
+                    }
+
+                    // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+                    writer.finish(false);
                 }
-            }
+                catch (Throwable t)
+                {
+                    writer.abort();
+                    throw t;
+                }
+                finally
+                {
 
-            // don't replace old sstables yet, as we need to mark the compaction finished in the system table
-            writer.finish(false);
-        }
-        catch (Throwable t)
-        {
-            writer.abort();
-            throw t;
-        }
-        finally
-        {
-            controller.close();
+                    // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+                    // (in replaceCompactedSSTables)
+                    if (taskId != null)
+                        SystemKeyspace.finishCompaction(taskId);
 
-            // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
-            // (in replaceCompactedSSTables)
-            if (taskId != null)
-                SystemKeyspace.finishCompaction(taskId);
+                    if (collector != null)
+                        collector.finishCompaction(ci);
+                }
 
-            if (collector != null)
-                collector.finishCompaction(ci);
+                Collection<SSTableReader> oldSStables = this.sstables;
+                List<SSTableReader> newSStables = writer.finished();
+                if (!offline)
+                    cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+                // log a bunch of statistics about the result and save to system table compaction_history
+                long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                long startsize = SSTableReader.getTotalBytes(oldSStables);
+                long endsize = SSTableReader.getTotalBytes(newSStables);
+                double ratio = (double) endsize / (double) startsize;
+
+                StringBuilder newSSTableNames = new StringBuilder();
+                for (SSTableReader reader : newSStables)
+                    newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+                double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+                long totalSourceRows = 0;
+                long[] counts = ci.getMergedRowCounts();
+                StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+                Map<Integer, Long> mergedRows = new HashMap<>();
+                for (int i = 0; i < counts.length; i++)
+                {
+                    long count = counts[i];
+                    if (count == 0)
+                        continue;
+
+                    int rows = i + 1;
+                    totalSourceRows += rows * count;
+                    mergeSummary.append(String.format("%d:%d, ", rows, count));
+                    mergedRows.put(rows, count);
+                }
 
-            try
-            {
-                // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
-                // we don't end up with compaction information hanging around indefinitely in limbo.
-                iter.close();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
+                SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+                logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
+                                          oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+                logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+                logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
             }
         }
-
-        Collection<SSTableReader> oldSStables = this.sstables;
-        List<SSTableReader> newSStables = writer.finished();
-        if (!offline)
-            cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
-
-        // log a bunch of statistics about the result and save to system table compaction_history
-        long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        long startsize = SSTableReader.getTotalBytes(oldSStables);
-        long endsize = SSTableReader.getTotalBytes(newSStables);
-        double ratio = (double) endsize / (double) startsize;
-
-        StringBuilder newSSTableNames = new StringBuilder();
-        for (SSTableReader reader : newSStables)
-            newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
-
-        double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-        long totalSourceRows = 0;
-        long[] counts = ci.getMergedRowCounts();
-        StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
-        Map<Integer, Long> mergedRows = new HashMap<>();
-        for (int i = 0; i < counts.length; i++)
-        {
-            long count = counts[i];
-            if (count == 0)
-                continue;
-
-            int rows = i + 1;
-            totalSourceRows += rows * count;
-            mergeSummary.append(String.format("%d:%d, ", rows, count));
-            mergedRows.put(rows, count);
-        }
-
-        SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
-        logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                  oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
-        logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
-        logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
     }
 
     private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)

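The CompactionTask rewrite replaces manual cleanup (including the removed TODO about untidied scanners) with nested try-with-resources, so resources unwind in reverse order of acquisition: the scanners close before the controller, and the rewriter is aborted before any throwable propagates. The control-flow skeleton, with the loop body elided:

    try (CompactionController controller = getCompactionController(sstables))
    {
        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
        {
            SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
            try
            {
                // iterate merged rows, appending and switching writers ...
                writer.finish(false);
            }
            catch (Throwable t)
            {
                writer.abort();  // discard partial output before rethrowing
                throw t;
            }
        }   // scanners closed here
    }       // controller closed last
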
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 3ee59ad..7f2d881 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -198,7 +198,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         return maxSSTableSizeInMB * 1024L * 1024L;
     }
 
-    public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+    public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
     {
         Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
         for (SSTableReader sstable : sstables)
@@ -210,26 +210,41 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         }
 
        List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size());
-        for (Integer level : byLevel.keySet())
+        try
         {
-            // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
-            // since we don't know which level those sstables belong to yet, we simply do the same as L0 sstables.
-            if (level <= 0)
+            for (Integer level : byLevel.keySet())
             {
-                // L0 makes no guarantees about overlapping-ness.  Just create a direct scanner for each
-                for (SSTableReader sstable : byLevel.get(level))
-                    scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter()));
+                // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
+                // since we don't know which level those sstables belong to yet, we simply do the same as L0 sstables.
+                if (level <= 0)
+                {
+                    // L0 makes no guarantees about overlapping-ness.  Just create a direct scanner for each
+                    for (SSTableReader sstable : byLevel.get(level))
+                        scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter()));
+                }
+                else
+                {
+                    // Create a LeveledScanner that only opens one sstable at a time, in sorted order
+                    List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
+                    if (!intersecting.isEmpty())
+                        scanners.add(new LeveledScanner(intersecting, range));
+                }
             }
-            else
+        }
+        catch (Throwable t)
+        {
+            try
+            {
+                new ScannerList(scanners).close();
+            }
+            catch (Throwable t2)
             {
-                // Create a LeveledScanner that only opens one sstable at a time, in sorted order
-                List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
-                if (!intersecting.isEmpty())
-                    scanners.add(new LeveledScanner(intersecting, range));
+                t.addSuppressed(t2);
             }
+            throw t;
         }
 
-        return scanners;
+        return new ScannerList(scanners);
     }
 
    // Lazily creates SSTableBoundedScanner for sstables that are assumed to be from the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 734fe23..f102fef 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -88,8 +88,9 @@ public class Upgrader
         outputHandler.output("Upgrading " + sstable);
 
        SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
-        try (CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, strategy.getScanners(this.toUpgrade), controller).iterator())
+        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade))
         {
+            Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator();
             writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
             while (iter.hasNext())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/utils/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CloseableIterator.java b/src/java/org/apache/cassandra/utils/CloseableIterator.java
index 399c6d1..7474f3d 100644
--- a/src/java/org/apache/cassandra/utils/CloseableIterator.java
+++ b/src/java/org/apache/cassandra/utils/CloseableIterator.java
@@ -21,6 +21,6 @@ import java.io.Closeable;
 import java.util.Iterator;
 
 // so we can instantiate anonymous classes implementing both interfaces
-public interface CloseableIterator<T> extends Iterator<T>, Closeable
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable, Closeable
 {
 }

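A note on this last change: since Java 7, java.io.Closeable already extends java.lang.AutoCloseable, so naming both is redundant to the compiler; spelling AutoCloseable out appears intended to signal that CloseableIterator belongs in try-with-resources, as the old Upgrader code used it:

    try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
    {
        while (iter.hasNext())
            process(iter.next());  // process is a placeholder
    } // iter.close() runs even if hasNext()/next()/process() throws
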
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index defb087..65c7b69 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -143,7 +143,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
 
         // get LeveledScanner for level 1 sstables
         Collection<SSTableReader> sstables = strategy.manifest.getLevel(1);
-        List<ICompactionScanner> scanners = strategy.getScanners(sstables);
+        List<ICompactionScanner> scanners = strategy.getScanners(sstables).scanners;
         assertEquals(1, scanners.size()); // should be one per level
         ICompactionScanner scanner = scanners.get(0);
         // scan through to the end

Reply via email to