Support multiple folders for user defined compaction tasks

patch by Stefania Alborghetti; reviewed by Marcus Eriksson for CASSANDRA-11765
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5a7ff5e3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5a7ff5e3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5a7ff5e3 Branch: refs/heads/trunk Commit: 5a7ff5e32312426b63d9a8a5dc7fd58fa2ffb8ce Parents: ff42012 Author: Stefania Alborghetti <[email protected]> Authored: Mon May 30 16:25:19 2016 +0200 Committer: Stefania Alborghetti <[email protected]> Committed: Fri Jun 3 16:29:48 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 9 ++-- .../compaction/CompactionStrategyManager.java | 43 ++++++++++++++++++-- test/unit/org/apache/cassandra/Util.java | 5 ++- .../db/compaction/CompactionsPurgeTest.java | 9 +++- 5 files changed, 57 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 02914cf..9e3857b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.7 + * Support multiple folders for user defined compaction tasks (CASSANDRA-11765) * Fix race in CompactionStrategyManager's pause/resume (CASSANDRA-11922) Merged from 3.0: * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index b197680..dca48aa 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -800,9 +800,12 @@ public class CompactionManager implements CompactionManagerMBean } else { - AbstractCompactionTask task = cfs.getCompactionStrategyManager().getUserDefinedTask(sstables, gcBefore); - if (task != null) - task.execute(metrics); + List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore); + for (AbstractCompactionTask task : tasks) + { + if (task != null) + task.execute(metrics); + } } } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index fbb25a3..51a874b 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -731,14 +731,39 @@ public class CompactionStrategyManager implements INotificationConsumer }, false, false); } - public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore) + /** + * Return a list of compaction tasks corresponding to the sstables requested. Split the sstables according + * to whether they are repaired or not, and by disk location. Return a task per disk location and repair status + * group. 
+ * + * @param sstables the sstables to compact + * @param gcBefore gc grace period, throw away tombstones older than this + * @return a list of compaction tasks corresponding to the sstables requested + */ + public List<AbstractCompactionTask> getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore) { maybeReload(cfs.metadata); - validateForCompaction(sstables, cfs, getDirectories()); + List<AbstractCompactionTask> ret = new ArrayList<>(); + readLock.lock(); try { - return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore); + Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream() + .filter(s -> !s.isMarkedSuspect() && s.isRepaired()) + .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s))); + + Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream() + .filter(s -> !s.isMarkedSuspect() && !s.isRepaired()) + .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s))); + + + for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet()) + ret.add(repaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore)); + + for (Map.Entry<Integer, List<SSTableReader>> group : unrepairedSSTables.entrySet()) + ret.add(unrepaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore)); + + return ret; } finally { @@ -746,6 +771,18 @@ public class CompactionStrategyManager implements INotificationConsumer } } + /** + * @deprecated use {@link #getUserDefinedTasks(Collection, int)} instead. 
+ */ + @Deprecated() + public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore) + { + validateForCompaction(sstables, cfs, getDirectories()); + List<AbstractCompactionTask> tasks = getUserDefinedTasks(sstables, gcBefore); + assert tasks.size() == 1; + return tasks.get(0); + } + public int getEstimatedRemainingTasks() { int tasks = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index 87a07b0..6349cde 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -228,8 +228,9 @@ public class Util public static void compact(ColumnFamilyStore cfs, Collection<SSTableReader> sstables) { int gcBefore = cfs.gcBefore(FBUtilities.nowInSeconds()); - AbstractCompactionTask task = cfs.getCompactionStrategyManager().getUserDefinedTask(sstables, gcBefore); - task.execute(null); + List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore); + for (AbstractCompactionTask task : tasks) + task.execute(null); } public static void expectEOF(Callable<?> callable) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java index 26d53ed..ef26b35 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction; import java.util.Collection; +import java.util.List; import 
java.util.concurrent.ExecutionException; import org.junit.BeforeClass; @@ -167,7 +168,9 @@ public class CompactionsPurgeTest .build().applyUnsafe(); cfs.forceBlockingFlush(); - cfs.getCompactionStrategyManager().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null); + List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE); + assertEquals(1, tasks.size()); + tasks.get(0).execute(null); // verify that minor compaction does GC when key is provably not // present in a non-compacted sstable @@ -215,7 +218,9 @@ public class CompactionsPurgeTest cfs.forceBlockingFlush(); // compact the sstables with the c1/c2 data and the c1 tombstone - cfs.getCompactionStrategyManager().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null); + List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE); + assertEquals(1, tasks.size()); + tasks.get(0).execute(null); // We should have both the c1 and c2 tombstones still. Since the min timestamp in the c2 tombstone // sstable is older than the c1 tombstone, it is invalid to throw out the c1 tombstone.
