Author: slebresne Date: Wed Jun 15 17:55:31 2011 New Revision: 1136135 URL: http://svn.apache.org/viewvc?rev=1136135&view=rev Log: Avoids unmarking compacting sstable prematurely during cleanup. Patch by slebresne; reviewed by jbellis for CASSANDRA-2769.
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1136135&r1=1136134&r2=1136135&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Wed Jun 15 17:55:31 2011 @@ -57,6 +57,7 @@ (CASSANDRA-2767) * use threadsafe collections for StreamInSession (CASSANDRA-2766) * avoid infinite loop when creating merkle tree (CASSANDRA-2758) + * avoids unmarking compacting sstable prematurely in cleanup (CASSANDRA-2769) 0.8.0-final Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1136135&r1=1136134&r2=1136135&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Wed Jun 15 17:55:31 2011 @@ -845,56 +845,50 @@ public class CompactionManager implement logger.debug("Expected bloom filter size : " + expectedBloomFilterSize); SSTableWriter writer = null; + + logger.info("Cleaning up " + sstable); + // Calculate the expected compacted filesize + long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(Arrays.asList(sstable)) / 2; + String compactionFileLocation = table.getDataFileLocation(expectedRangeFileSize); + if (compactionFileLocation == null) + throw new IOException("disk full"); + + SSTableScanner scanner = 
sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE); + SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns(); + CleanupInfo ci = new CleanupInfo(sstable, scanner); + executor.beginCompaction(ci); try { - logger.info("Cleaning up " + sstable); - // Calculate the expected compacted filesize - long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(Arrays.asList(sstable)) / 2; - String compactionFileLocation = table.getDataFileLocation(expectedRangeFileSize); - if (compactionFileLocation == null) - throw new IOException("disk full"); - - SSTableScanner scanner = sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE); - SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns(); - CleanupInfo ci = new CleanupInfo(sstable, scanner); - executor.beginCompaction(ci); - try + while (scanner.hasNext()) { - while (scanner.hasNext()) + SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next(); + if (Range.isTokenInRanges(row.getKey().token, ranges)) { - SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next(); - if (Range.isTokenInRanges(row.getKey().token, ranges)) - { - writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer, Collections.singletonList(sstable)); - writer.append(controller.getCompactedRow(row)); - totalkeysWritten++; - } - else + writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer, Collections.singletonList(sstable)); + writer.append(controller.getCompactedRow(row)); + totalkeysWritten++; + } + else + { + cfs.invalidateCachedRow(row.getKey()); + if (!indexedColumns.isEmpty() || isCommutative) { - cfs.invalidateCachedRow(row.getKey()); - if (!indexedColumns.isEmpty() || isCommutative) + while (row.hasNext()) { - while (row.hasNext()) - { - IColumn column = row.next(); - if (column instanceof CounterColumn) - renewer.maybeRenew((CounterColumn) column); - if (indexedColumns.contains(column.name())) - Table.cleanupIndexEntry(cfs, 
row.getKey().key, column); - } + IColumn column = row.next(); + if (column instanceof CounterColumn) + renewer.maybeRenew((CounterColumn) column); + if (indexedColumns.contains(column.name())) + Table.cleanupIndexEntry(cfs, row.getKey().key, column); } } } } - finally - { - scanner.close(); - executor.finishCompaction(ci); - } } finally { - cfs.getDataTracker().unmarkCompacting(Arrays.asList(sstable)); + scanner.close(); + executor.finishCompaction(ci); } List<SSTableReader> results = new ArrayList<SSTableReader>();