Repository: cassandra Updated Branches: refs/heads/cassandra-2.2 2e791af0d -> 79a9d4374
Make sure we cancel non-compacting sstables from LifecycleTransaction Patch by marcuse; reviewed by benedict for CASSANDRA-9566 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/79a9d437 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/79a9d437 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/79a9d437 Branch: refs/heads/cassandra-2.2 Commit: 79a9d437442c37b1cf49700ce1d618d25cab8a52 Parents: 2e791af Author: Marcus Eriksson <[email protected]> Authored: Tue Jun 9 09:39:31 2015 +0200 Committer: Marcus Eriksson <[email protected]> Committed: Wed Jun 10 10:13:22 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 2 - .../db/compaction/CompactionManager.java | 40 ++++++++++---------- 3 files changed, 20 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/79a9d437/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 04bbd84..b94279b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2 + * Make sure we cancel non-compacting sstables from LifecycleTransaction (CASSANDRA-9566) * Compressed commit log should measure compressed space used (CASSANDRA-9095) * Fix comparison bug in CassandraRoleManager#collectRoles (CASSANDRA-9551) * Add tinyint,smallint,time,date support for UDFs (CASSANDRA-9400) http://git-wip-us.apache.org/repos/asf/cassandra/blob/79a9d437/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 865bac9..ce54fb2 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -2595,8 +2595,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { assert data.getCompacting().isEmpty() : data.getCompacting(); Collection<SSTableReader> sstables = Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables())); - if (Iterables.isEmpty(sstables)) - return null; LifecycleTransaction modifier = data.tryModify(sstables, operationType); assert modifier != null: "something marked things compacting while compactions are disabled"; return modifier; http://git-wip-us.apache.org/repos/asf/cassandra/blob/79a9d437/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index a2783da..46a59db 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -251,21 +251,15 @@ public class CompactionManager implements CompactionManagerMBean { try (LifecycleTransaction compacting = cfs.markAllCompacting(operationType);) { - if (compacting == null) - { - logger.info("Aborting operation on {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name); - return AllSSTableOpStatus.ABORTED; - } - if (compacting.originals().isEmpty()) + Iterable<SSTableReader> sstables = Lists.newArrayList(operation.filterSSTables(compacting)); + if (Iterables.isEmpty(sstables)) { logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name); return AllSSTableOpStatus.SUCCESSFUL; } - Iterable<SSTableReader> sstables = operation.filterSSTables(compacting.originals()); List<Pair<LifecycleTransaction,Future<Object>>> futures = new ArrayList<>(); - for (final SSTableReader sstable : sstables) { if (executor.isShutdown()) @@ -320,7 +314,7 @@ public class CompactionManager implements CompactionManagerMBean private static interface OneSSTableOperation { - Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input); + Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction); void execute(LifecycleTransaction input) throws IOException; } @@ -338,9 +332,9 @@ public class CompactionManager implements CompactionManagerMBean return parallelAllSSTableOperation(cfs, new OneSSTableOperation() { @Override - public Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input) + public Iterable<SSTableReader> filterSSTables(LifecycleTransaction input) { - return input; + return input.originals(); } @Override @@ -357,9 +351,9 @@ public class CompactionManager implements CompactionManagerMBean return parallelAllSSTableOperation(cfs, new OneSSTableOperation() { @Override - public Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input) + public Iterable<SSTableReader> filterSSTables(LifecycleTransaction input) { - return input; + return input.originals(); } @Override @@ -375,16 +369,20 @@ public class CompactionManager implements CompactionManagerMBean return parallelAllSSTableOperation(cfs, new OneSSTableOperation() { @Override - public Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input) + public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction) { - return Iterables.filter(input, new Predicate<SSTableReader>() + Iterable<SSTableReader> sstables = new ArrayList<>(transaction.originals()); + Iterator<SSTableReader> iter = sstables.iterator(); + while (iter.hasNext()) { - @Override - public boolean apply(SSTableReader sstable) + SSTableReader sstable = iter.next(); + if (excludeCurrentVersion && sstable.descriptor.version.equals(sstable.descriptor.getFormat().getLatestVersion())) { - return !(excludeCurrentVersion && sstable.descriptor.version.equals(sstable.descriptor.getFormat().getLatestVersion())); + transaction.cancel(sstable); + iter.remove(); } - }); + } + return sstables; } @Override @@ -413,9 +411,9 @@ public class CompactionManager implements CompactionManagerMBean return parallelAllSSTableOperation(cfStore, new OneSSTableOperation() { @Override - public Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input) + public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction) { - List<SSTableReader> sortedSSTables = Lists.newArrayList(input); + List<SSTableReader> sortedSSTables = Lists.newArrayList(transaction.originals()); Collections.sort(sortedSSTables, new SSTableReader.SizeComparator()); return sortedSSTables; }
