Repository: cassandra

Updated Branches:
  refs/heads/cassandra-2.1 c5c0585b4 -> 0e8310077
  refs/heads/trunk         3e305f809 -> 0956a8a71
Fix resource leak in event of corrupt sstable

patch by benedict; review by yukim for CASSANDRA-7932

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e831007
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e831007
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e831007

Branch: refs/heads/cassandra-2.1
Commit: 0e831007760bffced8687f51b99525b650d7e193
Parents: c5c0585
Author: Benedict Elliott Smith <[email protected]>
Authored: Fri Sep 19 18:17:19 2014 +0100
Committer: Benedict Elliott Smith <[email protected]>
Committed: Fri Sep 19 18:17:19 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                      |   1 +
 .../org/apache/cassandra/db/DataTracker.java     |   5 +-
 .../compaction/AbstractCompactionStrategy.java   |  56 ++++-
 .../db/compaction/CompactionManager.java         | 193 +++++++++---------
 .../cassandra/db/compaction/CompactionTask.java  | 203 +++++++++----------
 .../compaction/LeveledCompactionStrategy.java    |  43 ++--
 .../cassandra/db/compaction/Upgrader.java        |   3 +-
 .../cassandra/utils/CloseableIterator.java       |   2 +-
 .../LeveledCompactionStrategyTest.java           |   2 +-
 9 files changed, 286 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d3ee7d9..f55e5d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.1
+ * Fix resource leak in event of corrupt sstable
  * (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131)
  * Provide visibility into prepared statements churn (CASSANDRA-7921, CASSANDRA-7930)
  * Invalidate prepared statements when their keyspace or table is


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 857e8bd..24ea9dd 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -320,7 +320,7 @@ public class DataTracker
     void removeUnreadableSSTables(File directory)
     {
         View currentView, newView;
-        List<SSTableReader> remaining = new ArrayList<>();
+        Set<SSTableReader> remaining = new HashSet<>();
         do
         {
             currentView = view.get();
@@ -334,6 +334,9 @@ public class DataTracker
             newView = currentView.replace(currentView.sstables, remaining);
         }
         while (!view.compareAndSet(currentView, newView));
+        for (SSTableReader sstable : currentView.sstables)
+            if (!remaining.contains(sstable))
+                sstable.releaseReference();
         notifySSTablesChanged(remaining, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
     }
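Aside: the DataTracker hunk above is the unreadable-sstable half of the leak fix. Every reader that drops out of the view now has its reference released, so its file handles can actually be freed; switching `remaining` from a List to a HashSet also keeps the new contains() check in that loop cheap. The underlying idiom is plain reference counting; the sketch below is illustrative only, with Ref standing in for the real SSTableReader API:

    import java.util.Set;
    import java.util.concurrent.atomic.AtomicInteger;

    // Illustrative stand-in for a reference-counted reader; not Cassandra API.
    class Ref
    {
        private final AtomicInteger count = new AtomicInteger(1);

        void reference() { count.incrementAndGet(); }

        void releaseReference()
        {
            // dropping the last reference is the point where file handles may be closed
            if (count.decrementAndGet() == 0)
                System.out.println("closed underlying resources");
        }

        // mirrors the loop added to removeUnreadableSSTables above: release
        // exactly the readers that fell out of the view
        static void releaseRemoved(Set<Ref> before, Set<Ref> remaining)
        {
            for (Ref ref : before)
                if (!remaining.contains(ref))
                    ref.releaseReference();
        }
    }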
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 1bbc93d..97696a8 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.*;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
@@ -264,16 +265,61 @@ public abstract class AbstractCompactionStrategy
      * allow for a more memory efficient solution if we know the sstables don't overlap (see
      * LeveledCompactionStrategy for instance).
      */
-    public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+    public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
     {
         RateLimiter limiter = CompactionManager.instance.getRateLimiter();
         ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>();
-        for (SSTableReader sstable : sstables)
-            scanners.add(sstable.getScanner(range, limiter));
-        return scanners;
+        try
+        {
+            for (SSTableReader sstable : sstables)
+                scanners.add(sstable.getScanner(range, limiter));
+        }
+        catch (Throwable t)
+        {
+            try
+            {
+                new ScannerList(scanners).close();
+            }
+            catch (Throwable t2)
+            {
+                t.addSuppressed(t2);
+            }
+            throw t;
+        }
+        return new ScannerList(scanners);
+    }
+
+    public static class ScannerList implements AutoCloseable
+    {
+        public final List<ICompactionScanner> scanners;
+        public ScannerList(List<ICompactionScanner> scanners)
+        {
+            this.scanners = scanners;
+        }
+
+        public void close()
+        {
+            Throwable t = null;
+            for (ICompactionScanner scanner : scanners)
+            {
+                try
+                {
+                    scanner.close();
+                }
+                catch (Throwable t2)
+                {
+                    if (t == null)
+                        t = t2;
+                    else
+                        t.addSuppressed(t2);
+                }
+            }
+            if (t != null)
+                throw Throwables.propagate(t);
+        }
     }
 
-    public List<ICompactionScanner> getScanners(Collection<SSTableReader> toCompact)
+    public ScannerList getScanners(Collection<SSTableReader> toCompact)
     {
         return getScanners(toCompact, null);
     }
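Aside: ScannerList is the heart of the fix. getScanners now either returns a fully opened set of scanners or closes whatever it already opened before rethrowing, so a corrupt sstable hit half-way through no longer strands the earlier scanners. The following is a self-contained sketch of that open-all-or-close-everything idiom, under assumed names (Scanners, open); it is an illustration, not the patch itself:

    import java.util.ArrayList;
    import java.util.List;

    class Scanners implements AutoCloseable
    {
        final List<AutoCloseable> scanners;

        private Scanners(List<AutoCloseable> scanners) { this.scanners = scanners; }

        // open every input, or close the ones already opened and rethrow
        static Scanners openAll(List<String> files) throws Exception
        {
            List<AutoCloseable> opened = new ArrayList<>();
            try
            {
                for (String f : files)
                    opened.add(open(f)); // may fail on a corrupt file
            }
            catch (Exception e)
            {
                for (AutoCloseable c : opened)
                {
                    try { c.close(); }
                    catch (Exception e2) { e.addSuppressed(e2); }
                }
                throw e;
            }
            return new Scanners(opened);
        }

        // close all scanners, keeping the first failure and suppressing the rest
        public void close() throws Exception
        {
            Exception first = null;
            for (AutoCloseable c : scanners)
            {
                try { c.close(); }
                catch (Exception e)
                {
                    if (first == null) first = e;
                    else first.addSuppressed(e);
                }
            }
            if (first != null)
                throw first;
        }

        // placeholder for sstable.getScanner(range, limiter)
        static AutoCloseable open(String file)
        {
            return () -> System.out.println("closed " + file);
        }
    }

A caller scopes the whole list with try-with-resources, e.g. try (Scanners s = Scanners.openAll(files)) { ... }, which is exactly how the ScannerList introduced above is consumed in the hunks that follow.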
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 296fe45..e309cfb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -870,89 +870,98 @@ public class CompactionManager implements CompactionManagerMBean
         if (!cfs.isValid())
             return;
 
-        Collection<SSTableReader> sstables;
-        String snapshotName = validator.desc.sessionId.toString();
-        int gcBefore;
-        boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
-        if (isSnapshotValidation)
-        {
-            // If there is a snapshot created for the session then read from there.
-            sstables = cfs.getSnapshotSSTableReader(snapshotName);
-
-            // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
-            // this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation
-            // time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case
-            // 'as good as in the non-snapshot' case)
-            gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
-        }
-        else
+        Collection<SSTableReader> sstables = null;
+        try
         {
-            // flush first so everyone is validating data that is as similar as possible
-            StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
-            // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
-            // instead so they won't be cleaned up if they do get compacted during the validation
-            if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
-                sstables = cfs.markCurrentSSTablesReferenced();
-            else
-                sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
-            if (validator.gcBefore > 0)
-                gcBefore = validator.gcBefore;
+            String snapshotName = validator.desc.sessionId.toString();
+            int gcBefore;
+            boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
+            if (isSnapshotValidation)
+            {
+                // If there is a snapshot created for the session then read from there.
+                sstables = cfs.getSnapshotSSTableReader(snapshotName);
+
+                // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
+                // this at a different time (that's the whole purpose of repair with snapshot). So instead we take the creation
+                // time of the snapshot, which should give us roughly the same time on each replica (roughly being in that case
+                // 'as good as in the non-snapshot' case)
+                gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
+            }
             else
-                gcBefore = getDefaultGcBefore(cfs);
-        }
-
-        // Create Merkle tree suitable to hold estimated partitions for given range.
-        // We blindly assume that partition is evenly distributed on all sstables for now.
-        long numPartitions = 0;
-        for (SSTableReader sstable : sstables)
-        {
-            numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
-        }
-        // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
-        int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
-        MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+            {
+                // flush first so everyone is validating data that is as similar as possible
+                StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
+                // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
+                // instead so they won't be cleaned up if they do get compacted during the validation
+                if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
+                    sstables = cfs.markCurrentSSTablesReferenced();
+                else
+                    sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
 
-        CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.desc.range, gcBefore);
-        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
+                if (validator.gcBefore > 0)
+                    gcBefore = validator.gcBefore;
+                else
+                    gcBefore = getDefaultGcBefore(cfs);
+            }
 
-        long start = System.nanoTime();
-        metrics.beginCompaction(ci);
-        try
-        {
-            // validate the CF as we iterate over it
-            validator.prepare(cfs, tree);
-            while (iter.hasNext())
+            // Create Merkle tree suitable to hold estimated partitions for given range.
+            // We blindly assume that partition is evenly distributed on all sstables for now.
+            long numPartitions = 0;
+            for (SSTableReader sstable : sstables)
             {
-                if (ci.isStopRequested())
-                    throw new CompactionInterruptedException(ci.getCompactionInfo());
-                AbstractCompactedRow row = iter.next();
-                validator.add(row);
+                numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
             }
-            validator.complete();
-        }
-        finally
-        {
-            iter.close();
-            SSTableReader.releaseReferences(sstables);
-            if (isSnapshotValidation)
+            // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
+            int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+            MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+
+            long start = System.nanoTime();
+            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
             {
-                cfs.clearSnapshot(snapshotName);
+                CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
+                Iterator<AbstractCompactedRow> iter = ci.iterator();
+                metrics.beginCompaction(ci);
+                try
+                {
+                    // validate the CF as we iterate over it
+                    validator.prepare(cfs, tree);
+                    while (iter.hasNext())
+                    {
+                        if (ci.isStopRequested())
+                            throw new CompactionInterruptedException(ci.getCompactionInfo());
+                        AbstractCompactedRow row = iter.next();
+                        validator.add(row);
+                    }
+                    validator.complete();
+                }
+                finally
+                {
+                    if (isSnapshotValidation)
+                    {
+                        cfs.clearSnapshot(snapshotName);
+                    }
+
+                    metrics.finishCompaction(ci);
+                }
             }
-            metrics.finishCompaction(ci);
+            if (logger.isDebugEnabled())
+            {
+                // MT serialize may take time
+                long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
+                             duration,
+                             depth,
+                             numPartitions,
+                             MerkleTree.serializer.serializedSize(tree, 0),
+                             validator.desc);
+            }
         }
-
-        if (logger.isDebugEnabled())
+        finally
         {
-            // MT serialize may take time
-            long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-            logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
-                         duration,
-                         depth,
-                         numPartitions,
-                         MerkleTree.serializer.serializedSize(tree, 0),
-                         validator.desc);
+            if (sstables != null)
+                SSTableReader.releaseReferences(sstables);
         }
     }
 
@@ -993,32 +1002,28 @@ public class CompactionManager implements CompactionManagerMBean
         SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
 
         AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-        List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
-
-        try (CompactionController controller = new CompactionController(cfs, new HashSet<>(Collections.singleton(sstable)), CFMetaData.DEFAULT_GC_GRACE_SECONDS))
+        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
+             CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
         {
             repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
             unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
-            CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
-
-            try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
+            CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
+            Iterator<AbstractCompactedRow> iter = ci.iterator();
+            while(iter.hasNext())
             {
-                while(iter.hasNext())
+                AbstractCompactedRow row = iter.next();
+                // if current range from sstable is repaired, save it into the new repaired sstable
+                if (Range.isInRanges(row.key.getToken(), ranges))
                 {
-                    AbstractCompactedRow row = iter.next();
-                    // if current range from sstable is repaired, save it into the new repaired sstable
-                    if (Range.isInRanges(row.key.getToken(), ranges))
-                    {
-                        repairedSSTableWriter.append(row);
-                        repairedKeyCount++;
-                    }
-                    // otherwise save into the new 'non-repaired' table
-                    else
-                    {
-                        unRepairedSSTableWriter.append(row);
-                        unrepairedKeyCount++;
-                    }
+                    repairedSSTableWriter.append(row);
+                    repairedKeyCount++;
+                }
+                // otherwise save into the new 'non-repaired' table
+                else
+                {
+                    unRepairedSSTableWriter.append(row);
+                    unrepairedKeyCount++;
                 }
             }
             // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
 
@@ -1109,11 +1114,9 @@ public class CompactionManager implements CompactionManagerMBean
 
     private static class ValidationCompactionIterable extends CompactionIterable
     {
-        public ValidationCompactionIterable(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Range<Token> range, int gcBefore)
+        public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ICompactionScanner> scanners, int gcBefore)
         {
-            super(OperationType.VALIDATION,
-                  cfs.getCompactionStrategy().getScanners(sstables, range),
-                  new ValidationCompactionController(cfs, gcBefore));
+            super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore));
         }
     }
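Aside: note the shape the validation path takes above. References are acquired inside an outer try whose finally releases whatever was actually acquired (hence the 'sstables != null' guard), while the scanners live in an inner try-with-resources. Condensed to a skeleton with hypothetical names (Reader, acquire), not the real API, it looks like this:

    import java.util.ArrayList;
    import java.util.List;

    class ValidationSketch
    {
        // stand-in for a reference-counted SSTableReader
        static class Reader
        {
            void release() { System.out.println("reference released"); }
        }

        // stand-in for markCurrentSSTablesReferenced(); may itself throw
        static List<Reader> acquire(int n)
        {
            List<Reader> readers = new ArrayList<>();
            for (int i = 0; i < n; i++)
                readers.add(new Reader());
            return readers;
        }

        static void validate() throws Exception
        {
            List<Reader> readers = null;
            try
            {
                readers = acquire(3);
                try (AutoCloseable scanners = () -> System.out.println("scanners closed"))
                {
                    // ... walk the merged rows and feed the validator here ...
                }
            }
            finally
            {
                // the null check mirrors 'if (sstables != null)' above:
                // acquisition may have failed before any reference was taken
                if (readers != null)
                    for (Reader r : readers)
                        r.release();
            }
        }
    }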
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index c1c5504..6217348 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -129,9 +130,6 @@ public class CompactionTask extends AbstractCompactionTask
 
         UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
 
-        CompactionController controller = getCompactionController(sstables);
-        Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
-
         // new sstables from flush can be added during a compaction, but only the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting
         // all the sstables (that existed when we started)
@@ -139,120 +137,117 @@ public class CompactionTask extends AbstractCompactionTask
         long start = System.nanoTime();
         long totalKeysWritten = 0;
 
-        long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
-        long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
-        long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
-        logger.debug("Expected bloom filter size : {}", keysPerSSTable);
-
-        // TODO: errors when creating the scanners can result in untidied resources
-        AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
-        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-
-        // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
-        // replace the old entries. Track entries to preheat here until then.
-        long minRepairedAt = getMinRepairedAt(actuallyCompact);
-        // we only need the age of the data that we're actually retaining
-        long maxAge = getMaxDataAge(actuallyCompact);
-        if (collector != null)
-            collector.beginCompaction(ci);
-        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
-        try
+
+        try (CompactionController controller = getCompactionController(sstables);)
         {
-            if (!iter.hasNext())
-            {
-                // don't mark compacted in the finally block, since if there _is_ nondeleted data,
-                // we need to sync it (via closeAndOpen) first, so there is no period during which
-                // a crash could cause data loss.
-                cfs.markObsolete(sstables, compactionType);
-                return;
-            }
-            writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
-            while (iter.hasNext())
-            {
-                if (ci.isStopRequested())
-                    throw new CompactionInterruptedException(ci.getCompactionInfo());
+            Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
+
+            long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
+            long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+            long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+            logger.debug("Expected bloom filter size : {}", keysPerSSTable);
 
-            AbstractCompactedRow row = iter.next();
-            if (writer.append(row) != null)
+            try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
             {
-                totalKeysWritten++;
-                if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+                AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+                Iterator<AbstractCompactedRow> iter = ci.iterator();
+
+                // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+                // replace the old entries. Track entries to preheat here until then.
+                long minRepairedAt = getMinRepairedAt(actuallyCompact);
+                // we only need the age of the data that we're actually retaining
+                long maxAge = getMaxDataAge(actuallyCompact);
+                if (collector != null)
+                    collector.beginCompaction(ci);
+                SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
+                try
                 {
-                    writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                    if (!iter.hasNext())
                     {
+                        // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+                        // we need to sync it (via closeAndOpen) first, so there is no period during which
+                        // a crash could cause data loss.
+                        cfs.markObsolete(sstables, compactionType);
+                        return;
                     }
+
+                    writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                    while (iter.hasNext())
+                    {
+                        if (ci.isStopRequested())
+                            throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+                        AbstractCompactedRow row = iter.next();
+                        if (writer.append(row) != null)
+                        {
+                            totalKeysWritten++;
+                            if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+                            {
+                                writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                            }
+                        }
+                    }
+
+                    // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+                    writer.finish(false);
                 }
-            }
+                catch (Throwable t)
+                {
+                    writer.abort();
+                    throw t;
+                }
+                finally
+                {
-            // don't replace old sstables yet, as we need to mark the compaction finished in the system table
-            writer.finish(false);
-        }
-        catch (Throwable t)
-        {
-            writer.abort();
-            throw t;
-        }
-        finally
-        {
-            controller.close();
+                    // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+                    // (in replaceCompactedSSTables)
+                    if (taskId != null)
+                        SystemKeyspace.finishCompaction(taskId);
 
-            // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
-            // (in replaceCompactedSSTables)
-            if (taskId != null)
-                SystemKeyspace.finishCompaction(taskId);
+                    if (collector != null)
+                        collector.finishCompaction(ci);
+                }
 
-            if (collector != null)
-                collector.finishCompaction(ci);
+                Collection<SSTableReader> oldSStables = this.sstables;
+                List<SSTableReader> newSStables = writer.finished();
+                if (!offline)
+                    cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+                // log a bunch of statistics about the result and save to system table compaction_history
+                long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                long startsize = SSTableReader.getTotalBytes(oldSStables);
+                long endsize = SSTableReader.getTotalBytes(newSStables);
+                double ratio = (double) endsize / (double) startsize;
+
+                StringBuilder newSSTableNames = new StringBuilder();
+                for (SSTableReader reader : newSStables)
+                    newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+                double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+                long totalSourceRows = 0;
+                long[] counts = ci.getMergedRowCounts();
+                StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+                Map<Integer, Long> mergedRows = new HashMap<>();
+                for (int i = 0; i < counts.length; i++)
+                {
+                    long count = counts[i];
+                    if (count == 0)
+                        continue;
+
+                    int rows = i + 1;
+                    totalSourceRows += rows * count;
+                    mergeSummary.append(String.format("%d:%d, ", rows, count));
+                    mergedRows.put(rows, count);
+                }
 
-            try
-            {
-                // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
-                // we don't end up with compaction information hanging around indefinitely in limbo.
-                iter.close();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
+                SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+                logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
+                                          oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+                logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+                logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
             }
         }
-
-        Collection<SSTableReader> oldSStables = this.sstables;
-        List<SSTableReader> newSStables = writer.finished();
-        if (!offline)
-            cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
-
-        // log a bunch of statistics about the result and save to system table compaction_history
-        long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        long startsize = SSTableReader.getTotalBytes(oldSStables);
-        long endsize = SSTableReader.getTotalBytes(newSStables);
-        double ratio = (double) endsize / (double) startsize;
-
-        StringBuilder newSSTableNames = new StringBuilder();
-        for (SSTableReader reader : newSStables)
-            newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
-
-        double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-        long totalSourceRows = 0;
-        long[] counts = ci.getMergedRowCounts();
-        StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
-        Map<Integer, Long> mergedRows = new HashMap<>();
-        for (int i = 0; i < counts.length; i++)
-        {
-            long count = counts[i];
-            if (count == 0)
-                continue;
-
-            int rows = i + 1;
-            totalSourceRows += rows * count;
-            mergeSummary.append(String.format("%d:%d, ", rows, count));
-            mergedRows.put(rows, count);
-        }
-
-        SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
-        logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
-                                  oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
-        logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
-        logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
     }
 
     private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
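Aside: the CompactionTask rewrite layers three mechanisms: try-with-resources for the controller, a second try-with-resources for the ScannerList, and an explicit catch that aborts the half-written output before rethrowing. A reduced sketch of that abort-or-finish shape, with Writer as an illustrative stand-in for SSTableRewriter:

    class RewriteSketch
    {
        // illustrative stand-in for SSTableRewriter
        static class Writer
        {
            void append(String row) { /* buffer a row into the new sstable */ }
            void finish() { System.out.println("new files made live"); }
            void abort()  { System.out.println("partial files deleted"); }
        }

        static void compact(Iterable<String> rows) throws Exception
        {
            try (AutoCloseable controller = () -> System.out.println("controller closed");
                 AutoCloseable scanners   = () -> System.out.println("scanners closed"))
            {
                Writer writer = new Writer();
                try
                {
                    for (String row : rows)
                        writer.append(row);
                    writer.finish(); // only once every row has been written
                }
                catch (Exception e)
                {
                    writer.abort();  // remove partial output, then propagate
                    throw e;
                }
            }
        }
    }

Resources declared in one try-with-resources close in reverse declaration order, so the scanners close before the controller here, matching the nesting in the patch.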
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 3ee59ad..7f2d881 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -198,7 +198,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         return maxSSTableSizeInMB * 1024L * 1024L;
     }
 
-    public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+    public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
     {
         Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
         for (SSTableReader sstable : sstables)
@@ -210,26 +210,41 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         }
 
         List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size());
-        for (Integer level : byLevel.keySet())
+        try
        {
-            // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
-            // since we don't know which level those sstable belong yet, we simply do the same as L0 sstables.
-            if (level <= 0)
+            for (Integer level : byLevel.keySet())
             {
-                // L0 makes no guarantees about overlapping-ness. Just create a direct scanner for each
-                for (SSTableReader sstable : byLevel.get(level))
-                    scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter()));
+                // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
+                // since we don't know which level those sstables belong to yet, we simply do the same as for L0 sstables.
+                if (level <= 0)
+                {
+                    // L0 makes no guarantees about overlapping-ness. Just create a direct scanner for each
+                    for (SSTableReader sstable : byLevel.get(level))
+                        scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter()));
+                }
+                else
+                {
+                    // Create a LeveledScanner that only opens one sstable at a time, in sorted order
+                    List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
+                    if (!intersecting.isEmpty())
+                        scanners.add(new LeveledScanner(intersecting, range));
+                }
             }
-            else
+        }
+        catch (Throwable t)
+        {
+            try
+            {
+                new ScannerList(scanners).close();
+            }
+            catch (Throwable t2)
             {
-                // Create a LeveledScanner that only opens one sstable at a time, in sorted order
-                List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
-                if (!intersecting.isEmpty())
-                    scanners.add(new LeveledScanner(intersecting, range));
+                t.addSuppressed(t2);
             }
+            throw t;
         }
-        return scanners;
+        return new ScannerList(scanners);
     }
 
     // Lazily creates SSTableBoundedScanner for sstables that are assumed to be from the


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 734fe23..f102fef 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -88,8 +88,9 @@ public class Upgrader
         outputHandler.output("Upgrading " + sstable);
 
         SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
-        try (CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, strategy.getScanners(this.toUpgrade), controller).iterator())
+        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade))
         {
+            Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator();
             writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
             while (iter.hasNext())
             {


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/utils/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CloseableIterator.java b/src/java/org/apache/cassandra/utils/CloseableIterator.java
index 399c6d1..7474f3d 100644
--- a/src/java/org/apache/cassandra/utils/CloseableIterator.java
+++ b/src/java/org/apache/cassandra/utils/CloseableIterator.java
@@ -21,6 +21,6 @@ import java.io.Closeable;
 import java.util.Iterator;
 
 // so we can instantiate anonymous classes implementing both interfaces
-public interface CloseableIterator<T> extends Iterator<T>, Closeable
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable, Closeable
 {
 }


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index defb087..65c7b69 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -143,7 +143,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
 
         // get LeveledScanner for level 1 sstables
         Collection<SSTableReader> sstables = strategy.manifest.getLevel(1);
-        List<ICompactionScanner> scanners = strategy.getScanners(sstables);
+        List<ICompactionScanner> scanners = strategy.getScanners(sstables).scanners;
         assertEquals(1, scanners.size()); // should be one per level
         ICompactionScanner scanner = scanners.get(0);
         // scan through to the end
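Aside: the one-line CloseableIterator change is what lets these iterators participate in try-with-resources at all. A minimal, simplified demonstration follows (this local interface narrows close() so callers need no checked-exception handling; the real interface also extends java.io.Closeable):

    import java.util.Iterator;

    interface DemoCloseableIterator<T> extends Iterator<T>, AutoCloseable
    {
        void close(); // narrowed from AutoCloseable's 'throws Exception'
    }

    class Demo
    {
        static DemoCloseableIterator<Integer> countTo(int n)
        {
            return new DemoCloseableIterator<Integer>()
            {
                int i = 0;
                public boolean hasNext() { return i < n; }
                public Integer next()    { return i++; }
                public void close()      { System.out.println("iterator resources released"); }
            };
        }

        public static void main(String[] args)
        {
            // close() runs even if the loop body throws
            try (DemoCloseableIterator<Integer> iter = countTo(3))
            {
                while (iter.hasNext())
                    System.out.println(iter.next());
            }
        }
    }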
