Repository: cassandra Updated Branches: refs/heads/trunk e338d2fa8 -> b09e60f72
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java index 60cac2b..860f1d1 100644 --- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java @@ -100,7 +100,7 @@ public class LegacySSTableTest protected Descriptor getDescriptor(String ver) { File directory = new File(LEGACY_SSTABLE_ROOT + File.separator + ver + File.separator + KSNAME); - return new Descriptor(ver, directory, KSNAME, CFNAME, 0, Descriptor.Type.FINAL, SSTableFormat.Type.LEGACY); + return new Descriptor(ver, directory, KSNAME, CFNAME, 0, SSTableFormat.Type.LEGACY); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java index 5a7c074..782f7fd 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java @@ -21,6 +21,8 @@ import java.io.File; import java.util.List; import com.google.common.io.Files; +import org.junit.After; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -33,6 +35,7 @@ import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; @@ -40,11 +43,15 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.OutputHandler; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class SSTableLoaderTest { public static final String KEYSPACE1 = "SSTableLoaderTest"; - public static final String CF_STANDARD = "Standard1"; + public static final String CF_STANDARD1 = "Standard1"; + public static final String CF_STANDARD2 = "Standard2"; + + private File tmpdir; @BeforeClass public static void defineSchema() throws Exception @@ -52,57 +59,66 @@ public class SSTableLoaderTest SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), - SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD)); - setup(); + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2)); + + StorageService.instance.initServer(); } - public static void setup() throws Exception + @Before + public void setup() throws Exception { - StorageService.instance.initServer(); + tmpdir = Files.createTempDir(); + } + + @After + public void cleanup() + { + FileUtils.deleteRecursive(tmpdir); + } + + private static final class TestClient extends SSTableLoader.Client + { + private String keyspace; + + public void init(String keyspace) + { + this.keyspace = keyspace; + for (Range<Token> range : StorageService.instance.getLocalRanges(KEYSPACE1)) + addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); + setPartitioner(StorageService.getPartitioner()); + } + + public CFMetaData getTableMetadata(String tableName) + { + return Schema.instance.getCFMetaData(keyspace, tableName); + } } @Test public void testLoadingSSTable() throws Exception { - File tempdir = Files.createTempDir(); - File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KEYSPACE1 + File.separator + CF_STANDARD); + File dataDir = new File(tmpdir.getAbsolutePath() + File.separator + KEYSPACE1 + File.separator + CF_STANDARD1); assert dataDir.mkdirs(); - CFMetaData cfmeta = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD); + CFMetaData cfmeta = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD1); String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))"; String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)"; - ; + try (CQLSSTableWriter writer = CQLSSTableWriter.builder() .inDirectory(dataDir) .withPartitioner(StorageService.getPartitioner()) - .forTable(String.format(schema, KEYSPACE1, CF_STANDARD)) - .using(String.format(query, KEYSPACE1, CF_STANDARD)) + .forTable(String.format(schema, KEYSPACE1, CF_STANDARD1)) + .using(String.format(query, KEYSPACE1, CF_STANDARD1)) .build()) { writer.addRow("key1", "col1", "100"); } - SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client() - { - private String keyspace; - - public void init(String keyspace) - { - this.keyspace = keyspace; - for (Range<Token> range : StorageService.instance.getLocalRanges(KEYSPACE1)) - addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); - setPartitioner(StorageService.getPartitioner()); - } - - public CFMetaData getTableMetadata(String tableName) - { - return Schema.instance.getCFMetaData(keyspace, tableName); - } - }, new OutputHandler.SystemOutput(false, false)); - + SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false)); loader.stream().get(); - List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD)).build()); + List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1)).build()); assertEquals(1, partitions.size()); assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey())); @@ -110,4 +126,51 @@ public class SSTableLoaderTest .getCell(cfmeta.getColumnDefinition(ByteBufferUtil.bytes("val"))) .value()); } + + @Test + public void testLoadingIncompleteSSTable() throws Exception + { + File dataDir = new File(tmpdir.getAbsolutePath() + File.separator + KEYSPACE1 + File.separator + CF_STANDARD2); + assert dataDir.mkdirs(); + + //make sure we have no tables... + assertTrue(dataDir.listFiles().length == 0); + + String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))"; + String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)"; + + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .withPartitioner(StorageService.getPartitioner()) + .forTable(String.format(schema, KEYSPACE1, CF_STANDARD2)) + .using(String.format(query, KEYSPACE1, CF_STANDARD2)) + .withBufferSizeInMB(1) + .build(); + + for (int i = 0; i < 1000; i++) // make sure to write more than 1 MB + { + for (int j = 0; j < 100; j++) + writer.addRow(String.format("key%d", i), String.format("col%d", j), "100"); + } + + //make sure we have some tables... + assertTrue(dataDir.listFiles().length > 0); + + //writer is still open so loader should not load anything + SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false)); + loader.stream().get(); + + List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build()); + + assertTrue(partitions.size() > 0 && partitions.size() < 1000); + + // now we complete the write and the second loader should load the last sstable as well + writer.close(); + + loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false)); + loader.stream().get(); + + partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build()); + assertEquals(1000, partitions.size()); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 579f981..0e533c2 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -44,6 +44,7 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.RowUpdateBuilder; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.lifecycle.TransactionLogs; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; import org.apache.cassandra.db.compaction.CompactionController; @@ -106,7 +107,7 @@ public class SSTableRewriterTest extends SchemaLoader Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); store.truncateBlocking(); - SSTableDeletingTask.waitForDeletions(); + TransactionLogs.waitForDeletions(); } @Test @@ -135,16 +136,16 @@ public class SSTableRewriterTest extends SchemaLoader CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec)); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID())) { - writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); + writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory, txn)); while(ci.hasNext()) { writer.append(ci.next()); } writer.finish(); } - SSTableDeletingTask.waitForDeletions(); + TransactionLogs.waitForDeletions(); validateCFS(cfs); - int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0); + int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list()); assertEquals(1, filecounts); truncate(cfs); } @@ -167,16 +168,16 @@ public class SSTableRewriterTest extends SchemaLoader CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec)); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID())) { - writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); + writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory, txn)); while (ci.hasNext()) { writer.append(ci.next()); } writer.finish(); } - SSTableDeletingTask.waitForDeletions(); + TransactionLogs.waitForDeletions(); validateCFS(cfs); - int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0); + int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list()); assertEquals(1, filecounts); } @@ -200,7 +201,7 @@ public class SSTableRewriterTest extends SchemaLoader CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec)); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID())) { - writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); + writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory, txn)); while (ci.hasNext()) { UnfilteredRowIterator row = ci.next(); @@ -229,9 +230,9 @@ public class SSTableRewriterTest extends SchemaLoader assertTrue(checked); writer.finish(); } - SSTableDeletingTask.waitForDeletions(); + TransactionLogs.waitForDeletions(); validateCFS(cfs); - int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0); + int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list()); assertEquals(1, filecounts); truncate(cfs); } @@ -244,7 +245,8 @@ public class SSTableRewriterTest extends SchemaLoader truncate(cfs); File dir = cfs.directories.getDirectoryForNewSSTables(); - try (SSTableWriter writer = getWriter(cfs, dir)) + LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, cfs.metadata); + try (SSTableWriter writer = getWriter(cfs, dir, txn)) { for (int i = 0; i < 10000; i++) { @@ -256,7 +258,7 @@ public class SSTableRewriterTest extends SchemaLoader SSTableReader s = writer.setMaxDataAge(1000).openEarly(); assert s != null; - assertFileCounts(dir.list(), 2, 2); + assertFileCounts(dir.list()); for (int i = 10000; i < 20000; i++) { UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1); @@ -266,20 +268,20 @@ public class SSTableRewriterTest extends SchemaLoader } SSTableReader s2 = writer.setMaxDataAge(1000).openEarly(); assertTrue(s.last.compareTo(s2.last) < 0); - assertFileCounts(dir.list(), 2, 2); - s.markObsolete(cfs.getTracker()); + assertFileCounts(dir.list()); s.selfRef().release(); s2.selfRef().release(); // These checks don't work on Windows because the writer has the channel still // open till .abort() is called (via the builder) if (!FBUtilities.isWindows()) { - SSTableDeletingTask.waitForDeletions(); - assertFileCounts(dir.list(), 0, 2); + TransactionLogs.waitForDeletions(); + assertFileCounts(dir.list()); } writer.abort(); - SSTableDeletingTask.waitForDeletions(); - int datafiles = assertFileCounts(dir.list(), 0, 0); + txn.abort(); + TransactionLogs.waitForDeletions(); + int datafiles = assertFileCounts(dir.list()); assertEquals(datafiles, 0); validateCFS(cfs); } @@ -306,14 +308,14 @@ public class SSTableRewriterTest extends SchemaLoader SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); while(ci.hasNext()) { rewriter.append(ci.next()); if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); files++; assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. assertEquals(s.bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.getCount()); @@ -323,6 +325,9 @@ public class SSTableRewriterTest extends SchemaLoader } sstables = rewriter.finish(); } + + TransactionLogs.waitForDeletions(); + long sum = 0; for (SSTableReader x : cfs.getSSTables()) sum += x.bytesOnDisk(); @@ -330,11 +335,11 @@ public class SSTableRewriterTest extends SchemaLoader assertEquals(startStorageMetricsLoad - sBytesOnDisk + sum, StorageMetrics.load.getCount()); assertEquals(files, sstables.size()); assertEquals(files, cfs.getSSTables().size()); - SSTableDeletingTask.waitForDeletions(); + TransactionLogs.waitForDeletions(); // tmplink and tmp files should be gone: assertEquals(sum, cfs.metric.totalDiskSpaceUsed.getCount()); - assertFileCounts(s.descriptor.directory.list(), 0, 0); + assertFileCounts(s.descriptor.directory.list()); validateCFS(cfs); } @@ -358,14 +363,14 @@ public class SSTableRewriterTest extends SchemaLoader SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); while(ci.hasNext()) { rewriter.append(ci.next()); if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); files++; assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. } @@ -375,9 +380,9 @@ public class SSTableRewriterTest extends SchemaLoader assertEquals(files, sstables.size()); assertEquals(files, cfs.getSSTables().size()); - SSTableDeletingTask.waitForDeletions(); + TransactionLogs.waitForDeletions(); - assertFileCounts(s.descriptor.directory.list(), 0, 0); + assertFileCounts(s.descriptor.directory.list()); validateCFS(cfs); } @@ -387,7 +392,12 @@ public class SSTableRewriterTest extends SchemaLoader { testNumberOfFiles_abort(new RewriterTest() { - public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter) + public void run(ISSTableScanner scanner, + CompactionController controller, + SSTableReader sstable, + ColumnFamilyStore cfs, + SSTableRewriter rewriter, + LifecycleTransaction txn) { try (CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())) { @@ -397,7 +407,7 @@ public class SSTableRewriterTest extends SchemaLoader rewriter.append(ci.next()); if (rewriter.currentWriter().getFilePointer() > 25000000) { - rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory)); + rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory, txn)); files++; assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. } @@ -413,7 +423,12 @@ public class SSTableRewriterTest extends SchemaLoader { testNumberOfFiles_abort(new RewriterTest() { - public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter) + public void run(ISSTableScanner scanner, + CompactionController controller, + SSTableReader sstable, + ColumnFamilyStore cfs, + SSTableRewriter rewriter, + LifecycleTransaction txn) { try (CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())) { @@ -423,7 +438,7 @@ public class SSTableRewriterTest extends SchemaLoader rewriter.append(ci.next()); if (rewriter.currentWriter().getFilePointer() > 25000000) { - rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory)); + rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory, txn)); files++; assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. } @@ -444,7 +459,12 @@ public class SSTableRewriterTest extends SchemaLoader { testNumberOfFiles_abort(new RewriterTest() { - public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter) + public void run(ISSTableScanner scanner, + CompactionController controller, + SSTableReader sstable, + ColumnFamilyStore cfs, + SSTableRewriter rewriter, + LifecycleTransaction txn) { try(CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())) { @@ -454,7 +474,7 @@ public class SSTableRewriterTest extends SchemaLoader rewriter.append(ci.next()); if (files == 1 && rewriter.currentWriter().getFilePointer() > 10000000) { - rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory)); + rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory, txn)); files++; assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. } @@ -467,7 +487,12 @@ public class SSTableRewriterTest extends SchemaLoader private static interface RewriterTest { - public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter); + public void run(ISSTableScanner scanner, + CompactionController controller, + SSTableReader sstable, + ColumnFamilyStore cfs, + SSTableRewriter rewriter, + LifecycleTransaction txn); } private void testNumberOfFiles_abort(RewriterTest test) throws Exception @@ -483,21 +508,20 @@ public class SSTableRewriterTest extends SchemaLoader DecoratedKey origLast = s.last; long startSize = cfs.metric.liveDiskSpaceUsed.getCount(); Set<SSTableReader> compacting = Sets.newHashSet(s); - try (ISSTableScanner scanner = s.getScanner(); CompactionController controller = new CompactionController(cfs, compacting, 0); LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000)) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); - test.run(scanner, controller, s, cfs, rewriter); + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); + test.run(scanner, controller, s, cfs, rewriter, txn); } - SSTableDeletingTask.waitForDeletions(); + TransactionLogs.waitForDeletions(); assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.getCount()); assertEquals(1, cfs.getSSTables().size()); - assertFileCounts(s.descriptor.directory.list(), 0, 0); + assertFileCounts(s.descriptor.directory.list()); assertEquals(cfs.getSSTables().iterator().next().first, origFirst); assertEquals(cfs.getSSTables().iterator().next().last, origLast); validateCFS(cfs); @@ -522,13 +546,13 @@ public class SSTableRewriterTest extends SchemaLoader SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); while(ci.hasNext()) { rewriter.append(ci.next()); if (rewriter.currentWriter().getFilePointer() > 2500000) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); files++; assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. } @@ -541,10 +565,10 @@ public class SSTableRewriterTest extends SchemaLoader } } - SSTableDeletingTask.waitForDeletions(); + TransactionLogs.waitForDeletions(); assertEquals(files - 1, cfs.getSSTables().size()); // we never wrote anything to the last file - assertFileCounts(s.descriptor.directory.list(), 0, 0); + assertFileCounts(s.descriptor.directory.list()); validateCFS(cfs); } @@ -568,13 +592,13 @@ public class SSTableRewriterTest extends SchemaLoader SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); while(ci.hasNext()) { rewriter.append(ci.next()); if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); files++; assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. } @@ -583,8 +607,8 @@ public class SSTableRewriterTest extends SchemaLoader sstables = rewriter.finish(); } - SSTableDeletingTask.waitForDeletions(); - assertFileCounts(s.descriptor.directory.list(), 0, 0); + TransactionLogs.waitForDeletions(); + assertFileCounts(s.descriptor.directory.list()); validateCFS(cfs); } @@ -608,14 +632,14 @@ public class SSTableRewriterTest extends SchemaLoader SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 1000000); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); while(ci.hasNext()) { rewriter.append(ci.next()); if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000) { assertEquals(files, cfs.getSSTables().size()); // all files are now opened early - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); files++; } } @@ -624,8 +648,8 @@ public class SSTableRewriterTest extends SchemaLoader } assertEquals(files, sstables.size()); assertEquals(files, cfs.getSSTables().size()); - SSTableDeletingTask.waitForDeletions(); - assertFileCounts(s.descriptor.directory.list(), 0, 0); + TransactionLogs.waitForDeletions(); + assertFileCounts(s.descriptor.directory.list()); validateCFS(cfs); } @@ -643,10 +667,10 @@ public class SSTableRewriterTest extends SchemaLoader SSTableSplitter splitter = new SSTableSplitter(cfs, txn, 10); splitter.split(); - assertFileCounts(s.descriptor.directory.list(), 0, 0); + assertFileCounts(s.descriptor.directory.list()); s.selfRef().release(); - SSTableDeletingTask.waitForDeletions(); + TransactionLogs.waitForDeletions(); for (File f : s.descriptor.directory.listFiles()) { @@ -697,13 +721,13 @@ public class SSTableRewriterTest extends SchemaLoader CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()) ) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); while (ci.hasNext()) { rewriter.append(ci.next()); if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); } } try @@ -722,9 +746,9 @@ public class SSTableRewriterTest extends SchemaLoader s.selfRef().release(); } - SSTableDeletingTask.waitForDeletions(); + TransactionLogs.waitForDeletions(); - int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); + int filecount = assertFileCounts(s.descriptor.directory.list()); assertEquals(filecount, 1); if (!offline) { @@ -737,7 +761,7 @@ public class SSTableRewriterTest extends SchemaLoader assertEquals(0, cfs.getSSTables().size()); cfs.truncateBlocking(); } - filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); + filecount = assertFileCounts(s.descriptor.directory.list()); if (offline) { // the file is not added to the CFS, therefore not truncated away above @@ -746,7 +770,7 @@ public class SSTableRewriterTest extends SchemaLoader { FileUtils.deleteRecursive(f); } - filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); + filecount = assertFileCounts(s.descriptor.directory.list()); } assertEquals(0, filecount); @@ -787,13 +811,13 @@ public class SSTableRewriterTest extends SchemaLoader CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()) ) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); while (ci.hasNext()) { rewriter.append(ci.next()); if (keyCount % 10 == 0) { - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); } keyCount++; validateKeys(keyspace); @@ -801,7 +825,7 @@ public class SSTableRewriterTest extends SchemaLoader rewriter.finish(); } validateKeys(keyspace); - SSTableDeletingTask.waitForDeletions(); + TransactionLogs.waitForDeletions(); validateCFS(cfs); truncate(cfs); } @@ -825,7 +849,7 @@ public class SSTableRewriterTest extends SchemaLoader CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()) ) { - writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); + writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory, txn)); while (ci.hasNext()) { writer.append(ci.next()); @@ -870,8 +894,8 @@ public class SSTableRewriterTest extends SchemaLoader CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()) ) { - writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); - writer2.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); + writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory, txn)); + writer2.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory, txn)); while (ci.hasNext()) { if (writer.currentWriter().getFilePointer() < 15000000) @@ -886,7 +910,6 @@ public class SSTableRewriterTest extends SchemaLoader validateCFS(cfs); } - private void validateKeys(Keyspace ks) { for (int i = 0; i < 100; i++) @@ -900,8 +923,8 @@ public class SSTableRewriterTest extends SchemaLoader public static void truncate(ColumnFamilyStore cfs) { cfs.truncateBlocking(); - SSTableDeletingTask.waitForDeletions(); - Uninterruptibles.sleepUninterruptibly(10L,TimeUnit.MILLISECONDS); + TransactionLogs.waitForDeletions(); + Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS); assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount()); assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount()); validateCFS(cfs); @@ -919,9 +942,9 @@ public class SSTableRewriterTest extends SchemaLoader for (int f = 0 ; f < fileCount ; f++) { File dir = cfs.directories.getDirectoryForNewSSTables(); - String filename = cfs.getTempSSTablePath(dir); + String filename = cfs.getSSTablePath(dir); - try (SSTableWriter writer = SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS))) + try (SSTableTxnWriter writer = SSTableTxnWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS))) { int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount; for ( ; i < end ; i++) @@ -965,7 +988,7 @@ public class SSTableRewriterTest extends SchemaLoader assertTrue(cfs.getTracker().getCompacting().isEmpty()); } - public static int assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount) + public static int assertFileCounts(String [] files) { int tmplinkcount = 0; int tmpcount = 0; @@ -981,15 +1004,15 @@ public class SSTableRewriterTest extends SchemaLoader else if (f.contains("Data")) datacount++; } - assertEquals(expectedtmplinkCount, tmplinkcount); - assertEquals(expectedtmpCount, tmpcount); + assertEquals(0, tmplinkcount); + assertEquals(0, tmpcount); return datacount; } - public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory) + public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn) { - String filename = cfs.getTempSSTablePath(directory); - return SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)); + String filename = cfs.getSSTablePath(directory); + return SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn); } public static ByteBuffer random(int i, int size) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java index 2c8377f..6de5bb9 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java @@ -28,10 +28,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.Util; import static org.junit.Assert.assertEquals; @@ -78,7 +75,7 @@ public class SSTableUtils File cfDir = new File(tempdir, keyspaceName + File.separator + cfname); cfDir.mkdirs(); cfDir.deleteOnExit(); - File datafile = new File(new Descriptor(cfDir, keyspaceName, cfname, generation, Descriptor.Type.FINAL).filenameFor("Data.db")); + File datafile = new File(new Descriptor(cfDir, keyspaceName, cfname, generation).filenameFor("Data.db")); if (!datafile.createNewFile()) throw new IOException("unable to create file " + datafile); datafile.deleteOnExit(); @@ -185,7 +182,7 @@ public class SSTableUtils return write(sorted.size(), new Appender() { @Override - public boolean append(SSTableWriter writer) throws IOException + public boolean append(SSTableTxnWriter writer) throws IOException { if (!iter.hasNext()) return false; @@ -208,7 +205,7 @@ public class SSTableUtils { File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, generation) : new File(dest.filenameFor(Component.DATA)); SerializationHeader header = SerializationHeader.make(Schema.instance.getCFMetaData(ksname, cfname), Collections.EMPTY_LIST); - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(datafile.getAbsolutePath()), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0, header); + SSTableTxnWriter writer = SSTableTxnWriter.create(datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0, header); while (appender.append(writer)) { /* pass */ } SSTableReader reader = writer.finish(true); // mark all components for removal @@ -222,6 +219,6 @@ public class SSTableUtils public static abstract class Appender { /** Called with an open writer until it returns false. */ - public abstract boolean append(SSTableWriter writer) throws IOException; + public abstract boolean append(SSTableTxnWriter writer) throws IOException; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java index 7051bd3..c763932 100644 --- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java @@ -74,7 +74,7 @@ public class MetadataSerializerTest serializer.serialize(originalMetadata, out); } - Descriptor desc = new Descriptor( statsFile.getParentFile(), "", "", 0, Descriptor.Type.FINAL); + Descriptor desc = new Descriptor( statsFile.getParentFile(), "", "", 0); try (RandomAccessReader in = RandomAccessReader.open(statsFile)) { Map<MetadataType, MetadataComponent> deserialized = serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/schema/DefsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/schema/DefsTest.java b/test/unit/org/apache/cassandra/schema/DefsTest.java index b567bb5..2bfd6ae 100644 --- a/test/unit/org/apache/cassandra/schema/DefsTest.java +++ b/test/unit/org/apache/cassandra/schema/DefsTest.java @@ -39,12 +39,12 @@ import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.lifecycle.TransactionLogs; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTableDeletingTask; import org.apache.cassandra.locator.OldNetworkTopologyStrategy; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.utils.ByteBufferUtil; @@ -536,7 +536,7 @@ public class DefsTest // check assertTrue(cfs.indexManager.getIndexes().isEmpty()); - SSTableDeletingTask.waitForDeletions(); + TransactionLogs.waitForDeletions(); assertFalse(new File(desc.filenameFor(Component.DATA)).exists()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index 6227a1f..875c306 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -87,7 +87,7 @@ public class StreamTransferTaskTest f.get(); // when timeout runs on second file, task should be completed - f = task.scheduleTimeout(1, 1, TimeUnit.MILLISECONDS); + f = task.scheduleTimeout(1, 10, TimeUnit.MILLISECONDS); task.complete(1); try { @@ -97,6 +97,7 @@ public class StreamTransferTaskTest catch (CancellationException ex) { } + assertEquals(StreamSession.State.WAIT_COMPLETE, session.state()); // when all streaming are done, time out task should not be scheduled. http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java b/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java index 4e160c2..f0c850d 100644 --- a/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java +++ b/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java @@ -87,9 +87,29 @@ public abstract class AbstractTransactionalTest txn = newTest(); Throwable t = new RuntimeException(); txn.testing.prepareToCommit(); - Assert.assertEquals(t, txn.testing.commit(t)); - Assert.assertEquals(t, txn.testing.abort(t)); - Assert.assertTrue(t.getSuppressed()[0] instanceof IllegalStateException); + + if (txn.commitCanThrow()) + { + try + { + txn.testing.commit(t); + } + catch (Throwable tt) + { + Assert.assertEquals(t, tt); + } + + Assert.assertEquals(t, txn.testing.abort(t)); + Assert.assertEquals(0, t.getSuppressed().length); + } + else + { + Assert.assertEquals(t, txn.testing.commit(t)); + Assert.assertEquals(t, txn.testing.abort(t)); + Assert.assertTrue(t.getSuppressed()[0] instanceof IllegalStateException); + } + + } @Test @@ -132,5 +152,10 @@ public abstract class AbstractTransactionalTest protected abstract void assertPrepared() throws Exception; protected abstract void assertAborted() throws Exception; protected abstract void assertCommitted() throws Exception; + + protected boolean commitCanThrow() + { + return false; + } } }
