Delete unfinished compaction sstables incrementally. Patch by thobbs; reviewed by yukim for CASSANDRA-6086
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4ed22340 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4ed22340 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4ed22340 Branch: refs/heads/trunk Commit: 4ed2234078c4d302c256332252a8ddd6ae345484 Parents: c5ca8de Author: Yuki Morishita <[email protected]> Authored: Mon Dec 30 13:48:09 2013 -0600 Committer: Yuki Morishita <[email protected]> Committed: Mon Dec 30 13:48:09 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 39 +++++++---- .../org/apache/cassandra/db/SystemKeyspace.java | 23 ++++-- .../db/compaction/LeveledManifest.java | 14 ++++ .../cassandra/service/CassandraDaemon.java | 43 +++++------- .../cassandra/db/ColumnFamilyStoreTest.java | 74 ++++++++++++++++++-- .../db/compaction/CompactionsTest.java | 4 +- 7 files changed, 151 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0396006..958369a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.5 +* Delete unfinished compaction incrementally (CASSANDRA-6086) Merged from 1.2: * fsync compression metadata (CASSANDRA-6531) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index cbd9d2e..4d7d6f2 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -483,27 +483,30 @@ public 
class ColumnFamilyStore implements ColumnFamilyStoreMBean * compactions, we remove the new ones (since those may be incomplete -- under LCS, we may create multiple * sstables from any given ancestor). */ - public static void removeUnfinishedCompactionLeftovers(String keyspace, String columnfamily, Set<Integer> unfinishedGenerations) + public static void removeUnfinishedCompactionLeftovers(String keyspace, String columnfamily, Map<Integer, UUID> unfinishedCompactions) { Directories directories = Directories.create(keyspace, columnfamily); - // sanity-check unfinishedGenerations - Set<Integer> allGenerations = new HashSet<Integer>(); + Set<Integer> allGenerations = new HashSet<>(); for (Descriptor desc : directories.sstableLister().list().keySet()) allGenerations.add(desc.generation); + + // sanity-check unfinishedCompactions + Set<Integer> unfinishedGenerations = unfinishedCompactions.keySet(); if (!allGenerations.containsAll(unfinishedGenerations)) { - throw new IllegalStateException("Unfinished compactions reference missing sstables." 
- + " This should never happen since compactions are marked finished before we start removing the old sstables."); + HashSet<Integer> missingGenerations = new HashSet<>(unfinishedGenerations); + missingGenerations.removeAll(allGenerations); + logger.debug("Unfinished compactions of {}.{} reference missing sstables of generations {}", + keyspace, columnfamily, missingGenerations); } // remove new sstables from compactions that didn't complete, and compute // set of ancestors that shouldn't exist anymore - Set<Integer> completedAncestors = new HashSet<Integer>(); + Set<Integer> completedAncestors = new HashSet<>(); for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet()) { Descriptor desc = sstableFiles.getKey(); - Set<Component> components = sstableFiles.getValue(); Set<Integer> ancestors; try @@ -515,9 +518,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean throw new FSReadError(e, desc.filenameFor(Component.STATS)); } - if (!ancestors.isEmpty() && unfinishedGenerations.containsAll(ancestors)) + if (!ancestors.isEmpty() + && unfinishedGenerations.containsAll(ancestors) + && allGenerations.containsAll(ancestors)) { - SSTable.delete(desc, components); + // any of the ancestors would work, so we'll just lookup the compaction task ID with the first one + UUID compactionTaskID = unfinishedCompactions.get(ancestors.iterator().next()); + assert compactionTaskID != null; + logger.debug("Going to delete unfinished compaction product {}", desc); + SSTable.delete(desc, sstableFiles.getValue()); + SystemKeyspace.finishCompaction(compactionTaskID); } else { @@ -529,10 +539,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet()) { Descriptor desc = sstableFiles.getKey(); - Set<Component> components = sstableFiles.getValue(); - if (completedAncestors.contains(desc.generation)) - 
SSTable.delete(desc, components); + { + // if any of the ancestors were participating in a compaction, finish that compaction + logger.debug("Going to delete leftover compaction ancestor {}", desc); + SSTable.delete(desc, sstableFiles.getValue()); + UUID compactionTaskID = unfinishedCompactions.get(desc.generation); + if (compactionTaskID != null) + SystemKeyspace.finishCompaction(unfinishedCompactions.get(desc.generation)); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 9093ec0..910f025 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -188,6 +188,11 @@ public class SystemKeyspace return compactionId; } + /** + * Deletes the entry for this compaction from the set of compactions in progress. The compaction does not need + * to complete successfully for this to be called. + * @param taskId what was returned from {@code startCompaction} + */ public static void finishCompaction(UUID taskId) { assert taskId != null; @@ -198,21 +203,31 @@ public class SystemKeyspace } /** - * @return unfinished compactions, grouped by keyspace/columnfamily pair. + * Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the + * task ID of the compaction they were participating in. 
*/ - public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions() + public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions() { String req = "SELECT * FROM system.%s"; UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG)); - SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create(); + Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>(); for (UntypedResultSet.Row row : resultSet) { String keyspace = row.getString("keyspace_name"); String columnfamily = row.getString("columnfamily_name"); Set<Integer> inputs = row.getSet("inputs", Int32Type.instance); + UUID taskID = row.getUUID("id"); + + Pair<String, String> kscf = Pair.create(keyspace, columnfamily); + Map<Integer, UUID> generationToTaskID = unfinishedCompactions.get(kscf); + if (generationToTaskID == null) + generationToTaskID = new HashMap<>(inputs.size()); + + for (Integer generation : inputs) + generationToTaskID.put(generation, taskID); - unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs); + unfinishedCompactions.put(kscf, generationToTaskID); } return unfinishedCompactions; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 92cd887..4347ad5 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -33,6 +33,8 @@ import com.google.common.primitives.Ints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import 
org.apache.cassandra.db.RowPosition; import org.apache.cassandra.dht.Bounds; @@ -619,4 +621,16 @@ public class LeveledManifest this.maxSSTableBytes = maxSSTableBytes; } } + + public static void maybeMigrateManifests() throws IOException + { + for (String keyspaceName : Schema.instance.getKeyspaces()) + { + for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values()) + { + if (LegacyLeveledManifest.manifestNeedsMigration(keyspaceName,cfm.cfName)) + LegacyLeveledManifest.migrateManifests(keyspaceName, cfm.cfName); + } + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 0cebc11..d36b0db 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -24,6 +24,8 @@ import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URL; import java.util.Arrays; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -32,7 +34,6 @@ import javax.management.StandardMBean; import com.addthis.metrics.reporter.config.ReporterConfig; import com.google.common.collect.Iterables; -import com.google.common.collect.SetMultimap; import org.apache.log4j.PropertyConfigurator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +47,7 @@ import org.apache.cassandra.db.MeteredFlusher; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.compaction.LegacyLeveledManifest; +import org.apache.cassandra.db.compaction.LeveledManifest; import 
org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.util.FileUtils; @@ -236,35 +237,29 @@ public class CassandraDaemon // load keyspace descriptions. DatabaseDescriptor.loadSchemas(); - // clean up debris in the rest of the keyspaces - for (String keyspaceName : Schema.instance.getKeyspaces()) + try { - for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values()) - { - if (LegacyLeveledManifest.manifestNeedsMigration(keyspaceName,cfm.cfName)) - { - try - { - LegacyLeveledManifest.migrateManifests(keyspaceName, cfm.cfName); - } - catch (IOException e) - { - logger.error("Could not migrate old leveled manifest. Move away the .json file in the data directory", e); - System.exit(100); - } - } - - ColumnFamilyStore.scrubDataDirectories(keyspaceName, cfm.cfName); - } + LeveledManifest.maybeMigrateManifests(); } + catch(IOException e) + { + logger.error("Could not migrate old leveled manifest. Move away the .json file in the data directory", e); + System.exit(100); + } + // clean up compaction leftovers - SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions(); + Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions(); for (Pair<String, String> kscf : unfinishedCompactions.keySet()) - { ColumnFamilyStore.removeUnfinishedCompactionLeftovers(kscf.left, kscf.right, unfinishedCompactions.get(kscf)); - } SystemKeyspace.discardCompactionsInProgress(); + // clean up debris in the rest of the keyspaces + for (String keyspaceName : Schema.instance.getKeyspaces()) + { + for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values()) + ColumnFamilyStore.scrubDataDirectories(keyspaceName, cfm.cfName); + } + // initialize keyspaces for (String keyspaceName : Schema.instance.getKeyspaces()) { 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index 4e6c87f..65b1708 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -52,6 +52,7 @@ import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; import static org.junit.Assert.*; @@ -1581,7 +1582,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader } @Test - public void testRemoveUnifinishedCompactionLeftovers() throws Throwable + public void testRemoveUnfinishedCompactionLeftovers() throws Throwable { String ks = "Keyspace1"; String cf = "Standard3"; // should be empty @@ -1597,14 +1598,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader writer.close(); Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list(); - assert sstables.size() == 1; + assertEquals(1, sstables.size()); Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next(); final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey()); // simulate incomplete compaction writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(), - cfmeta, StorageService.getPartitioner()) + cfmeta, StorageService.getPartitioner()) { protected SSTableWriter getWriter() { @@ -1623,12 +1624,75 @@ public class ColumnFamilyStoreTest extends SchemaLoader // should have 2 sstables now sstables = dir.sstableLister().list(); - assert sstables.size() == 2; + assertEquals(2, sstables.size()); + + UUID compactionTaskID = 
SystemKeyspace.startCompaction( + Keyspace.open(ks).getColumnFamilyStore(cf), + Collections.singleton(SSTableReader.open(sstable1.descriptor))); - ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, Sets.newHashSet(sstable1.descriptor.generation)); + Map<Integer, UUID> unfinishedCompaction = new HashMap<>(); + unfinishedCompaction.put(sstable1.descriptor.generation, compactionTaskID); + ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, unfinishedCompaction); // 2nd sstable should be removed (only 1st sstable exists in set of size 1) sstables = dir.sstableLister().list(); + assertEquals(1, sstables.size()); + assertTrue(sstables.containsKey(sstable1.descriptor)); + + Map<Pair<String, String>, Map<Integer, UUID>> unfinished = SystemKeyspace.getUnfinishedCompactions(); + assertTrue(unfinished.isEmpty()); + } + + /** + * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-6086">CASSANDRA-6086</a> + */ + @Test + public void testFailedToRemoveUnfinishedCompactionLeftovers() throws Throwable + { + final String ks = "Keyspace1"; + final String cf = "Standard4"; // should be empty + + final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf); + Directories dir = Directories.create(ks, cf); + ByteBuffer key = bytes("key"); + + // Write SSTable generation 3 that has ancestors 1 and 2 + final Set<Integer> ancestors = Sets.newHashSet(1, 2); + SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(), + cfmeta, StorageService.getPartitioner()) + { + protected SSTableWriter getWriter() + { + SSTableMetadata.Collector collector = SSTableMetadata.createCollector(cfmeta.comparator); + for (int ancestor : ancestors) + collector.addAncestor(ancestor); + String file = new Descriptor(directory, ks, cf, 3, true).filenameFor(Component.DATA); + return new SSTableWriter(file, + 0, + metadata, + StorageService.getPartitioner(), + collector); + } + }; + writer.newRow(key); + writer.addColumn(bytes("col"), bytes("val"), 1); + 
writer.close(); + + Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list(); + assert sstables.size() == 1; + + Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next(); + final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey()); + + // simulate we don't have generation in compaction_history + Map<Integer, UUID> unfinishedCompactions = new HashMap<>(); + UUID compactionTaskID = UUID.randomUUID(); + for (Integer ancestor : ancestors) + unfinishedCompactions.put(ancestor, compactionTaskID); + ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, unfinishedCompactions); + + // SSTable should not be deleted + sstables = dir.sstableLister().list(); assert sstables.size() == 1; assert sstables.containsKey(sstable1.descriptor); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index a338290..7b91bed 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -289,8 +289,8 @@ public class CompactionsTest extends SchemaLoader } })); UUID taskId = SystemKeyspace.startCompaction(cfs, sstables); - SetMultimap<Pair<String, String>, Integer> compactionLogs = SystemKeyspace.getUnfinishedCompactions(); - Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf)); + Map<Pair<String, String>, Map<Integer, UUID>> compactionLogs = SystemKeyspace.getUnfinishedCompactions(); + Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf)).keySet(); assert unfinishedCompactions.containsAll(generations); SystemKeyspace.finishCompaction(taskId);
