http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java b/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java
index fcec40d..b49055d 100644
--- a/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java
+++ b/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java
@@ -30,8 +30,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.service.StorageService;

 public class CQLSSTableWriterLongTest
@@ -39,8 +37,7 @@ public class CQLSSTableWriterLongTest
     @BeforeClass
     public static void setup() throws Exception
    {
-        SchemaLoader.cleanupAndLeaveDirs();
-        Keyspace.setInitialized();
+        SchemaLoader.prepareServer();
         StorageService.instance.initServer();
     }
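The hunk above is representative of how the patch reworks test bootstrap: the manual SchemaLoader.cleanupAndLeaveDirs() plus Keyspace.setInitialized() pair is collapsed into a single SchemaLoader.prepareServer() call before the storage service is started. As a rough illustration only, not code from the patch, a test class written against the new helper would look like the sketch below; the class name is hypothetical, and the assumption is that prepareServer() covers the cleanup and keyspace initialisation the old calls did by hand.

    import org.junit.BeforeClass;
    import org.apache.cassandra.SchemaLoader;
    import org.apache.cassandra.service.StorageService;

    public class ExampleSSTableWriterTest   // hypothetical test class name
    {
        @BeforeClass
        public static void setup() throws Exception
        {
            // single call assumed to perform the equivalent of the old
            // cleanupAndLeaveDirs() + Keyspace.setInitialized() steps
            SchemaLoader.prepareServer();
            StorageService.instance.initServer();
        }
    }
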
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/MockSchema.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java index 104e88f..d9c7e8b 100644 --- a/test/unit/org/apache/cassandra/MockSchema.java +++ b/test/unit/org/apache/cassandra/MockSchema.java @@ -60,7 +60,7 @@ public class MockSchema private static final AtomicInteger id = new AtomicInteger(); public static final Keyspace ks = Keyspace.mockKS(KeyspaceMetadata.create("mockks", KeyspaceParams.simpleTransient(1))); - private static final IndexSummary indexSummary; + public static final IndexSummary indexSummary; private static final SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(temp("mocksegmentedfile")), 0); public static Memtable memtable(ColumnFamilyStore cfs) @@ -88,8 +88,7 @@ public class MockSchema Descriptor descriptor = new Descriptor(cfs.directories.getDirectoryForNewSSTables(), cfs.keyspace.getName(), cfs.getColumnFamilyName(), - generation, - Descriptor.Type.FINAL); + generation); Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC); for (Component component : components) { @@ -132,8 +131,13 @@ public class MockSchema public static ColumnFamilyStore newCFS() { + return newCFS(ks.getName()); + } + + public static ColumnFamilyStore newCFS(String ksname) + { String cfname = "mockcf" + (id.incrementAndGet()); - CFMetaData metadata = newCFMetaData(ks.getName(), cfname); + CFMetaData metadata = newCFMetaData(ksname, cfname); return new ColumnFamilyStore(ks, cfname, Murmur3Partitioner.instance, 0, metadata, new Directories(metadata), false, false); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index 654b8c6..c828de9 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -53,12 +53,9 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; -import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.IndexSummary; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.AlwaysPresentFilter; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.CounterId; import org.apache.cassandra.utils.FBUtilities; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 06b5ceb..ddc41c7 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -282,7 +282,7 @@ public abstract class CQLTester schemaChange(String.format("DROP TYPE IF EXISTS %s.%s", KEYSPACE, typesToDrop.get(i))); // Dropping doesn't delete the sstables. 
It's not a huge deal but it's cleaner to cleanup after us - // Thas said, we shouldn't delete blindly before the SSTableDeletingTask for the table we drop + // Thas said, we shouldn't delete blindly before the TransactionLogs.SSTableTidier for the table we drop // have run or they will be unhappy. Since those taks are scheduled on StorageService.tasks and that's // mono-threaded, just push a task on the queue to find when it's empty. No perfect but good enough. http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index 9da4876..2a15bdf 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -49,7 +49,7 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; - +import static junit.framework.Assert.assertNotNull; @RunWith(OrderedJUnit4ClassRunner.class) public class ColumnFamilyStoreTest { @@ -349,8 +349,8 @@ public class ColumnFamilyStoreTest for (int version = 1; version <= 2; ++version) { - Descriptor existing = new Descriptor(cfs.directories.getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version, Descriptor.Type.FINAL); - Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), KEYSPACE2, CF_STANDARD1, version, Descriptor.Type.FINAL); + Descriptor existing = new Descriptor(cfs.directories.getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version); + Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), KEYSPACE2, CF_STANDARD1, version); for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.STATS }) assertTrue("Cannot find backed-up file:" + desc.filenameFor(c), new File(desc.filenameFor(c)).exists()); } @@ -389,126 +389,10 @@ public class ColumnFamilyStoreTest // assertEquals(ByteBufferUtil.bytes("B"), cfSliced.getColumn(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("b"))).value()); // } - // TODO: fix once SSTableSimpleWriter's back in -// @Test -// public void testRemoveUnfinishedCompactionLeftovers() throws Throwable -// { -// String ks = KEYSPACE1; -// String cf = CF_STANDARD3; // should be empty -// -// final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf); -// Directories dir = new Directories(cfmeta); -// ByteBuffer key = bytes("key"); -// -// // 1st sstable -// SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(), cfmeta, StorageService.getPartitioner()); -// writer.newRow(key); -// writer.addColumn(bytes("col"), bytes("val"), 1); -// writer.close(); -// -// Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list(); -// assertEquals(1, sstables.size()); -// -// Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next(); -// final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey()); -// -// // simulate incomplete compaction -// writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(), -// cfmeta, StorageService.getPartitioner()) -// { -// protected SSTableWriter getWriter() -// { -// MetadataCollector collector = new MetadataCollector(cfmeta.comparator); -// 
collector.addAncestor(sstable1.descriptor.generation); // add ancestor from previously written sstable -// return SSTableWriter.create(createDescriptor(directory, metadata.ksName, metadata.cfName, DatabaseDescriptor.getSSTableFormat()), -// 0L, -// ActiveRepairService.UNREPAIRED_SSTABLE, -// metadata, -// DatabaseDescriptor.getPartitioner(), -// collector); -// } -// }; -// writer.newRow(key); -// writer.addColumn(bytes("col"), bytes("val"), 1); -// writer.close(); -// -// // should have 2 sstables now -// sstables = dir.sstableLister().list(); -// assertEquals(2, sstables.size()); -// -// SSTableReader sstable2 = SSTableReader.open(sstable1.descriptor); -// UUID compactionTaskID = SystemKeyspace.startCompaction( -// Keyspace.open(ks).getColumnFamilyStore(cf), -// Collections.singleton(sstable2)); -// -// Map<Integer, UUID> unfinishedCompaction = new HashMap<>(); -// unfinishedCompaction.put(sstable1.descriptor.generation, compactionTaskID); -// ColumnFamilyStore.removeUnfinishedCompactionLeftovers(cfmeta, unfinishedCompaction); -// -// // 2nd sstable should be removed (only 1st sstable exists in set of size 1) -// sstables = dir.sstableLister().list(); -// assertEquals(1, sstables.size()); -// assertTrue(sstables.containsKey(sstable1.descriptor)); -// -// Map<Pair<String, String>, Map<Integer, UUID>> unfinished = SystemKeyspace.getUnfinishedCompactions(); -// assertTrue(unfinished.isEmpty()); -// sstable1.selfRef().release(); -// sstable2.selfRef().release(); -// } // TODO: Fix once SSTableSimpleWriter's back in // @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-6086">CASSANDRA-6086</a> -// @Test -// public void testFailedToRemoveUnfinishedCompactionLeftovers() throws Throwable -// { -// final String ks = KEYSPACE1; -// final String cf = CF_STANDARD4; // should be empty -// -// final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf); -// Directories dir = new Directories(cfmeta); -// ByteBuffer key = bytes("key"); -// -// // Write SSTable generation 3 that has ancestors 1 and 2 -// final Set<Integer> ancestors = Sets.newHashSet(1, 2); -// SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(), -// cfmeta, StorageService.getPartitioner()) -// { -// protected SSTableWriter getWriter() -// { -// MetadataCollector collector = new MetadataCollector(cfmeta.comparator); -// for (int ancestor : ancestors) -// collector.addAncestor(ancestor); -// String file = new Descriptor(directory, ks, cf, 3, Descriptor.Type.TEMP).filenameFor(Component.DATA); -// return SSTableWriter.create(Descriptor.fromFilename(file), -// 0L, -// ActiveRepairService.UNREPAIRED_SSTABLE, -// metadata, -// StorageService.getPartitioner(), -// collector); -// } -// }; -// writer.newRow(key); -// writer.addColumn(bytes("col"), bytes("val"), 1); -// writer.close(); -// -// Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list(); -// assert sstables.size() == 1; -// -// Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next(); -// final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey()); -// -// // simulate we don't have generation in compaction_history -// Map<Integer, UUID> unfinishedCompactions = new HashMap<>(); -// UUID compactionTaskID = UUID.randomUUID(); -// for (Integer ancestor : ancestors) -// unfinishedCompactions.put(ancestor, compactionTaskID); -// ColumnFamilyStore.removeUnfinishedCompactionLeftovers(cfmeta, unfinishedCompactions); -// -// // SSTable should not be deleted -// sstables = 
dir.sstableLister().list(); -// assert sstables.size() == 1; -// assert sstables.containsKey(sstable1.descriptor); -// } + // TODO: Fix once SSTableSimpleWriter's back in // @Test @@ -625,4 +509,30 @@ public class ColumnFamilyStoreTest } assertEquals(count, found); } + + @Test + public void testScrubDataDirectories() throws Throwable + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1); + + ColumnFamilyStore.scrubDataDirectories(cfs.metadata); + + new RowUpdateBuilder(cfs.metadata, 2, "key").clustering("name").add("val", "2").build().applyUnsafe(); + cfs.forceBlockingFlush(); + + // Nuke the metadata and reload that sstable + Collection<SSTableReader> ssTables = cfs.getSSTables(); + assertEquals(1, ssTables.size()); + SSTableReader ssTable = ssTables.iterator().next(); + + String dataFileName = ssTable.descriptor.filenameFor(Component.DATA); + String tmpDataFileName = ssTable.descriptor.tmpFilenameFor(Component.DATA); + new File(dataFileName).renameTo(new File(tmpDataFileName)); + + ColumnFamilyStore.scrubDataDirectories(cfs.metadata); + + List<File> ssTableFiles = new Directories(cfs.metadata).sstableLister().listFiles(); + assertNotNull(ssTableFiles); + assertEquals(0, ssTableFiles.size()); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/DirectoriesTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java index 6f3ccc9..14db2d1 100644 --- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java +++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java @@ -95,22 +95,22 @@ public class DirectoriesTest File dir = cfDir(cfm); dir.mkdirs(); - createFakeSSTable(dir, cfm.cfName, 1, false, fs); - createFakeSSTable(dir, cfm.cfName, 2, true, fs); + createFakeSSTable(dir, cfm.cfName, 1, fs); + createFakeSSTable(dir, cfm.cfName, 2, fs); File backupDir = new File(dir, Directories.BACKUPS_SUBDIR); backupDir.mkdir(); - createFakeSSTable(backupDir, cfm.cfName, 1, false, fs); + createFakeSSTable(backupDir, cfm.cfName, 1, fs); File snapshotDir = new File(dir, Directories.SNAPSHOT_SUBDIR + File.separator + "42"); snapshotDir.mkdirs(); - createFakeSSTable(snapshotDir, cfm.cfName, 1, false, fs); + createFakeSSTable(snapshotDir, cfm.cfName, 1, fs); } } - private static void createFakeSSTable(File dir, String cf, int gen, boolean temp, List<File> addTo) throws IOException + private static void createFakeSSTable(File dir, String cf, int gen, List<File> addTo) throws IOException { - Descriptor desc = new Descriptor(dir, KS, cf, gen, temp ? 
Descriptor.Type.TEMP : Descriptor.Type.FINAL); + Descriptor desc = new Descriptor(dir, KS, cf, gen); for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER }) { File f = new File(desc.filenameFor(c)); @@ -145,7 +145,7 @@ public class DirectoriesTest Directories directories = new Directories(cfm); assertEquals(cfDir(cfm), directories.getDirectoryForNewSSTables()); - Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, Descriptor.Type.FINAL); + Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1); File snapshotDir = new File(cfDir(cfm), File.separator + Directories.SNAPSHOT_SUBDIR + File.separator + "42"); assertEquals(snapshotDir, Directories.getSnapshotDirectory(desc, "42")); @@ -173,8 +173,8 @@ public class DirectoriesTest { assertEquals(cfDir(INDEX_CFM), dir); } - Descriptor parentDesc = new Descriptor(parentDirectories.getDirectoryForNewSSTables(), KS, PARENT_CFM.cfName, 0, Descriptor.Type.FINAL); - Descriptor indexDesc = new Descriptor(indexDirectories.getDirectoryForNewSSTables(), KS, INDEX_CFM.cfName, 0, Descriptor.Type.FINAL); + Descriptor parentDesc = new Descriptor(parentDirectories.getDirectoryForNewSSTables(), KS, PARENT_CFM.cfName, 0); + Descriptor indexDesc = new Descriptor(indexDirectories.getDirectoryForNewSSTables(), KS, INDEX_CFM.cfName, 0); // snapshot dir should be created under its parent's File parentSnapshotDirectory = Directories.getSnapshotDirectory(parentDesc, "test"); @@ -191,9 +191,9 @@ public class DirectoriesTest indexDirectories.snapshotCreationTime("test")); // check true snapshot size - Descriptor parentSnapshot = new Descriptor(parentSnapshotDirectory, KS, PARENT_CFM.cfName, 0, Descriptor.Type.FINAL); + Descriptor parentSnapshot = new Descriptor(parentSnapshotDirectory, KS, PARENT_CFM.cfName, 0); createFile(parentSnapshot.filenameFor(Component.DATA), 30); - Descriptor indexSnapshot = new Descriptor(indexSnapshotDirectory, KS, INDEX_CFM.cfName, 0, Descriptor.Type.FINAL); + Descriptor indexSnapshot = new Descriptor(indexSnapshotDirectory, KS, INDEX_CFM.cfName, 0); createFile(indexSnapshot.filenameFor(Component.DATA), 40); assertEquals(30, parentDirectories.trueSnapshotsSize()); @@ -311,7 +311,7 @@ public class DirectoriesTest final String n = Long.toString(System.nanoTime()); Callable<File> directoryGetter = new Callable<File>() { public File call() throws Exception { - Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, Descriptor.Type.FINAL); + Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1); return Directories.getSnapshotDirectory(desc, n); } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java index 47bfa0c..181b4e0 100644 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@ -65,6 +65,7 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.OutputHandler; import static org.junit.Assert.*; import static org.junit.Assume.assumeTrue; @@ -315,7 +316,7 @@ public class ScrubTest * Code used to generate an outOfOrder sstable. 
The test for out-of-order key in BigTableWriter must also be commented out. * The test also assumes an ordered partitioner. List<String> keys = Arrays.asList("t", "a", "b", "z", "c", "y", "d"); - String filename = cfs.getTempSSTablePath(new File(System.getProperty("corrupt-sstable-root"))); + SSTableWriter writer = new SSTableWriter(cfs.getTempSSTablePath(new File(System.getProperty("corrupt-sstable-root"))), SSTableWriter writer = SSTableWriter.create(cfs.metadata, Descriptor.fromFilename(filename), keys.size(), @@ -341,7 +342,7 @@ public class ScrubTest File rootDir = new File(root); assert rootDir.isDirectory(); - Descriptor desc = new Descriptor("la", rootDir, KEYSPACE, columnFamily, 1, Descriptor.Type.FINAL, SSTableFormat.Type.BIG); + Descriptor desc = new Descriptor("la", rootDir, KEYSPACE, columnFamily, 1, SSTableFormat.Type.BIG); CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname); try @@ -366,7 +367,7 @@ public class ScrubTest sstable.last = sstable.first; try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable); - Scrubber scrubber = new Scrubber(cfs, txn, false, true, true);) + Scrubber scrubber = new Scrubber(cfs, txn, false, new OutputHandler.LogOutput(), true, true, true)) { scrubber.scrub(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java index 9ea7f93..d58985c 100644 --- a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java +++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java @@ -17,11 +17,15 @@ */ package org.apache.cassandra.db; +import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.*; +import org.apache.commons.io.FileUtils; import org.junit.BeforeClass; import org.junit.Test; @@ -30,6 +34,7 @@ import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.CassandraVersion; @@ -39,6 +44,8 @@ import static org.junit.Assert.assertTrue; public class SystemKeyspaceTest { + public static final String MIGRATION_SSTABLES_ROOT = "migration-sstable-root"; + @BeforeClass public static void prepSnapshotTracker() { @@ -145,6 +152,47 @@ public class SystemKeyspaceTest Keyspace.clearSnapshot(null, SystemKeyspace.NAME); } + @Test + public void testMigrateDataDirs() throws IOException + { + Path migrationSSTableRoot = Paths.get(System.getProperty(MIGRATION_SSTABLES_ROOT), "2.2"); + Path dataDir = Paths.get(DatabaseDescriptor.getAllDataFileLocations()[0]); + + FileUtils.copyDirectory(migrationSSTableRoot.toFile(), dataDir.toFile()); + + assertEquals(5, numLegacyFiles()); // see test data + + SystemKeyspace.migrateDataDirs(); + + assertEquals(0, numLegacyFiles()); + } + + private static int numLegacyFiles() + { + int ret = 0; + Iterable<String> dirs = Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()); + for (String dataDir : dirs) + { + File dir = new File(dataDir); + for (File 
ksdir : dir.listFiles((d, n) -> d.isDirectory())) + { + for (File cfdir : ksdir.listFiles((d, n) -> d.isDirectory())) + { + if (Descriptor.isLegacyFile(cfdir.getName())) + { + ret++; + } + else + { + File[] legacyFiles = cfdir.listFiles((d, n) -> Descriptor.isLegacyFile(n)); + ret += legacyFiles.length; + } + } + } + } + return ret; + } + private String getOlderVersionString() { String version = FBUtilities.getReleaseVersionString(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index b18e67b..9de33db 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -156,9 +156,9 @@ public class AntiCompactionTest private SSTableReader writeFile(ColumnFamilyStore cfs, int count) { File dir = cfs.directories.getDirectoryForNewSSTables(); - String filename = cfs.getTempSSTablePath(dir); + String filename = cfs.getSSTablePath(dir); - try (SSTableWriter writer = SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS))) + try (SSTableTxnWriter writer = SSTableTxnWriter.create(filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS))) { for (int i = 0; i < count; i++) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java index 0bcd603..30f14fc 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java @@ -78,7 +78,7 @@ public class CompactionAwareWriterTest extends CQLTester populate(rowCount); LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION); long beforeSize = txn.originals().iterator().next().onDiskLength(); - CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, txn, txn.originals(), false, OperationType.COMPACTION); + CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, txn, txn.originals(), false); int rows = compact(cfs, txn, writer); assertEquals(1, cfs.getSSTables().size()); assertEquals(rowCount, rows); @@ -97,7 +97,7 @@ public class CompactionAwareWriterTest extends CQLTester LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION); long beforeSize = txn.originals().iterator().next().onDiskLength(); int sstableSize = (int)beforeSize/10; - CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, txn, txn.originals(), sstableSize, 0, false, OperationType.COMPACTION); + CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, txn, txn.originals(), sstableSize, 0, false); int rows = compact(cfs, txn, writer); assertEquals(10, cfs.getSSTables().size()); assertEquals(rowCount, rows); @@ -114,7 +114,7 @@ public class CompactionAwareWriterTest extends CQLTester populate(rowCount); LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), 
OperationType.COMPACTION); long beforeSize = txn.originals().iterator().next().onDiskLength(); - CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, txn, txn.originals(), OperationType.COMPACTION, 0); + CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, txn, txn.originals(), 0); int rows = compact(cfs, txn, writer); long expectedSize = beforeSize / 2; List<SSTableReader> sortedSSTables = new ArrayList<>(cfs.getSSTables()); @@ -150,7 +150,7 @@ public class CompactionAwareWriterTest extends CQLTester LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION); long beforeSize = txn.originals().iterator().next().onDiskLength(); int sstableSize = (int)beforeSize/targetSSTableCount; - CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, txn, txn.originals(), sstableSize, false, OperationType.COMPACTION); + CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, txn, txn.originals(), sstableSize, false); int rows = compact(cfs, txn, writer); assertEquals(targetSSTableCount, cfs.getSSTables().size()); int [] levelCounts = new int[5]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index 2fa8488..7beb405 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -21,6 +21,11 @@ package org.apache.cassandra.db.compaction; import java.util.*; import java.util.concurrent.TimeUnit; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; + import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; @@ -34,11 +39,6 @@ import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; - import static org.junit.Assert.*; @RunWith(OrderedJUnit4ClassRunner.class) @@ -394,21 +394,25 @@ public class CompactionsTest cf.addColumn(Util.column("a", "a", 3)); cf.deletionInfo().add(new RangeTombstone(Util.cellname("0"), Util.cellname("b"), 2, (int) (System.currentTimeMillis()/1000)),cfmeta.comparator); - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables())), 0, 0, 0); - - - writer.append(Util.dk("0"), cf); - writer.append(Util.dk("1"), cf); - writer.append(Util.dk("3"), cf); + Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(dir.getDirectoryForNewSSTables())); + try(SSTableTxnWriter writer = SSTableTxnWriter.create(desc, 0, 0, 0)) + { + writer.append(Util.dk("0"), cf); + writer.append(Util.dk("1"), cf); + writer.append(Util.dk("3"), cf); - cfs.addSSTable(writer.closeAndOpenReader()); - writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables())), 0, 0, 0); + cfs.addSSTable(writer.closeAndOpenReader()); + } - writer.append(Util.dk("0"), cf); - writer.append(Util.dk("1"), cf); - writer.append(Util.dk("2"), cf); - writer.append(Util.dk("3"), cf); - 
cfs.addSSTable(writer.closeAndOpenReader()); + desc = Descriptor.fromFilename(cfs.getSSTablePath(dir.getDirectoryForNewSSTables())); + try (SSTableTxnWriter writer = SSTableTxnWriter.create(desc, 0, 0, 0)) + { + writer.append(Util.dk("0"), cf); + writer.append(Util.dk("1"), cf); + writer.append(Util.dk("2"), cf); + writer.append(Util.dk("3"), cf); + cfs.addSSTable(writer.closeAndOpenReader()); + } Collection<SSTableReader> toCompact = cfs.getSSTables(); assert toCompact.size() == 2; @@ -439,35 +443,6 @@ public class CompactionsTest assertEquals(keys, k); } - @Test - public void testCompactionLog() throws Exception - { - SystemKeyspace.discardCompactionsInProgress(); - - String cf = "Standard4"; - ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(cf); - SchemaLoader.insertData(KEYSPACE1, cf, 0, 1); - cfs.forceBlockingFlush(); - - Collection<SSTableReader> sstables = cfs.getSSTables(); - assertFalse(sstables.isEmpty()); - Set<Integer> generations = Sets.newHashSet(Iterables.transform(sstables, new Function<SSTableReader, Integer>() - { - public Integer apply(SSTableReader sstable) - { - return sstable.descriptor.generation; - } - })); - UUID taskId = SystemKeyspace.startCompaction(cfs, sstables); - Map<Pair<String, String>, Map<Integer, UUID>> compactionLogs = SystemKeyspace.getUnfinishedCompactions(); - Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf)).keySet(); - assertTrue(unfinishedCompactions.containsAll(generations)); - - SystemKeyspace.finishCompaction(taskId); - compactionLogs = SystemKeyspace.getUnfinishedCompactions(); - assertFalse(compactionLogs.containsKey(Pair.create(KEYSPACE1, cf))); - } - private void testDontPurgeAccidentaly(String k, String cfname) throws InterruptedException { // This test catches the regression of CASSANDRA-2786 http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index ab84555..a869a7f 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -190,6 +190,9 @@ public class LeveledCompactionStrategyTest // L0 is the lowest priority, so when that's done, we know everything is done while (strategy.getSSTableCountPerLevel()[0] > 1) Thread.sleep(100); + + // in AbstractCompationStrategy.replaceSSTables() first we remove and then we add sstables so wait a little bit longer + Thread.sleep(10); } @Test @@ -211,7 +214,7 @@ public class LeveledCompactionStrategyTest } waitForLeveling(cfs); - LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ( cfs.getCompactionStrategyManager()).getStrategies().get(1); + LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) (cfs.getCompactionStrategyManager()).getStrategies().get(1); assert strategy.getLevelSize(1) > 0; // get LeveledScanner for level 1 sstables http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java index 
18bce10..e9c903e 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java @@ -18,6 +18,8 @@ */ package org.apache.cassandra.db.lifecycle; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; @@ -32,11 +34,16 @@ import junit.framework.Assert; import org.apache.cassandra.MockSchema; import org.apache.cassandra.Util; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.big.BigTableReader; +import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.Refs; +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; + public class HelpersTest { @@ -150,19 +157,29 @@ public class HelpersTest for (SSTableReader reader : readers) Assert.assertTrue(reader.isReplaced()); accumulate = Helpers.setReplaced(readers, null); - Assert.assertNotNull(accumulate); + assertNotNull(accumulate); } @Test public void testMarkObsolete() { ColumnFamilyStore cfs = MockSchema.newCFS(); + TransactionLogs txnLogs = new TransactionLogs(OperationType.UNKNOWN, cfs.metadata); Iterable<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs)); - Throwable accumulate = Helpers.markObsolete(null, readers, null); + + List<TransactionLogs.Obsoletion> obsoletions = new ArrayList<>(); + Assert.assertNull(Helpers.prepareForObsoletion(readers, txnLogs, obsoletions, null)); + assertNotNull(obsoletions); + assertEquals(2, obsoletions.size()); + + Throwable accumulate = Helpers.markObsolete(obsoletions, null); Assert.assertNull(accumulate); for (SSTableReader reader : readers) Assert.assertTrue(reader.isMarkedCompacted()); - accumulate = Helpers.markObsolete(null, readers, null); - Assert.assertNotNull(accumulate); + + accumulate = Helpers.markObsolete(obsoletions, null); + assertNotNull(accumulate); + + txnLogs.finish(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java index d6af447..a876891 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java @@ -33,7 +33,6 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState; import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState.Action; -import org.apache.cassandra.io.sstable.SSTableDeletingTask; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest; @@ -249,7 +248,7 @@ public class LifecycleTransactionTest extends AbstractTransactionalTest protected TestableTransaction newTest() { - SSTableDeletingTask.waitForDeletions(); + TransactionLogs.waitForDeletions(); SSTableReader.resetTidying(); return new TxnTest(); } @@ -399,6 +398,12 @@ public class 
LifecycleTransactionTest extends AbstractTransactionalTest for (SSTableReader reader : concat(loggedObsolete, stagedObsolete)) Assert.assertTrue(reader.selfRef().globalCount() == 0); } + + @Override + protected boolean commitCanThrow() + { + return true; + } } private static SSTableReader[] readersArray(int lb, int ub, ColumnFamilyStore cfs) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java new file mode 100644 index 0000000..5291baa --- /dev/null +++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.lifecycle; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.junit.BeforeClass; +import org.junit.Test; + +import junit.framework.Assert; +import org.apache.cassandra.MockSchema; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; +import org.apache.cassandra.db.compaction.CompactionController; +import org.apache.cassandra.db.compaction.CompactionIterator; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.io.sstable.CQLSSTableWriter; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableRewriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests to simulate real transactions such as compactions and flushing + * using SSTableRewriter, ColumnFamilyStore, LifecycleTransaction, TransactionLogs, etc + */ +public class RealTransactionsTest extends SchemaLoader +{ + private static final String KEYSPACE = 
"TransactionLogsTest"; + private static final String REWRITE_FINISHED_CF = "RewriteFinished"; + private static final String REWRITE_ABORTED_CF = "RewriteAborted"; + private static final String FLUSH_CF = "Flush"; + + @BeforeClass + public static void setUp() + { + MockSchema.cleanup(); + + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE, REWRITE_FINISHED_CF), + SchemaLoader.standardCFMD(KEYSPACE, REWRITE_ABORTED_CF), + SchemaLoader.standardCFMD(KEYSPACE, FLUSH_CF)); + } + + @Test + public void testRewriteFinished() throws IOException + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(REWRITE_FINISHED_CF); + + SSTableReader oldSSTable = getSSTable(cfs, 1); + LifecycleTransaction txn = cfs.getTracker().tryModify(oldSSTable, OperationType.COMPACTION); + SSTableReader newSSTable = replaceSSTable(cfs, txn, false); + TransactionLogs.waitForDeletions(); + + assertFiles(txn.logs().getDataFolder(), new HashSet<>(newSSTable.getAllFilePaths())); + assertFiles(txn.logs().getLogsFolder(), Collections.<String>emptySet()); + } + + @Test + public void testRewriteAborted() throws IOException + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(REWRITE_ABORTED_CF); + + SSTableReader oldSSTable = getSSTable(cfs, 1); + LifecycleTransaction txn = cfs.getTracker().tryModify(oldSSTable, OperationType.COMPACTION); + + replaceSSTable(cfs, txn, true); + TransactionLogs.waitForDeletions(); + + assertFiles(txn.logs().getDataFolder(), new HashSet<>(oldSSTable.getAllFilePaths())); + assertFiles(txn.logs().getLogsFolder(), Collections.<String>emptySet()); + } + + @Test + public void testFlush() throws IOException + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(FLUSH_CF); + + SSTableReader ssTableReader = getSSTable(cfs, 100); + + String dataFolder = cfs.getSSTables().iterator().next().descriptor.directory.getPath(); + String transactionLogsFolder = StringUtils.join(dataFolder, File.separator, Directories.TRANSACTIONS_SUBDIR); + + assertTrue(new File(transactionLogsFolder).exists()); + assertFiles(transactionLogsFolder, Collections.<String>emptySet()); + + assertFiles(dataFolder, new HashSet<>(ssTableReader.getAllFilePaths())); + } + + private SSTableReader getSSTable(ColumnFamilyStore cfs, int numPartitions) throws IOException + { + createSSTable(cfs, numPartitions); + + Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); + assertEquals(1, sstables.size()); + return sstables.iterator().next(); + } + + private void createSSTable(ColumnFamilyStore cfs, int numPartitions) throws IOException + { + cfs.truncateBlocking(); + + String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))"; + String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)"; + + try (CQLSSTableWriter writer = CQLSSTableWriter.builder() + .withPartitioner(StorageService.getPartitioner()) + .inDirectory(cfs.directories.getDirectoryForNewSSTables()) + .forTable(String.format(schema, cfs.keyspace.getName(), cfs.name)) + .using(String.format(query, cfs.keyspace.getName(), cfs.name)) + .build()) + { + for (int j = 0; j < numPartitions; j ++) + writer.addRow(String.format("key%d", j), "col1", "0"); + } + + cfs.loadNewSSTables(); + } + + private SSTableReader replaceSSTable(ColumnFamilyStore cfs, LifecycleTransaction txn, boolean fail) + { 
+ List<SSTableReader> newsstables = null; + int nowInSec = FBUtilities.nowInSeconds(); + try (CompactionController controller = new CompactionController(cfs, txn.originals(), cfs.gcBefore(FBUtilities.nowInSeconds()))) + { + try (SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false); + AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(txn.originals()); + CompactionIterator ci = new CompactionIterator(txn.opType(), scanners.scanners, controller, nowInSec, txn.opId()) + ) + { + long lastCheckObsoletion = System.nanoTime(); + File directory = txn.originals().iterator().next().descriptor.directory; + Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(directory)); + CFMetaData metadata = Schema.instance.getCFMetaData(desc); + rewriter.switchWriter(SSTableWriter.create(metadata, + desc, + 0, + 0, + 0, + DatabaseDescriptor.getPartitioner(), + SerializationHeader.make(cfs.metadata, txn.originals()), + txn)); + while (ci.hasNext()) + { + rewriter.append(ci.next()); + + if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L)) + { + controller.maybeRefreshOverlaps(); + lastCheckObsoletion = System.nanoTime(); + } + } + + if (!fail) + newsstables = rewriter.finish(); + else + rewriter.abort(); + } + } + + assertTrue(fail || newsstables != null); + + if (newsstables != null) + { + Assert.assertEquals(1, newsstables.size()); + return newsstables.iterator().next(); + } + + return null; + } + + private void assertFiles(String dirPath, Set<String> expectedFiles) + { + File dir = new File(dirPath); + for (File file : dir.listFiles()) + { + if (file.isDirectory()) + continue; + + String filePath = file.getPath(); + assertTrue(filePath, expectedFiles.contains(filePath)); + expectedFiles.remove(filePath); + } + + assertTrue(expectedFiles.isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java index dbd5287..89924a5 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java @@ -40,7 +40,6 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Memtable; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.io.sstable.SSTableDeletingTask; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.notifications.*; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -152,6 +151,9 @@ public class TrackerTest Assert.assertEquals(3, tracker.view.get().sstables.size()); + for (SSTableReader reader : readers) + Assert.assertTrue(reader.isKeyCacheSetup()); + Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount()); } @@ -171,6 +173,9 @@ public class TrackerTest Assert.assertEquals(3, tracker.view.get().sstables.size()); + for (SSTableReader reader : readers) + Assert.assertTrue(reader.isKeyCacheSetup()); + Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount()); Assert.assertEquals(3, listener.senders.size()); Assert.assertEquals(tracker, listener.senders.get(0)); @@ -182,9 +187,9 @@ public class TrackerTest public void testDropSSTables() { testDropSSTables(false); - 
SSTableDeletingTask.waitForDeletions(); + TransactionLogs.waitForDeletions(); testDropSSTables(true); - SSTableDeletingTask.waitForDeletions(); + TransactionLogs.waitForDeletions(); } private void testDropSSTables(boolean invalidate) @@ -200,7 +205,7 @@ public class TrackerTest try { - SSTableDeletingTask.pauseDeletions(true); + TransactionLogs.pauseDeletions(true); try (LifecycleTransaction txn = tracker.tryModify(readers.get(0), OperationType.COMPACTION)) { if (invalidate) @@ -222,19 +227,19 @@ public class TrackerTest Assert.assertEquals(0, reader.selfRef().globalCount()); Assert.assertTrue(reader.isMarkedCompacted()); } - Assert.assertNull(tracker.dropSSTables(new Predicate<SSTableReader>() { - public boolean apply(SSTableReader reader) - { - return reader != readers.get(0); - } - }, - OperationType.UNKNOWN, - null)); + + Assert.assertNull(tracker.dropSSTables(reader -> reader != readers.get(0), OperationType.UNKNOWN, null)); + Assert.assertEquals(1, tracker.getView().sstables.size()); Assert.assertEquals(3, listener.received.size()); Assert.assertEquals(tracker, listener.senders.get(0)); - Assert.assertEquals(2, ((SSTableListChangedNotification) listener.received.get(0)).removed.size()); - Assert.assertEquals(0, ((SSTableListChangedNotification) listener.received.get(0)).added.size()); + Assert.assertTrue(listener.received.get(0) instanceof SSTableDeletingNotification); + Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification); + Assert.assertTrue(listener.received.get(2) instanceof SSTableListChangedNotification); + Assert.assertEquals(readers.get(1), ((SSTableDeletingNotification) listener.received.get(0)).deleting); + Assert.assertEquals(readers.get(2), ((SSTableDeletingNotification)listener.received.get(1)).deleting); + Assert.assertEquals(2, ((SSTableListChangedNotification) listener.received.get(2)).removed.size()); + Assert.assertEquals(0, ((SSTableListChangedNotification) listener.received.get(2)).added.size()); Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount()); readers.get(0).selfRef().release(); } @@ -248,7 +253,7 @@ public class TrackerTest } finally { - SSTableDeletingTask.pauseDeletions(false); + TransactionLogs.pauseDeletions(false); } } @@ -299,6 +304,7 @@ public class TrackerTest Assert.assertEquals(1, listener.received.size()); Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added); listener.received.clear(); + Assert.assertTrue(reader.isKeyCacheSetup()); Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount()); // test invalidated CFS @@ -314,8 +320,10 @@ public class TrackerTest Assert.assertEquals(0, tracker.getView().sstables.size()); Assert.assertEquals(0, tracker.getView().flushingMemtables.size()); Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount()); + Assert.assertEquals(3, listener.received.size()); Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added); - Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(1)).removed.size()); + Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification); + Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(2)).removed.size()); DatabaseDescriptor.setIncrementalBackupsEnabled(backups); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java new file mode 100644 index 0000000..3150087 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.lifecycle; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import org.junit.BeforeClass; +import org.junit.Test; + +import static junit.framework.Assert.assertNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import junit.framework.Assert; +import org.apache.cassandra.MockSchema; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.compaction.*; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.BufferedSegmentedFile; +import org.apache.cassandra.io.util.ChannelProxy; +import org.apache.cassandra.io.util.SegmentedFile; +import org.apache.cassandra.utils.AlwaysPresentFilter; +import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest; +import org.apache.cassandra.utils.concurrent.Transactional; + +public class TransactionLogsTest extends AbstractTransactionalTest +{ + private static final String KEYSPACE = "TransactionLogsTest"; + + @BeforeClass + public static void setUp() + { + MockSchema.cleanup(); + } + + protected AbstractTransactionalTest.TestableTransaction newTest() throws Exception + { + TransactionLogs.waitForDeletions(); + SSTableReader.resetTidying(); + return new TxnTest(); + } + + private static final class TxnTest extends TestableTransaction + { + private final static class Transaction extends Transactional.AbstractTransactional implements Transactional + { + final ColumnFamilyStore cfs; + final TransactionLogs txnLogs; + final SSTableReader sstableOld; + final SSTableReader sstableNew; + final TransactionLogs.SSTableTidier tidier; + + public Transaction(ColumnFamilyStore cfs, TransactionLogs txnLogs) throws IOException + { + 
this.cfs = cfs; + this.txnLogs = txnLogs; + this.sstableOld = sstable(cfs, 0, 128); + this.sstableNew = sstable(cfs, 1, 128); + + assertNotNull(txnLogs); + assertNotNull(txnLogs.getId()); + Assert.assertEquals(OperationType.COMPACTION, txnLogs.getType()); + + txnLogs.trackNew(sstableNew); + tidier = txnLogs.obsoleted(sstableOld); + assertNotNull(tidier); + } + + protected Throwable doCommit(Throwable accumulate) + { + sstableOld.markObsolete(tidier); + sstableOld.selfRef().release(); + TransactionLogs.waitForDeletions(); + + Throwable ret = txnLogs.commit(accumulate); + + sstableNew.selfRef().release(); + return ret; + } + + protected Throwable doAbort(Throwable accumulate) + { + tidier.abort(); + TransactionLogs.waitForDeletions(); + + Throwable ret = txnLogs.abort(accumulate); + + sstableNew.selfRef().release(); + sstableOld.selfRef().release(); + return ret; + } + + protected void doPrepare() + { + txnLogs.prepareToCommit(); + } + + protected void assertInProgress() throws Exception + { + assertFiles(txnLogs.getDataFolder(), Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(), + sstableOld.getAllFilePaths()))); + assertFiles(txnLogs.getLogsFolder(), Sets.newHashSet(txnLogs.getData().oldLog().file.getPath(), + txnLogs.getData().newLog().file.getPath())); + assertEquals(2, TransactionLogs.getLogFiles(cfs.metadata).size()); + } + + protected void assertPrepared() throws Exception + { + } + + protected void assertAborted() throws Exception + { + assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths())); + assertFiles(txnLogs.getLogsFolder(), Collections.<String>emptySet()); + assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size()); + } + + protected void assertCommitted() throws Exception + { + assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths())); + assertFiles(txnLogs.getLogsFolder(), Collections.<String>emptySet()); + assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size()); + } + } + + final Transaction txn; + + private TxnTest() throws IOException + { + this(MockSchema.newCFS(KEYSPACE)); + } + + private TxnTest(ColumnFamilyStore cfs) throws IOException + { + this(cfs, new TransactionLogs(OperationType.COMPACTION, cfs.metadata)); + } + + private TxnTest(ColumnFamilyStore cfs, TransactionLogs txnLogs) throws IOException + { + this(new Transaction(cfs, txnLogs)); + } + + private TxnTest(Transaction txn) + { + super(txn); + this.txn = txn; + } + + protected void assertInProgress() throws Exception + { + txn.assertInProgress(); + } + + protected void assertPrepared() throws Exception + { + txn.assertPrepared(); + } + + protected void assertAborted() throws Exception + { + txn.assertAborted(); + } + + protected void assertCommitted() throws Exception + { + txn.assertCommitted(); + } + } + + @Test + public void testUntrack() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstableNew = sstable(cfs, 1, 128); + + // complete a transaction without keep the new files since they were untracked + TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata); + assertNotNull(transactionLogs); + + transactionLogs.trackNew(sstableNew); + transactionLogs.untrackNew(sstableNew); + + transactionLogs.finish(); + + assertFiles(transactionLogs.getDataFolder(), Collections.<String>emptySet()); + assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet()); + assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size()); + + 
sstableNew.selfRef().release(); + } + + @Test + public void testCommitSameDesc() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstableOld1 = sstable(cfs, 0, 128); + SSTableReader sstableOld2 = sstable(cfs, 0, 256); + SSTableReader sstableNew = sstable(cfs, 1, 128); + + TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata); + assertNotNull(transactionLogs); + + transactionLogs.trackNew(sstableNew); + + sstableOld1.setReplaced(); + + TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstableOld2); + assertNotNull(tidier); + + transactionLogs.finish(); + + sstableOld2.markObsolete(tidier); + + sstableOld1.selfRef().release(); + sstableOld2.selfRef().release(); + + TransactionLogs.waitForDeletions(); + + assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths())); + assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet()); + assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size()); + + sstableNew.selfRef().release(); + } + + @Test + public void testCommitOnlyNew() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstable = sstable(cfs, 0, 128); + + TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata); + assertNotNull(transactionLogs); + + transactionLogs.trackNew(sstable); + transactionLogs.finish(); + + assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstable.getAllFilePaths())); + assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet()); + assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size()); + + sstable.selfRef().release(); + } + + @Test + public void testCommitOnlyOld() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstable = sstable(cfs, 0, 128); + + TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata); + assertNotNull(transactionLogs); + + TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstable); + assertNotNull(tidier); + + transactionLogs.finish(); + sstable.markObsolete(tidier); + sstable.selfRef().release(); + + TransactionLogs.waitForDeletions(); + + assertFiles(transactionLogs.getDataFolder(), new HashSet<>()); + assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet()); + assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size()); + } + + @Test + public void testAbortOnlyNew() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstable = sstable(cfs, 0, 128); + + TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata); + assertNotNull(transactionLogs); + + transactionLogs.trackNew(sstable); + transactionLogs.abort(); + + sstable.selfRef().release(); + + assertFiles(transactionLogs.getDataFolder(), new HashSet<>()); + assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet()); + assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size()); + } + + @Test + public void testAbortOnlyOld() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstable = sstable(cfs, 0, 128); + + TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata); + assertNotNull(transactionLogs); + + TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstable); + assertNotNull(tidier); + + tidier.abort(); + 
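+ // with the tidier aborted, the pending deletion of the obsoleted sstable is cancelled;
+ // aborting the transaction log below must leave the old sstable's files on disk (verified by assertFiles)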
transactionLogs.abort(); + + sstable.selfRef().release(); + + assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstable.getAllFilePaths())); + assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet()); + assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size()); + } + + private File copyToTmpFile(File file) throws IOException + { + File ret = File.createTempFile(file.getName(), ".tmp"); + ret.deleteOnExit(); + Files.copy(file.toPath(), ret.toPath(), StandardCopyOption.REPLACE_EXISTING); + return ret; + } + + @Test + public void testRemoveUnfinishedLeftovers_newLogFound() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstableOld = sstable(cfs, 0, 128); + SSTableReader sstableNew = sstable(cfs, 1, 128); + + // simulate tracking sstables with a failed transaction (new log file NOT deleted) + TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata); + assertNotNull(transactionLogs); + + transactionLogs.trackNew(sstableNew); + TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstableOld); + + File tmpNewLog = copyToTmpFile(transactionLogs.getData().newLog().file); + File tmpOldLog = copyToTmpFile(transactionLogs.getData().oldLog().file); + + Set<File> tmpFiles = new HashSet<>(TransactionLogs.getLogFiles(cfs.metadata)); + for (String p : sstableNew.getAllFilePaths()) + tmpFiles.add(new File(p)); + + sstableNew.selfRef().release(); + sstableOld.selfRef().release(); + + Assert.assertEquals(tmpFiles, TransactionLogs.getTemporaryFiles(cfs.metadata, sstableNew.descriptor.directory)); + + // normally called at startup + TransactionLogs.removeUnfinishedLeftovers(cfs.metadata); + + // sstable should not have been removed because the new log was found + Directories directories = new Directories(cfs.metadata); + Map<Descriptor, Set<Component>> sstables = directories.sstableLister().list(); + assertEquals(1, sstables.size()); + + assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths())); + assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet()); + + tidier.run(); + + // copy old transaction files contents back or transactionlogs will throw assertions + Files.move(tmpNewLog.toPath(), transactionLogs.getData().newLog().file.toPath()); + Files.move(tmpOldLog.toPath(), transactionLogs.getData().oldLog().file.toPath()); + + transactionLogs.close(); + } + + @Test + public void testRemoveUnfinishedLeftovers_oldLogFound() throws Throwable + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstableOld = sstable(cfs, 0, 128); + SSTableReader sstableNew = sstable(cfs, 1, 128); + + // simulate tracking sstables with a committed transaction (new log file deleted) + TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata); + assertNotNull(transactionLogs); + + transactionLogs.trackNew(sstableNew); + TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstableOld); + + File tmpNewLog = copyToTmpFile(transactionLogs.getData().newLog().file); + File tmpOldLog = copyToTmpFile(transactionLogs.getData().oldLog().file); + + transactionLogs.getData().newLog().delete(false); + + Set<File> tmpFiles = new HashSet<>(TransactionLogs.getLogFiles(cfs.metadata)); + for (String p : sstableOld.getAllFilePaths()) + tmpFiles.add(new File(p)); + + sstableNew.selfRef().release(); + sstableOld.selfRef().release(); + + Assert.assertEquals(tmpFiles, 
TransactionLogs.getTemporaryFiles(cfs.metadata, sstableOld.descriptor.directory)); + + // normally called at startup + TransactionLogs.removeUnfinishedLeftovers(cfs.metadata); + + // sstable should have been removed because there was no new log. + Directories directories = new Directories(cfs.metadata); + Map<Descriptor, Set<Component>> sstables = directories.sstableLister().list(); + assertEquals(1, sstables.size()); + + assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths())); + assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet()); + + tidier.run(); + + // copy old transaction files contents back or transactionlogs will throw assertions + Files.move(tmpNewLog.toPath(), transactionLogs.getData().newLog().file.toPath()); + Files.move(tmpOldLog.toPath(), transactionLogs.getData().oldLog().file.toPath()); + + transactionLogs.close(); + } + + @Test + public void testGetTemporaryFiles() throws IOException + { + ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE); + SSTableReader sstable1 = sstable(cfs, 0, 128); + + File dataFolder = sstable1.descriptor.directory; + + Set<File> tmpFiles = TransactionLogs.getTemporaryFiles(cfs.metadata, dataFolder); + assertNotNull(tmpFiles); + assertEquals(0, tmpFiles.size()); + + TransactionLogs transactionLogs = new TransactionLogs(OperationType.WRITE, cfs.metadata); + Directories directories = new Directories(cfs.metadata); + + File[] beforeSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory()); + + SSTableReader sstable2 = sstable(cfs, 1, 128); + transactionLogs.trackNew(sstable2); + + Map<Descriptor, Set<Component>> sstables = directories.sstableLister().list(); + assertEquals(2, sstables.size()); + + File[] afterSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory()); + int numNewFiles = afterSecondSSTable.length - beforeSecondSSTable.length; + assertTrue(numNewFiles == sstable2.getAllFilePaths().size()); + + tmpFiles = TransactionLogs.getTemporaryFiles(cfs.metadata, dataFolder); + assertNotNull(tmpFiles); + assertEquals(numNewFiles + 2, tmpFiles.size()); //the extra files are the transaction log files + + File ssTable2DataFile = new File(sstable2.descriptor.filenameFor(Component.DATA)); + File ssTable2IndexFile = new File(sstable2.descriptor.filenameFor(Component.PRIMARY_INDEX)); + + assertTrue(tmpFiles.contains(ssTable2DataFile)); + assertTrue(tmpFiles.contains(ssTable2IndexFile)); + + List<File> files = directories.sstableLister().listFiles(); + List<File> filesNoTmp = directories.sstableLister().skipTemporary(true).listFiles(); + assertNotNull(files); + assertNotNull(filesNoTmp); + + assertTrue(files.contains(ssTable2DataFile)); + assertTrue(files.contains(ssTable2IndexFile)); + + assertFalse(filesNoTmp.contains(ssTable2DataFile)); + assertFalse(filesNoTmp.contains(ssTable2IndexFile)); + + transactionLogs.finish(); + + //Now it should be empty since the transaction has finished + tmpFiles = TransactionLogs.getTemporaryFiles(cfs.metadata, dataFolder); + assertNotNull(tmpFiles); + assertEquals(0, tmpFiles.size()); + + filesNoTmp = directories.sstableLister().skipTemporary(true).listFiles(); + assertNotNull(filesNoTmp); + assertTrue(filesNoTmp.contains(ssTable2DataFile)); + assertTrue(filesNoTmp.contains(ssTable2IndexFile)); + + sstable1.selfRef().release(); + sstable2.selfRef().release(); + } + + public static SSTableReader sstable(ColumnFamilyStore cfs, int generation, int size) throws IOException + { + Directories dir = new Directories(cfs.metadata); + 
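+ // Fabricate an sstable entirely on disk: create placeholder component files of the requested
+ // size, then open them via SSTableReader.internalOpen with a mocked index summary and an
+ // always-present bloom filter, so tests get a real SSTableReader without writing any rows.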
Descriptor descriptor = new Descriptor(dir.getDirectoryForNewSSTables(), cfs.keyspace.getName(), cfs.getColumnFamilyName(), generation); + Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC); + for (Component component : components) + { + File file = new File(descriptor.filenameFor(component)); + file.createNewFile(); + try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) + { + raf.setLength(size); + } + } + + SegmentedFile dFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.DATA))), 0); + SegmentedFile iFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))), 0); + + SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.EMPTY_LIST); + StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator) + .finalizeMetadata(Murmur3Partitioner.instance.getClass().getCanonicalName(), 0.01f, -1, header) + .get(MetadataType.STATS); + SSTableReader reader = SSTableReader.internalOpen(descriptor, + components, + cfs.metadata, + Murmur3Partitioner.instance, + dFile, + iFile, + MockSchema.indexSummary.sharedCopy(), + new AlwaysPresentFilter(), + 1L, + metadata, + SSTableReader.OpenReason.NORMAL, + header); + reader.first = reader.last = MockSchema.readerBounds(generation); + return reader; + } + + private static void assertFiles(String dirPath, Set<String> expectedFiles) + { + File dir = new File(dirPath); + for (File file : dir.listFiles()) + { + if (file.isDirectory()) + continue; + + String filePath = file.getPath(); + assertTrue(filePath, expectedFiles.contains(filePath)); + expectedFiles.remove(filePath); + } + + assertTrue(expectedFiles.isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java index 29875d5..08de62f 100644 --- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java @@ -61,19 +61,19 @@ public class BigTableWriterTest extends AbstractTransactionalTest { final File file; final Descriptor descriptor; - final SSTableWriter writer; + final SSTableTxnWriter writer; private TestableBTW() throws IOException { - this(cfs.getTempSSTablePath(cfs.directories.getDirectoryForNewSSTables())); + this(cfs.getSSTablePath(cfs.directories.getDirectoryForNewSSTables())); } private TestableBTW(String file) throws IOException { - this(file, SSTableWriter.create(file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS))); + this(file, SSTableTxnWriter.create(file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS))); } - private TestableBTW(String file, SSTableWriter sw) throws IOException + private TestableBTW(String file, SSTableTxnWriter sw) throws IOException { super(sw); this.file = new File(file); @@ -91,22 +91,19 @@ public class BigTableWriterTest extends AbstractTransactionalTest protected void assertInProgress() throws Exception { - assertExists(Descriptor.Type.TEMP, Component.DATA, Component.PRIMARY_INDEX); - assertNotExists(Descriptor.Type.TEMP, Component.FILTER, Component.SUMMARY); - 
assertNotExists(Descriptor.Type.FINAL, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY); + assertExists(Component.DATA, Component.PRIMARY_INDEX); + assertNotExists(Component.FILTER, Component.SUMMARY); Assert.assertTrue(file.length() > 0); } protected void assertPrepared() throws Exception { - assertNotExists(Descriptor.Type.TEMP, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY); - assertExists(Descriptor.Type.FINAL, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY); + assertExists(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY); } protected void assertAborted() throws Exception { - assertNotExists(Descriptor.Type.TEMP, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY); - assertNotExists(Descriptor.Type.FINAL, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY); + assertNotExists(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY); Assert.assertFalse(file.exists()); } @@ -115,15 +112,15 @@ public class BigTableWriterTest extends AbstractTransactionalTest assertPrepared(); } - private void assertExists(Descriptor.Type type, Component ... components) + private void assertExists(Component ... components) { for (Component component : components) - Assert.assertTrue(new File(descriptor.asType(type).filenameFor(component)).exists()); + Assert.assertTrue(new File(descriptor.filenameFor(component)).exists()); } - private void assertNotExists(Descriptor.Type type, Component ... components) + private void assertNotExists(Component ... components) { for (Component component : components) - Assert.assertFalse(type.toString() + " " + component.toString(), new File(descriptor.asType(type).filenameFor(component)).exists()); + Assert.assertFalse(component.toString(), new File(descriptor.filenameFor(component)).exists()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java index dab88c8..9a558f1 100644 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java @@ -28,10 +28,13 @@ import org.junit.Before; import org.junit.Test; import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.io.util.FileUtils; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class CQLSSTableWriterClientTest { @@ -97,5 +100,11 @@ public class CQLSSTableWriterClientTest File[] dataFiles = this.testDirectory.listFiles(filter); assertEquals(2, dataFiles.length); + File transactionsFolder = Directories.getTransactionsDirectory(testDirectory); + assertTrue(transactionsFolder.exists()); + + File[] opFiles = transactionsFolder.listFiles(); + assertEquals(0, opFiles.length); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java index ceb0f9c..19eca40 100644 --- a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java @@ -77,24 +77,19 @@ public class DescriptorTest private void testFromFilenameFor(File dir) { // normal - checkFromFilename(new Descriptor(dir, ksname, cfname, 1, Descriptor.Type.FINAL), false); + checkFromFilename(new Descriptor(dir, ksname, cfname, 1), false); // skip component (for streaming lock file) - checkFromFilename(new Descriptor(dir, ksname, cfname, 2, Descriptor.Type.FINAL), true); - // tmp - checkFromFilename(new Descriptor(dir, ksname, cfname, 3, Descriptor.Type.TEMP), false); + checkFromFilename(new Descriptor(dir, ksname, cfname, 2), true); + // secondary index String idxName = "myidx"; File idxDir = new File(dir.getAbsolutePath() + File.separator + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName); - checkFromFilename(new Descriptor(idxDir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 4, Descriptor.Type.FINAL), false); - // secondary index tmp - checkFromFilename(new Descriptor(idxDir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 5, Descriptor.Type.TEMP), false); + checkFromFilename(new Descriptor(idxDir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 4), false); // legacy version - checkFromFilename(new Descriptor("ja", dir, ksname, cfname, 1, Descriptor.Type.FINAL, SSTableFormat.Type.LEGACY), false); - // legacy tmp - checkFromFilename(new Descriptor("ja", dir, ksname, cfname, 2, Descriptor.Type.TEMP, SSTableFormat.Type.LEGACY), false); + checkFromFilename(new Descriptor("ja", dir, ksname, cfname, 1, SSTableFormat.Type.LEGACY), false); // legacy secondary index - checkFromFilename(new Descriptor("ja", dir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 3, Descriptor.Type.FINAL, SSTableFormat.Type.LEGACY), false); + checkFromFilename(new Descriptor("ja", dir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 3, SSTableFormat.Type.LEGACY), false); } private void checkFromFilename(Descriptor original, boolean skipComponent) @@ -109,7 +104,6 @@ public class DescriptorTest assertEquals(original.cfname, desc.cfname); assertEquals(original.version, desc.version); assertEquals(original.generation, desc.generation); - assertEquals(original.type, desc.type); if (skipComponent) {

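The tests above all drive the same small API surface introduced by this patch. As a quick orientation for readers of the diff, here is a minimal sketch of that flow using only the calls exercised in these tests; the class name, the helper method and its parameters are illustrative and not part of the commit, and the failure path is reduced to a comment.

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.TransactionLogs;
import org.apache.cassandra.io.sstable.format.SSTableReader;

public class TransactionLogsUsageSketch
{
    // Illustrative only: cfs, oldReader and newReader are assumed to be supplied by the
    // surrounding compaction-style code; this is a sketch, not the production code path.
    static void replaceSSTable(ColumnFamilyStore cfs, SSTableReader oldReader, SSTableReader newReader) throws Exception
    {
        TransactionLogs txn = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
        txn.trackNew(newReader);                                         // new files count as temporary until commit
        TransactionLogs.SSTableTidier tidier = txn.obsoleted(oldReader); // old files are deleted only after commit

        // success path: keep the new sstable and schedule deletion of the old one
        txn.finish();
        oldReader.markObsolete(tidier);
        oldReader.selfRef().release();

        // failure path would instead be: tidier.abort(); txn.abort();
        // (keeps the old sstable and removes the new one, as testAbortOnlyOld/testAbortOnlyNew verify)

        TransactionLogs.waitForDeletions(); // the tests block on this before asserting on-disk state
    }
}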