Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 ddca610c9 -> bdbb071f4
Remove ref counting in SSTableScanner, fix CompactionTask ordering

Patch by jmckenzie; reviewed by belliottsmith as a follow-up for CASSANDRA-8399


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bdbb071f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bdbb071f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bdbb071f

Branch: refs/heads/cassandra-2.1
Commit: bdbb071f4f87131d6996aac52f2b75a5833d5238
Parents: ddca610
Author: Joshua McKenzie <jmcken...@apache.org>
Authored: Wed Jan 7 13:05:31 2015 -0600
Committer: Joshua McKenzie <jmcken...@apache.org>
Committed: Wed Jan 7 14:05:40 2015 -0600

----------------------------------------------------------------------
 .../cassandra/db/compaction/CompactionTask.java | 82 ++++++++++----------
 .../cassandra/io/sstable/SSTableScanner.java    |  8 +-
 2 files changed, 45 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdbb071f/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4885bc8..d215b4c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -140,7 +140,6 @@ public class CompactionTask extends AbstractCompactionTask
 
         try (CompactionController controller = getCompactionController(sstables);)
         {
-
             Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
 
             long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
@@ -149,11 +148,16 @@ public class CompactionTask extends AbstractCompactionTask
             long expectedSSTableSize = Math.min(getExpectedWriteSize(), strategy.getMaxSSTableBytes());
             logger.debug("Expected bloom filter size : {}", keysPerSSTable);
+            List<SSTableReader> newSStables;
+            AbstractCompactionIterable ci;
+
+            // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
+            // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
+            // See CASSANDRA-8019 and CASSANDRA-8399
             try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
             {
-                AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+                ci = new CompactionIterable(compactionType, scanners.scanners, controller);
                 Iterator<AbstractCompactedRow> iter = ci.iterator();
-                List<SSTableReader> newSStables;
 
                 // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
                 // replace the old entries. Track entries to preheat here until then.
                 long minRepairedAt = getMinRepairedAt(actuallyCompact);
@@ -215,44 +219,44 @@ public class CompactionTask extends AbstractCompactionTask
                     if (collector != null)
                         collector.finishCompaction(ci);
                 }
+            }
 
-                Collection<SSTableReader> oldSStables = this.sstables;
-                if (!offline)
-                    cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
-
-                // log a bunch of statistics about the result and save to system table compaction_history
-                long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                long startsize = SSTableReader.getTotalBytes(oldSStables);
-                long endsize = SSTableReader.getTotalBytes(newSStables);
-                double ratio = (double) endsize / (double) startsize;
-
-                StringBuilder newSSTableNames = new StringBuilder();
-                for (SSTableReader reader : newSStables)
-                    newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
-
-                double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-                long totalSourceRows = 0;
-                long[] counts = ci.getMergedRowCounts();
-                StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
-                Map<Integer, Long> mergedRows = new HashMap<>();
-                for (int i = 0; i < counts.length; i++)
-                {
-                    long count = counts[i];
-                    if (count == 0)
-                        continue;
-
-                    int rows = i + 1;
-                    totalSourceRows += rows * count;
-                    mergeSummary.append(String.format("%d:%d, ", rows, count));
-                    mergedRows.put(rows, count);
-                }
-
-                SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
-                logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
-                                          oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
-                logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
-                logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
+            Collection<SSTableReader> oldSStables = this.sstables;
+            if (!offline)
+                cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+            // log a bunch of statistics about the result and save to system table compaction_history
+            long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            long startsize = SSTableReader.getTotalBytes(oldSStables);
+            long endsize = SSTableReader.getTotalBytes(newSStables);
+            double ratio = (double) endsize / (double) startsize;
+
+            StringBuilder newSSTableNames = new StringBuilder();
+            for (SSTableReader reader : newSStables)
+                newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+            double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+            long totalSourceRows = 0;
+            long[] counts = ci.getMergedRowCounts();
+            StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+            Map<Integer, Long> mergedRows = new HashMap<>();
+            for (int i = 0; i < counts.length; i++)
+            {
+                long count = counts[i];
+                if (count == 0)
+                    continue;
+
+                int rows = i + 1;
+                totalSourceRows += rows * count;
+                mergeSummary.append(String.format("%d:%d, ", rows, count));
+                mergedRows.put(rows, count);
             }
+
+            SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+            logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
+                                      oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+            logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+            logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
         }
     }


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdbb071f/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 5499195..dc065af 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -52,18 +52,15 @@ public class SSTableScanner implements ISSTableScanner
     protected Iterator<OnDiskAtomIterator> iterator;
 
-    // We can race with the sstable for deletion during compaction. If it's been ref counted to 0, skip
     public static ISSTableScanner getScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
     {
-        return sstable.acquireReference()
-             ? new SSTableScanner(sstable, dataRange, limiter)
-             : new SSTableScanner.EmptySSTableScanner(sstable.getFilename());
+        return new SSTableScanner(sstable, dataRange, limiter);
     }
 
     public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
     {
         // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249)
         List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(Range.normalize(tokenRanges));
-        if (positions.isEmpty() || !sstable.acquireReference())
+        if (positions.isEmpty())
             return new EmptySSTableScanner(sstable.getFilename());
 
         return new SSTableScanner(sstable, tokenRanges, limiter);
@@ -173,7 +170,6 @@ public class SSTableScanner implements ISSTableScanner
     public void close() throws IOException
     {
         FileUtils.close(dfile, ifile);
-        sstable.releaseReference();
     }
 
     public long getLengthInBytes()
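
----------------------------------------------------------------------
The ordering constraint behind the CompactionTask change, in one
self-contained sketch: Windows refuses to delete a file while another
handle to it is still open, whereas POSIX platforms unlink it silently,
which is why the old scanner-outlives-replacement ordering only surfaced
as deletion errors on Windows (CASSANDRA-8019). The class and file names
below are illustrative only, not Cassandra code:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;

// Minimal sketch of the handle-lifetime bug this patch works around.
public class CloseBeforeDelete
{
    public static void main(String[] args) throws IOException
    {
        Path data = Files.createTempFile("data", ".db");

        // Pre-patch shape: deleting while a reader still holds the handle.
        // On Windows this typically throws a FileSystemException; on POSIX
        // the unlink succeeds silently.
        //
        //   RandomAccessFile reader = new RandomAccessFile(data.toFile(), "r");
        //   Files.delete(data);   // fails on Windows while reader is open
        //   reader.close();

        // Post-patch shape: try-with-resources guarantees the handle is
        // closed before anything attempts to delete the file.
        try (RandomAccessFile reader = new RandomAccessFile(data.toFile(), "r"))
        {
            reader.length(); // ... scan the file ...
        }
        Files.delete(data); // no open handles remain; safe on every platform
    }
}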
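
----------------------------------------------------------------------
Structurally, the CompactionTask diff is the usual hoist-out-of-try-with-
resources move: results needed after the resource closes (newSStables, ci)
are declared before the block, assigned inside it, and consumed only once
the block has released its handles. A generic sketch of that scoping
pattern, with illustrative names only:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// 'lines' stands in for newSStables/ci: declared outside the block,
// filled inside it, acted on only after the reader is closed.
public class HoistResults
{
    public static void main(String[] args) throws IOException
    {
        List<String> lines;
        try (BufferedReader reader = new BufferedReader(new StringReader("a\nb\nc")))
        {
            lines = new ArrayList<>();
            String line;
            while ((line = reader.readLine()) != null)
                lines.add(line);
        }
        // reader is closed here; in CompactionTask this is the point where
        // markCompactedSSTablesReplaced and the statistics logging now run.
        System.out.println(lines);
    }
}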
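
----------------------------------------------------------------------
For context on the SSTableScanner side, a sketch of the acquire/release
reference-counting pattern the removed calls belonged to. This is an
illustration of the general technique, not SSTableReader's actual
implementation; once scanners are guaranteed to close before the sstable
can be deleted, the scanner no longer needs to pin the reader this way:

import java.util.concurrent.atomic.AtomicInteger;

// Illustrative ref-counted resource: usable only while the count is
// above zero; the last release performs cleanup.
class RefCounted
{
    private final AtomicInteger references = new AtomicInteger(1);

    // Returns false when the count already hit zero, i.e. we raced with
    // deletion and must not touch the resource (the old getScanner
    // returned an EmptySSTableScanner in that case).
    boolean acquireReference()
    {
        while (true)
        {
            int n = references.get();
            if (n <= 0)
                return false;
            if (references.compareAndSet(n, n + 1))
                return true;
        }
    }

    void releaseReference()
    {
        if (references.decrementAndGet() == 0)
            cleanup();
    }

    private void cleanup()
    {
        // delete backing files, unmap buffers, ...
    }
}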