Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 a9b542205 -> 8b8a3f5b9 refs/heads/cassandra-2.2 6c1ef2ba4 -> 897ffe87e refs/heads/cassandra-3.0 3efc609e0 -> 0c91977da refs/heads/cassandra-3.5 c7ef7c91c -> 0a2508544 refs/heads/trunk a0a949440 -> f04224707
Add a -j parameter to scrub/cleanup/upgradesstables to state how many threads to use Patch by marcuse; reviewed by Carl Yeksigian for CASSANDRA-11179 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b8a3f5b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b8a3f5b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b8a3f5b Branch: refs/heads/cassandra-2.1 Commit: 8b8a3f5b99fa7a8eefed14cd7e41b81773617046 Parents: a9b5422 Author: Marcus Eriksson <[email protected]> Authored: Mon Mar 14 11:01:11 2016 +0100 Committer: Marcus Eriksson <[email protected]> Committed: Tue Mar 29 10:50:38 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../apache/cassandra/db/ColumnFamilyStore.java | 12 +++---- .../db/compaction/CompactionManager.java | 35 +++++++++++++++----- .../cassandra/service/StorageService.java | 24 +++++++++++--- .../cassandra/service/StorageServiceMBean.java | 6 ++++ .../org/apache/cassandra/tools/NodeProbe.java | 33 +++++++++++------- .../org/apache/cassandra/tools/NodeTool.java | 21 ++++++++++-- .../org/apache/cassandra/db/CleanupTest.java | 6 ++-- .../unit/org/apache/cassandra/db/ScrubTest.java | 18 +++++----- .../LeveledCompactionStrategyTest.java | 2 +- 10 files changed, 111 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5f01114..7794d4f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -17,6 +17,8 @@ * Gossiper#isEnabled is not thread safe (CASSANDRA-11116) * Avoid major compaction mixing repaired and unrepaired sstables in DTCS (CASSANDRA-11113) * test_bulk_round_trip_blogposts is failing occasionally (CASSANDRA-10938) + * Add a -j parameter to scrub/cleanup/upgradesstables to state how + many threads to use (CASSANDRA-11179) 2.1.13 http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index a78f33f..3d66d3a 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1478,22 +1478,22 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return maxFile; } - public CompactionManager.AllSSTableOpStatus forceCleanup() throws ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus forceCleanup(int jobs) throws ExecutionException, InterruptedException { - return CompactionManager.instance.performCleanup(ColumnFamilyStore.this); + return CompactionManager.instance.performCleanup(ColumnFamilyStore.this, jobs); } - public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData) throws ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs) throws ExecutionException, InterruptedException { // skip snapshot creation during scrub, SEE JIRA 5891 if(!disableSnapshot) snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis()); - return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData); + return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData, jobs); } - public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion) throws ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion, int jobs) throws ExecutionException, InterruptedException { - return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion); + return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion, jobs); } public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index ec7cb45..e382cab 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -271,7 +271,17 @@ public class CompactionManager implements CompactionManagerMBean } } - private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation) throws ExecutionException, InterruptedException + /** + * Run an operation over all sstables using jobs threads + * + * @param cfs the column family store to run the operation on + * @param operation the operation to run + * @param jobs the number of threads to use - 0 means use all available. It never uses more than concurrent_compactors threads + * @return status of the operation + * @throws ExecutionException + * @throws InterruptedException + */ + private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, int jobs) throws ExecutionException, InterruptedException { Iterable<SSTableReader> compactingSSTables = cfs.markAllCompacting(); if (compactingSSTables == null) @@ -299,7 +309,8 @@ public class CompactionManager implements CompactionManagerMBean logger.info("Executor has shut down, not submitting task"); return AllSSTableOpStatus.ABORTED; } - futures.add(executor.submit(new Callable<Object>() + + Callable<Object> callable = new Callable<Object>() { @Override public Object call() throws Exception @@ -315,7 +326,13 @@ public class CompactionManager implements CompactionManagerMBean } return this; } - })); + }; + futures.add(executor.submit(callable)); + if (jobs > 0 && futures.size() == jobs) + { + FBUtilities.waitOnFutures(futures); + futures.clear(); + } } FBUtilities.waitOnFutures(futures); } @@ -341,7 +358,7 @@ public class CompactionManager implements CompactionManagerMBean } } - public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData) throws InterruptedException, ExecutionException + public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData, int jobs) throws InterruptedException, ExecutionException { assert !cfs.isIndex(); return parallelAllSSTableOperation(cfs, new OneSSTableOperation() @@ -357,10 +374,10 @@ public class CompactionManager implements CompactionManagerMBean { scrubOne(cfs, input, skipCorrupted, checkData); } - }); + }, jobs); } - public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion) throws InterruptedException, ExecutionException + public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion, int jobs) throws InterruptedException, ExecutionException { return parallelAllSSTableOperation(cfs, new OneSSTableOperation() { @@ -385,10 +402,10 @@ public class CompactionManager implements CompactionManagerMBean task.setCompactionType(OperationType.UPGRADE_SSTABLES); task.execute(metrics); } - }); + }, jobs); } - public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException + public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore, int jobs) throws InterruptedException, ExecutionException { assert !cfStore.isIndex(); Keyspace keyspace = cfStore.keyspace; @@ -416,7 +433,7 @@ public class CompactionManager implements CompactionManagerMBean CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges); doCleanupOne(cfStore, input, cleanupStrategy, ranges, hasIndexes); } - }); + }, jobs); } public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs, http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 98e2251..507aedb 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2385,13 +2385,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public int forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { + return forceKeyspaceCleanup(0, keyspaceName, columnFamilies); + } + + public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { if (keyspaceName.equals(Keyspace.SYSTEM_KS)) throw new RuntimeException("Cleanup of the system keyspace is neither necessary nor wise"); CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies)) { - CompactionManager.AllSSTableOpStatus oneStatus = cfStore.forceCleanup(); + CompactionManager.AllSSTableOpStatus oneStatus = cfStore.forceCleanup(jobs); if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) status = oneStatus; } @@ -2400,27 +2405,36 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - return scrub(disableSnapshot, skipCorrupted, true, keyspaceName, columnFamilies); + return scrub(disableSnapshot, skipCorrupted, true, 0, keyspaceName, columnFamilies); } public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { + return scrub(disableSnapshot, skipCorrupted, checkData, 0, keyspaceName, columnFamilies); + } + + public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies)) { - CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, checkData); + CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, checkData, jobs); if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) status = oneStatus; } return status.statusCode; } - public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { + return upgradeSSTables(keyspaceName, excludeCurrentVersion, 2, columnFamilies); + } + + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, true, keyspaceName, columnFamilies)) { - CompactionManager.AllSSTableOpStatus oneStatus = cfStore.sstablesRewrite(excludeCurrentVersion); + CompactionManager.AllSSTableOpStatus oneStatus = cfStore.sstablesRewrite(excludeCurrentVersion, jobs); if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) status = oneStatus; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 8fa2433..d3a1725 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -260,7 +260,9 @@ public interface StorageServiceMBean extends NotificationEmitter /** * Trigger a cleanup of keys on a single keyspace */ + @Deprecated public int forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; + public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; /** * Scrub (deserialize + reserialize at the latest version, skipping bad rows if any) the given keyspace. @@ -270,13 +272,17 @@ public interface StorageServiceMBean extends NotificationEmitter */ @Deprecated public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; + @Deprecated public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; + public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; /** * Rewrite all sstables to the latest version. * Unlike scrub, it doesn't skip bad rows and do not snapshot sstables first. */ + @Deprecated public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; /** * Flush all memtables for the given column families, or all columnfamilies for the given keyspace http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 1ad1147..ab08e9f 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -63,6 +63,7 @@ import javax.management.remote.JMXServiceURL; import javax.rmi.ssl.SslRMIClientSocketFactory; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStoreMBean; import org.apache.cassandra.db.HintedHandOffManager; import org.apache.cassandra.db.HintedHandOffManagerMBean; @@ -237,42 +238,50 @@ public class NodeProbe implements AutoCloseable jmxc.close(); } - public int forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - return ssProxy.forceKeyspaceCleanup(keyspaceName, columnFamilies); + return ssProxy.forceKeyspaceCleanup(jobs, keyspaceName, columnFamilies); } - public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, columnFamilies); + return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, jobs, keyspaceName, columnFamilies); } - public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies); + return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, columnFamilies); } - public void forceKeyspaceCleanup(PrintStream out, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + private void checkJobs(PrintStream out, int jobs) { - if (forceKeyspaceCleanup(keyspaceName, columnFamilies) != 0) + if (jobs > DatabaseDescriptor.getConcurrentCompactors()) + out.println(String.format("jobs (%d) is bigger than configured concurrent_compactors (%d), using at most %d threads", jobs, DatabaseDescriptor.getConcurrentCompactors(), DatabaseDescriptor.getConcurrentCompactors())); + } + + public void forceKeyspaceCleanup(PrintStream out, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { + if (forceKeyspaceCleanup(jobs, keyspaceName, columnFamilies) != 0) { failed = true; out.println("Aborted cleaning up atleast one column family in keyspace "+keyspaceName+", check server logs for more information."); } } - public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - if (scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, columnFamilies) != 0) + checkJobs(out, jobs); + if (scrub(disableSnapshot, skipCorrupted, checkData, jobs, keyspaceName, columnFamilies) != 0) { failed = true; out.println("Aborted scrubbing atleast one column family in keyspace "+keyspaceName+", check server logs for more information."); } } - public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - if (upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies) != 0) + checkJobs(out, jobs); + if (upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, columnFamilies) != 0) { failed = true; out.println("Aborted upgrading sstables for atleast one column family in keyspace "+keyspaceName+", check server logs for more information."); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index 819049e..1de2e20 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -1138,6 +1138,11 @@ public class NodeTool @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families") private List<String> args = new ArrayList<>(); + @Option(title = "jobs", + name = {"-j", "--jobs"}, + description = "Number of sstables to cleanup simultaneusly, set to 0 to use all available compaction threads") + private int jobs = 2; + @Override public void execute(NodeProbe probe) { @@ -1151,7 +1156,7 @@ public class NodeTool try { - probe.forceKeyspaceCleanup(System.out, keyspace, cfnames); + probe.forceKeyspaceCleanup(System.out, jobs, keyspace, cfnames); } catch (Exception e) { throw new RuntimeException("Error occurred during cleanup", e); @@ -1267,6 +1272,11 @@ public class NodeTool description = "Do not validate columns using column validator") private boolean noValidation = false; + @Option(title = "jobs", + name = {"-j", "--jobs"}, + description = "Number of sstables to scrub simultanously, set to 0 to use all available compaction threads") + private int jobs = 2; + @Override public void execute(NodeProbe probe) { @@ -1277,7 +1287,7 @@ public class NodeTool { try { - probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, keyspace, cfnames); + probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, jobs, keyspace, cfnames); } catch (Exception e) { throw new RuntimeException("Error occurred during flushing", e); @@ -1345,6 +1355,11 @@ public class NodeTool @Option(title = "include_all", name = {"-a", "--include-all-sstables"}, description = "Use -a to include all sstables, even those already on the current version") private boolean includeAll = false; + @Option(title = "jobs", + name = {"-j","--jobs"}, + description = "Number of sstables to upgrade simultaneously, set to 0 to use all available compaction threads") + private int jobs = 2; + @Override public void execute(NodeProbe probe) { @@ -1355,7 +1370,7 @@ public class NodeTool { try { - probe.upgradeSSTables(System.out, keyspace, !includeAll, cfnames); + probe.upgradeSSTables(System.out, keyspace, !includeAll, jobs, cfnames); } catch (Exception e) { throw new RuntimeException("Error occurred during enabling auto-compaction", e); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/test/unit/org/apache/cassandra/db/CleanupTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java index 1d04dfa..7f54ed7 100644 --- a/test/unit/org/apache/cassandra/db/CleanupTest.java +++ b/test/unit/org/apache/cassandra/db/CleanupTest.java @@ -83,7 +83,7 @@ public class CleanupTest extends SchemaLoader assertEquals(LOOPS, rows.size()); // with one token in the ring, owned by the local node, cleanup should be a no-op - CompactionManager.instance.performCleanup(cfs); + CompactionManager.instance.performCleanup(cfs, 2); // ensure max timestamp of the sstables are retained post-cleanup assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs)); @@ -128,7 +128,7 @@ public class CleanupTest extends SchemaLoader tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1")); tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2")); - CompactionManager.instance.performCleanup(cfs); + CompactionManager.instance.performCleanup(cfs, 2); // row data should be gone rows = Util.getRangeSlice(cfs); @@ -165,7 +165,7 @@ public class CleanupTest extends SchemaLoader tk2[0] = 1; tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1")); tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2")); - CompactionManager.instance.performCleanup(cfs); + CompactionManager.instance.performCleanup(cfs, 2); rows = Util.getRangeSlice(cfs); assertEquals(0, rows.size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java index 167671b..4efd082 100644 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@ -99,7 +99,7 @@ public class ScrubTest extends SchemaLoader rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); assertEquals(1, rows.size()); - CompactionManager.instance.performScrub(cfs, false, true); + CompactionManager.instance.performScrub(cfs, false, true, 2); // check data is still there rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); @@ -228,7 +228,7 @@ public class ScrubTest extends SchemaLoader SSTableReader sstable = cfs.getSSTables().iterator().next(); overrideWithGarbage(sstable, 0, 2); - CompactionManager.instance.performScrub(cfs, false, true); + CompactionManager.instance.performScrub(cfs, false, true, 2); // check data is still there rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); @@ -249,7 +249,7 @@ public class ScrubTest extends SchemaLoader rm.applyUnsafe(); cfs.forceBlockingFlush(); - CompactionManager.instance.performScrub(cfs, false, true); + CompactionManager.instance.performScrub(cfs, false, true, 2); assert cfs.getSSTables().isEmpty(); } @@ -268,7 +268,7 @@ public class ScrubTest extends SchemaLoader rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); assertEquals(10, rows.size()); - CompactionManager.instance.performScrub(cfs, false, true); + CompactionManager.instance.performScrub(cfs, false, true, 2); // check data is still there rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); @@ -293,7 +293,7 @@ public class ScrubTest extends SchemaLoader for (SSTableReader sstable : cfs.getSSTables()) new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)).delete(); - CompactionManager.instance.performScrub(cfs, false, true); + CompactionManager.instance.performScrub(cfs, false, true, 2); // check data is still there rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); @@ -506,7 +506,7 @@ public class ScrubTest extends SchemaLoader QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_static_columns (a, b, c, d) VALUES (123, c3db07e8-b602-11e3-bc6b-e0b9a54a6d93, true, 'foobar')"); cfs.forceBlockingFlush(); - CompactionManager.instance.performScrub(cfs, false, true); + CompactionManager.instance.performScrub(cfs, false, true, 2); QueryProcessor.process("CREATE TABLE \"Keyspace1\".test_scrub_validation (a text primary key, b int)", ConsistencyLevel.ONE); ColumnFamilyStore cfs2 = keyspace.getColumnFamilyStore("test_scrub_validation"); @@ -516,7 +516,7 @@ public class ScrubTest extends SchemaLoader mutation.apply(); cfs2.forceBlockingFlush(); - CompactionManager.instance.performScrub(cfs2, false, false); + CompactionManager.instance.performScrub(cfs2, false, false, 2); } /** @@ -533,7 +533,7 @@ public class ScrubTest extends SchemaLoader Mutation mutation = new Mutation("Keyspace1", ByteBufferUtil.bytes(UUIDGen.getTimeUUID()), cf); mutation.applyUnsafe(); cfs.forceBlockingFlush(); - CompactionManager.instance.performScrub(cfs, false, true); + CompactionManager.instance.performScrub(cfs, false, true, 2); assertEquals(1, cfs.getSSTables().size()); } @@ -554,7 +554,7 @@ public class ScrubTest extends SchemaLoader QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'b', 'bar')"); QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'c', 'boo')"); cfs.forceBlockingFlush(); - CompactionManager.instance.performScrub(cfs, true, true); + CompactionManager.instance.performScrub(cfs, true, true, 2); // Scrub is silent, but it will remove broken records. So reading everything back to make sure nothing to "scrubbed away" UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM \"Keyspace1\".test_compact_dynamic_columns"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index 7d33c11..749056c 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -328,7 +328,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader assertTrue(strategy.getAllLevelSize()[1] > 0); cfs.disableAutoCompaction(); - cfs.sstablesRewrite(false); + cfs.sstablesRewrite(false, 2); assertTrue(strategy.getAllLevelSize()[1] > 0); }
