Repository: cassandra Updated Branches: refs/heads/trunk 3120b8b65 -> cae395026
Add forceUserDefinedCleanup to allow more flexible cleanup patch by jeffj; reviewed by yukim for CASSANDRA-10708 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cae39502 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cae39502 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cae39502 Branch: refs/heads/trunk Commit: cae395026af3f82afbbee6d2ab090f985ee006d3 Parents: 3120b8b Author: Jeff Jirsa <[email protected]> Authored: Tue Dec 22 15:11:25 2015 -0600 Committer: Yuki Morishita <[email protected]> Committed: Tue Dec 22 15:11:25 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 56 ++++++++++++++++++++ .../db/compaction/CompactionManagerMBean.java | 11 ++++ .../org/apache/cassandra/db/CleanupTest.java | 26 +++++++++ 4 files changed, 94 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cae39502/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3b7f389..9f82731 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.2 + * Add forceUserDefinedCleanup to allow more flexible cleanup (CASSANDRA-10708) * (cqlsh) allow setting TTL with COPY (CASSANDRA-9494) * Fix EstimatedHistogram creation in nodetool tablehistograms (CASSANDRA-10859) * Establish bootstrap stream sessions sequentially (CASSANDRA-6992) http://git-wip-us.apache.org/repos/asf/cassandra/blob/cae39502/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index fafab69..cfffa14 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -609,6 +609,62 @@ public class CompactionManager implements CompactionManagerMBean FBUtilities.waitOnFutures(futures); } + public void forceUserDefinedCleanup(String dataFiles) + { + String[] filenames = dataFiles.split(","); + HashMap<ColumnFamilyStore, Descriptor> descriptors = Maps.newHashMap(); + + for (String filename : filenames) + { + // extract keyspace and columnfamily name from filename + Descriptor desc = Descriptor.fromFilename(filename.trim()); + if (Schema.instance.getCFMetaData(desc) == null) + { + logger.warn("Schema does not exist for file {}. Skipping.", filename); + continue; + } + // group by keyspace/columnfamily + ColumnFamilyStore cfs = Keyspace.open(desc.ksname).getColumnFamilyStore(desc.cfname); + desc = cfs.getDirectories().find(new File(filename.trim()).getName()); + if (desc != null) + descriptors.put(cfs, desc); + } + + for (Map.Entry<ColumnFamilyStore,Descriptor> entry : descriptors.entrySet()) + { + ColumnFamilyStore cfs = entry.getKey(); + Keyspace keyspace = cfs.keyspace; + Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName()); + boolean hasIndexes = cfs.indexManager.hasIndexes(); + SSTableReader sstable = lookupSSTable(cfs, entry.getValue()); + + if (ranges.isEmpty()) + { + logger.error("Cleanup cannot run before a node has joined the ring"); + return; + } + + if(sstable == null) + { + logger.warn("Will not clean {}, it is not an active sstable", entry.getValue()); + } + else + { + LifecycleTransaction txn = cfs.getTracker().tryModify(sstable, OperationType.CLEANUP); + CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges, FBUtilities.nowInSeconds()); + try + { + doCleanupOne(cfs, txn, cleanupStrategy, ranges, hasIndexes); + } + catch (IOException e) + { + logger.error(String.format("forceUserDefinedCleanup failed: %s", e.getLocalizedMessage())); + } + } + } + } + + public Future<?> submitUserDefined(final ColumnFamilyStore cfs, final Collection<Descriptor> dataFiles, final int gcBefore) { Runnable runnable = new WrappedRunnable() http://git-wip-us.apache.org/repos/asf/cassandra/blob/cae39502/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java index d5da0fe..bb67d5f 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java @@ -45,6 +45,17 @@ public interface CompactionManagerMBean public void forceUserDefinedCompaction(String dataFiles); /** + * Triggers the cleanup of user specified sstables. + * You can specify files from various keyspaces and columnfamilies. + * If you do so, cleanup is performed each file individually + * + * @param dataFiles a comma separated list of sstable file to cleanup. + * must contain keyspace and columnfamily name in path(for 2.1+) or file name itself. + */ + public void forceUserDefinedCleanup(String dataFiles); + + + /** * Stop all running compaction-like tasks having the provided {@code type}. * @param type the type of compaction to stop. Can be one of: * - COMPACTION http://git-wip-us.apache.org/repos/asf/cassandra/blob/cae39502/test/unit/org/apache/cassandra/db/CleanupTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java index 167f3b0..c15bdb4 100644 --- a/test/unit/org/apache/cassandra/db/CleanupTest.java +++ b/test/unit/org/apache/cassandra/db/CleanupTest.java @@ -174,6 +174,32 @@ public class CleanupTest } @Test + public void testuserDefinedCleanupWithNewToken() throws ExecutionException, InterruptedException, UnknownHostException + { + StorageService.instance.getTokenMetadata().clearUnsafe(); + + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); + + // insert data and verify we get it back w/ range query + fillCF(cfs, "val", LOOPS); + + assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size()); + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + + byte[] tk1 = new byte[1], tk2 = new byte[1]; + tk1[0] = 2; + tk2[0] = 1; + tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1")); + tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2")); + + for(SSTableReader r: cfs.getLiveSSTables()) + CompactionManager.instance.forceUserDefinedCleanup(r.getFilename()); + + assertEquals(0, Util.getAll(Util.cmd(cfs).build()).size()); + } + + @Test public void testNeedsCleanup() throws Exception { // setup
