Repository: cassandra Updated Branches: refs/heads/trunk eff80ff09 -> d908bf431
Fix SSTableRewriter test on Windows Patch by stefania; reviewed by jmckenzie for CASSANDRA-8962 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d908bf43 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d908bf43 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d908bf43 Branch: refs/heads/trunk Commit: d908bf431af7abe68aa69e6fd0ab50e30f01e607 Parents: eff80ff Author: Stefania Alborghetti <[email protected]> Authored: Thu Apr 9 15:05:45 2015 -0500 Committer: Joshua McKenzie <[email protected]> Committed: Thu Apr 9 15:05:45 2015 -0500 ---------------------------------------------------------------------- .../cassandra/db/compaction/CompactionTask.java | 6 + .../io/sstable/SSTableRewriterTest.java | 135 +++++++++++-------- 2 files changed, 84 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d908bf43/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 8133cc3..4233ca6 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -244,6 +244,12 @@ public class CompactionTask extends AbstractCompactionTask taskIdLoggerMsg, oldSStables.size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary)); logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, writer.estimatedKeys(), ((double)(totalKeysWritten - writer.estimatedKeys())/totalKeysWritten)); + + if (offline) + { + for (SSTableReader reader : newSStables) + reader.selfRef().release(); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d908bf43/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 5fc2977..17f3392 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -22,9 +22,6 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ThreadLocalRandom; -import javax.annotation.Nullable; - -import com.google.common.base.Function; import com.google.common.collect.Sets; import org.junit.After; import org.junit.BeforeClass; @@ -40,6 +37,7 @@ import org.apache.cassandra.db.compaction.CompactionController; import org.apache.cassandra.db.compaction.LazilyCompactedRow; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.locator.SimpleStrategy; @@ -49,10 +47,10 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; + +import static org.junit.Assert.*; public class SSTableRewriterTest extends SchemaLoader { @@ -75,9 +73,9 @@ public class SSTableRewriterTest extends SchemaLoader Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); store.truncateBlocking(); + SSTableDeletingTask.waitForDeletions(); } - @Test public void basicTest() throws InterruptedException { @@ -107,8 +105,9 @@ public class SSTableRewriterTest extends SchemaLoader } } Collection<SSTableReader> newsstables = writer.finish(); - cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables , OperationType.COMPACTION); - Thread.sleep(100); + cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION); + SSTableDeletingTask.waitForDeletions(); + validateCFS(cfs); int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0); assertEquals(1, filecounts); @@ -140,7 +139,8 @@ public class SSTableRewriterTest extends SchemaLoader } Collection<SSTableReader> newsstables = writer.finish(); cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION); - Thread.sleep(100); + SSTableDeletingTask.waitForDeletions(); + validateCFS(cfs); int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0); assertEquals(1, filecounts); @@ -194,12 +194,14 @@ public class SSTableRewriterTest extends SchemaLoader assertTrue(checked); Collection<SSTableReader> newsstables = writer.finish(); cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION); - Thread.sleep(100); + SSTableDeletingTask.waitForDeletions(); + validateCFS(cfs); int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0); assertEquals(1, filecounts); cfs.truncateBlocking(); - Thread.sleep(1000); // make sure the deletion tasks have run etc + SSTableDeletingTask.waitForDeletions(); + validateCFS(cfs); } @@ -229,10 +231,15 @@ public class SSTableRewriterTest extends SchemaLoader s.markObsolete(); s.selfRef().release(); s2.selfRef().release(); - Thread.sleep(1000); - assertFileCounts(dir.list(), 0, 2); + // These checks don't work on Windows because the writer has the channel still + // open till .abort() is called (via the builder) + if (!FBUtilities.isWindows()) + { + SSTableDeletingTask.waitForDeletions(); + assertFileCounts(dir.list(), 0, 2); + } writer.abort(); - Thread.sleep(1000); + SSTableDeletingTask.waitForDeletions(); int datafiles = assertFileCounts(dir.list(), 0, 0); assertEquals(datafiles, 0); validateCFS(cfs); @@ -286,7 +293,8 @@ public class SSTableRewriterTest extends SchemaLoader assertEquals(startStorageMetricsLoad - s.bytesOnDisk() + sum, StorageMetrics.load.getCount()); assertEquals(files, sstables.size()); assertEquals(files, cfs.getSSTables().size()); - Thread.sleep(1000); + SSTableDeletingTask.waitForDeletions(); + // tmplink and tmp files should be gone: assertEquals(sum, cfs.metric.totalDiskSpaceUsed.getCount()); assertFileCounts(s.descriptor.directory.list(), 0, 0); @@ -322,23 +330,24 @@ public class SSTableRewriterTest extends SchemaLoader assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. } } - - List<SSTableReader> sstables = rewriter.finish(); - assertEquals(files, sstables.size()); - assertEquals(files, cfs.getSSTables().size()); - assertEquals(1, cfs.getDataTracker().getView().shadowed.size()); - cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); - assertEquals(files, cfs.getSSTables().size()); - assertEquals(0, cfs.getDataTracker().getView().shadowed.size()); - Thread.sleep(1000); - assertFileCounts(s.descriptor.directory.list(), 0, 0); - validateCFS(cfs); } catch (Throwable t) { rewriter.abort(); throw t; } + + List<SSTableReader> sstables = rewriter.finish(); + assertEquals(files, sstables.size()); + assertEquals(files, cfs.getSSTables().size()); + assertEquals(1, cfs.getDataTracker().getView().shadowed.size()); + cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); + assertEquals(files, cfs.getSSTables().size()); + assertEquals(0, cfs.getDataTracker().getView().shadowed.size()); + SSTableDeletingTask.waitForDeletions(); + + assertFileCounts(s.descriptor.directory.list(), 0, 0); + validateCFS(cfs); } @@ -449,7 +458,8 @@ public class SSTableRewriterTest extends SchemaLoader throw t; } - Thread.sleep(1000); + SSTableDeletingTask.waitForDeletions(); + assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.getCount()); assertEquals(1, cfs.getSSTables().size()); assertFileCounts(s.descriptor.directory.list(), 0, 0); @@ -494,17 +504,18 @@ public class SSTableRewriterTest extends SchemaLoader break; } } - - Thread.sleep(1000); - assertEquals(files - 1, cfs.getSSTables().size()); // we never wrote anything to the last file - assertFileCounts(s.descriptor.directory.list(), 0, 0); - validateCFS(cfs); } catch (Throwable t) { rewriter.abort(); throw t; } + + SSTableDeletingTask.waitForDeletions(); + + assertEquals(files - 1, cfs.getSSTables().size()); // we never wrote anything to the last file + assertFileCounts(s.descriptor.directory.list(), 0, 0); + validateCFS(cfs); } @Test @@ -536,20 +547,20 @@ public class SSTableRewriterTest extends SchemaLoader assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. } } - - List<SSTableReader> sstables = rewriter.finish(); - cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); - Thread.sleep(1000); - assertFileCounts(s.descriptor.directory.list(), 0, 0); - cfs.truncateBlocking(); - Thread.sleep(1000); // make sure the deletion tasks have run etc - validateCFS(cfs); } catch (Throwable t) { rewriter.abort(); throw t; } + + List<SSTableReader> sstables = rewriter.finish(); + cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); + SSTableDeletingTask.waitForDeletions(); + assertFileCounts(s.descriptor.directory.list(), 0, 0); + cfs.truncateBlocking(); + SSTableDeletingTask.waitForDeletions(); + validateCFS(cfs); } @Test @@ -581,21 +592,23 @@ public class SSTableRewriterTest extends SchemaLoader files++; } } - - List<SSTableReader> sstables = rewriter.finish(); - cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); - assertEquals(files, sstables.size()); - assertEquals(files, cfs.getSSTables().size()); - Thread.sleep(1000); - assertFileCounts(s.descriptor.directory.list(), 0, 0); - validateCFS(cfs); } catch (Throwable t) { rewriter.abort(); throw t; } + + List<SSTableReader> sstables = rewriter.finish(); + cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); + assertEquals(files, sstables.size()); + assertEquals(files, cfs.getSSTables().size()); + SSTableDeletingTask.waitForDeletions(); + assertFileCounts(s.descriptor.directory.list(), 0, 0); + + validateCFS(cfs); } + @Test public void testSSTableSplit() throws InterruptedException { @@ -607,12 +620,16 @@ public class SSTableRewriterTest extends SchemaLoader cfs.getDataTracker().markCompacting(Arrays.asList(s), true, false); SSTableSplitter splitter = new SSTableSplitter(cfs, s, 10); splitter.split(); - Thread.sleep(1000); + assertFileCounts(s.descriptor.directory.list(), 0, 0); + + s.selfRef().release(); + SSTableDeletingTask.waitForDeletions(); + for (File f : s.descriptor.directory.listFiles()) { // we need to clear out the data dir, otherwise tests running after this breaks - f.delete(); + FileUtils.deleteRecursive(f); } } @@ -676,8 +693,12 @@ public class SSTableRewriterTest extends SchemaLoader finally { cfs.getDataTracker().unmarkCompacting(compacting); + if (offline) + s.selfRef().release(); } - Thread.sleep(1000); + + SSTableDeletingTask.waitForDeletions(); + int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); assertEquals(filecount, 1); if (!offline) @@ -686,21 +707,21 @@ public class SSTableRewriterTest extends SchemaLoader validateCFS(cfs); } cfs.truncateBlocking(); - Thread.sleep(1000); + SSTableDeletingTask.waitForDeletions(); + filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); if (offline) { - // the file is not added to the CFS, therefor not truncated away above + // the file is not added to the CFS, therefore not truncated away above assertEquals(1, filecount); for (File f : s.descriptor.directory.listFiles()) { - f.delete(); + FileUtils.deleteRecursive(f); } filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); } assertEquals(0, filecount); - } @Test @@ -756,7 +777,7 @@ public class SSTableRewriterTest extends SchemaLoader } } validateKeys(keyspace); - Thread.sleep(1000); + SSTableDeletingTask.waitForDeletions(); validateCFS(cfs); }
