Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/897ffe87 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/897ffe87 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/897ffe87 Branch: refs/heads/cassandra-3.5 Commit: 897ffe87e41ab128c9e8969d535cb2a706baf563 Parents: 6c1ef2b 8b8a3f5 Author: Marcus Eriksson <marc...@apache.org> Authored: Tue Mar 29 10:54:45 2016 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Tue Mar 29 10:54:45 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/cassandra/db/ColumnFamilyStore.java | 16 ++++---- .../db/compaction/CompactionManager.java | 40 +++++++++++++------- .../cassandra/service/StorageService.java | 24 +++++++++--- .../cassandra/service/StorageServiceMBean.java | 6 +++ .../org/apache/cassandra/tools/NodeProbe.java | 34 +++++++++++------ .../cassandra/tools/nodetool/Cleanup.java | 8 +++- .../apache/cassandra/tools/nodetool/Scrub.java | 7 +++- .../tools/nodetool/UpgradeSSTable.java | 7 +++- .../org/apache/cassandra/db/CleanupTest.java | 6 +-- .../unit/org/apache/cassandra/db/ScrubTest.java | 20 +++++----- .../LeveledCompactionStrategyTest.java | 2 +- 12 files changed, 117 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 91dc588,7794d4f..098d062 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,39 -1,4 +1,41 @@@ -2.1.14 +2.2.6 + * cqlsh: COPY FROM should check that explicit column names are valid (CASSANDRA-11333) + * Add -Dcassandra.start_gossip startup option (CASSANDRA-10809) + * Fix UTF8Validator.validate() for modified UTF-8 (CASSANDRA-10748) + * Clarify that now() function is calculated on the coordinator node in CQL documentation (CASSANDRA-10900) + * Fix bloom filter sizing with LCS (CASSANDRA-11344) + * (cqlsh) Fix error when result is 0 rows with EXPAND ON (CASSANDRA-11092) + * Fix intra-node serialization issue for multicolumn-restrictions (CASSANDRA-11196) + * Non-obsoleting compaction operations over compressed files can impose rate limit on normal reads (CASSANDRA-11301) + * Add missing newline at end of bin/cqlsh (CASSANDRA-11325) + * Fix AE in nodetool cfstats (backport CASSANDRA-10859) (CASSANDRA-11297) + * Unresolved hostname leads to replace being ignored (CASSANDRA-11210) + * Fix filtering on non-primary key columns for thrift static column families + (CASSANDRA-6377) + * Only log yaml config once, at startup (CASSANDRA-11217) + * Preserve order for preferred SSL cipher suites (CASSANDRA-11164) + * Reference leak with parallel repairs on the same table (CASSANDRA-11215) + * Range.compareTo() violates the contract of Comparable (CASSANDRA-11216) + * Avoid NPE when serializing ErrorMessage with null message (CASSANDRA-11167) + * Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840) + * (cqlsh) cqlsh cannot be called through symlink (CASSANDRA-11037) + * fix ohc and java-driver pom dependencies in build.xml (CASSANDRA-10793) + * Protect from keyspace dropped during repair (CASSANDRA-11065) + * Handle adding fields to a UDT in SELECT JSON and toJson() (CASSANDRA-11146) + * Better error message for cleanup (CASSANDRA-10991) + * cqlsh pg-style-strings broken if line ends with ';' (CASSANDRA-11123) + * Use cloned TokenMetadata in size estimates to avoid race against membership check + (CASSANDRA-10736) + * Always persist upsampled index summaries (CASSANDRA-10512) + * (cqlsh) Fix inconsistent auto-complete (CASSANDRA-10733) + * Make SELECT JSON and toJson() threadsafe (CASSANDRA-11048) + * Fix SELECT on tuple relations for mixed ASC/DESC clustering order (CASSANDRA-7281) + * (cqlsh) Support utf-8/cp65001 encoding on Windows (CASSANDRA-11030) + * Fix paging on DISTINCT queries repeats result when first row in partition changes + (CASSANDRA-10010) +Merged from 2.1: ++ * Add a -j parameter to scrub/cleanup/upgradesstables to state how ++ many threads to use (CASSANDRA-11179) * Backport CASSANDRA-10679 (CASSANDRA-9598) * Don't do defragmentation if reading from repaired sstables (CASSANDRA-10342) * Fix streaming_socket_timeout_in_ms not enforced (CASSANDRA-11286) http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index da4a84a,3d66d3a..09f58ac --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -1499,74 -1478,22 +1499,74 @@@ public class ColumnFamilyStore implemen return maxFile; } - public CompactionManager.AllSSTableOpStatus forceCleanup() throws ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus forceCleanup(int jobs) throws ExecutionException, InterruptedException { - return CompactionManager.instance.performCleanup(ColumnFamilyStore.this); + return CompactionManager.instance.performCleanup(ColumnFamilyStore.this, jobs); } - public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData) throws ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs) throws ExecutionException, InterruptedException { - return scrub(disableSnapshot, skipCorrupted, false, checkData); ++ return scrub(disableSnapshot, skipCorrupted, false, checkData, jobs); + } + + @VisibleForTesting - public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean alwaysFail, boolean checkData) throws ExecutionException, InterruptedException ++ public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean alwaysFail, boolean checkData, int jobs) throws ExecutionException, InterruptedException + { // skip snapshot creation during scrub, SEE JIRA 5891 if(!disableSnapshot) snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis()); - return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData, jobs); + + try + { - return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData); ++ return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData, jobs); + } + catch(Throwable t) + { + if (!rebuildOnFailedScrub(t)) + throw t; + + return alwaysFail ? CompactionManager.AllSSTableOpStatus.ABORTED : CompactionManager.AllSSTableOpStatus.SUCCESSFUL; + } + } + + /** + * CASSANDRA-5174 : For an index cfs we may be able to discard everything and just rebuild + * the index when a scrub fails. + * + * @return true if we are an index cfs and we successfully rebuilt the index + */ + public boolean rebuildOnFailedScrub(Throwable failure) + { + if (!isIndex()) + return false; + + SecondaryIndex index = null; + if (metadata.cfName.contains(Directories.SECONDARY_INDEX_NAME_SEPARATOR)) + { + String[] parts = metadata.cfName.split("\\" + Directories.SECONDARY_INDEX_NAME_SEPARATOR, 2); + ColumnFamilyStore parentCfs = keyspace.getColumnFamilyStore(parts[0]); + index = parentCfs.indexManager.getIndexByName(metadata.cfName); + assert index != null; + } + + if (index == null) + return false; + + truncateBlocking(); + + logger.warn("Rebuilding index for {} because of <{}>", name, failure.getMessage()); + index.getBaseCfs().rebuildSecondaryIndex(index.getIndexName()); + return true; + } + + public CompactionManager.AllSSTableOpStatus verify(boolean extendedVerify) throws ExecutionException, InterruptedException + { + return CompactionManager.instance.performVerify(ColumnFamilyStore.this, extendedVerify); } - public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion) throws ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion, int jobs) throws ExecutionException, InterruptedException { - return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion); + return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion, jobs); } public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType) http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index b015bcd,e382cab..ca02747 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -263,21 -271,37 +263,31 @@@ public class CompactionManager implemen } } + /** + * Run an operation over all sstables using jobs threads + * + * @param cfs the column family store to run the operation on + * @param operation the operation to run + * @param jobs the number of threads to use - 0 means use all available. It never uses more than concurrent_compactors threads + * @return status of the operation + * @throws ExecutionException + * @throws InterruptedException + */ - private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, int jobs) throws ExecutionException, InterruptedException + @SuppressWarnings("resource") - private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, OperationType operationType) throws ExecutionException, InterruptedException ++ private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, int jobs, OperationType operationType) throws ExecutionException, InterruptedException { - Iterable<SSTableReader> compactingSSTables = cfs.markAllCompacting(); - if (compactingSSTables == null) + List<LifecycleTransaction> transactions = new ArrayList<>(); + try (LifecycleTransaction compacting = cfs.markAllCompacting(operationType)) { - logger.info("Aborting operation on {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name); - return AllSSTableOpStatus.ABORTED; - } - if (Iterables.isEmpty(compactingSSTables)) - { - logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name); - return AllSSTableOpStatus.SUCCESSFUL; - } - Set<SSTableReader> sstables = Sets.newHashSet(operation.filterSSTables(compactingSSTables)); - Set<SSTableReader> filteredAway = Sets.difference(Sets.newHashSet(compactingSSTables), sstables); - cfs.getDataTracker().unmarkCompacting(filteredAway); - final Set<SSTableReader> finished = Sets.newConcurrentHashSet(); + Iterable<SSTableReader> sstables = compacting != null ? Lists.newArrayList(operation.filterSSTables(compacting)) : Collections.<SSTableReader>emptyList(); + if (Iterables.isEmpty(sstables)) + { + logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name); + return AllSSTableOpStatus.SUCCESSFUL; + } + + List<Future<Object>> futures = new ArrayList<>(); - List<Future<Object>> futures = new ArrayList<>(); - try - { for (final SSTableReader sstable : sstables) { if (executor.isShutdown()) @@@ -286,23 -310,31 +296,27 @@@ return AllSSTableOpStatus.ABORTED; } + final LifecycleTransaction txn = compacting.split(singleton(sstable)); + transactions.add(txn); - futures.add(executor.submit(new Callable<Object>() + Callable<Object> callable = new Callable<Object>() { @Override public Object call() throws Exception { - try - { - operation.execute(sstable); - } - finally - { - cfs.getDataTracker().unmarkCompacting(Collections.singleton(sstable)); - finished.add(sstable); - } + operation.execute(txn); return this; } - })); + }; + futures.add(executor.submit(callable)); + if (jobs > 0 && futures.size() == jobs) + { + FBUtilities.waitOnFutures(futures); + futures.clear(); + } } - - assert compacting.originals().isEmpty(); - FBUtilities.waitOnFutures(futures); ++ assert compacting.originals().isEmpty(); + return AllSSTableOpStatus.SUCCESSFUL; } finally { @@@ -327,9 -358,9 +341,9 @@@ } } - public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData) - public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData, int jobs) throws InterruptedException, ExecutionException ++ public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData, int jobs) + throws InterruptedException, ExecutionException { - assert !cfs.isIndex(); return parallelAllSSTableOperation(cfs, new OneSSTableOperation() { @Override @@@ -343,29 -374,10 +357,29 @@@ { scrubOne(cfs, input, skipCorrupted, checkData); } - }, OperationType.SCRUB); - }, jobs); ++ }, jobs, OperationType.SCRUB); + } + + public AllSSTableOpStatus performVerify(final ColumnFamilyStore cfs, final boolean extendedVerify) throws InterruptedException, ExecutionException + { + assert !cfs.isIndex(); + return parallelAllSSTableOperation(cfs, new OneSSTableOperation() + { + @Override + public Iterable<SSTableReader> filterSSTables(LifecycleTransaction input) + { + return input.originals(); + } + + @Override + public void execute(LifecycleTransaction input) throws IOException + { + verifyOne(cfs, input.onlyOne(), extendedVerify); + } - }, OperationType.VERIFY); ++ }, 0, OperationType.VERIFY); } - public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion) throws InterruptedException, ExecutionException + public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion, int jobs) throws InterruptedException, ExecutionException { return parallelAllSSTableOperation(cfs, new OneSSTableOperation() { @@@ -394,10 -402,10 +408,10 @@@ task.setCompactionType(OperationType.UPGRADE_SSTABLES); task.execute(metrics); } - }, OperationType.UPGRADE_SSTABLES); - }, jobs); ++ }, jobs, OperationType.UPGRADE_SSTABLES); } - public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException + public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore, int jobs) throws InterruptedException, ExecutionException { assert !cfStore.isIndex(); Keyspace keyspace = cfStore.keyspace; @@@ -425,12 -428,12 +439,12 @@@ } @Override - public void execute(SSTableReader input) throws IOException + public void execute(LifecycleTransaction txn) throws IOException { CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges); - doCleanupOne(cfStore, input, cleanupStrategy, ranges, hasIndexes); + doCleanupOne(cfStore, txn, cleanupStrategy, ranges, hasIndexes); } - }, OperationType.CLEANUP); - }, jobs); ++ }, jobs, OperationType.CLEANUP); } public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs, http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageService.java index 5d29a5a,507aedb..bca5996 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -2555,7 -2385,12 +2555,12 @@@ public class StorageService extends Not public int forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { + return forceKeyspaceCleanup(0, keyspaceName, columnFamilies); + } + + public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { - if (keyspaceName.equals(Keyspace.SYSTEM_KS)) + if (keyspaceName.equals(SystemKeyspace.NAME)) throw new RuntimeException("Cleanup of the system keyspace is neither necessary nor wise"); CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; @@@ -2575,30 -2410,27 +2580,39 @@@ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { + return scrub(disableSnapshot, skipCorrupted, checkData, 0, keyspaceName, columnFamilies); + } + + public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; - for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies)) + for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, columnFamilies)) { - CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, checkData); + CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, checkData, jobs); if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) status = oneStatus; } return status.statusCode; } - + public int verify(boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { + CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; + for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies)) + { + CompactionManager.AllSSTableOpStatus oneStatus = cfStore.verify(extendedVerify); + if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) + status = oneStatus; + } + return status.statusCode; + } + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { + return upgradeSSTables(keyspaceName, excludeCurrentVersion, 2, columnFamilies); + } + + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, true, keyspaceName, columnFamilies)) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java index 7c5cd0a,d3a1725..761eed6 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@@ -261,17 -272,11 +263,19 @@@ public interface StorageServiceMBean ex */ @Deprecated public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; + @Deprecated public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; + public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; /** + * Verify (checksums of) the given keyspace. + * If columnFamilies array is empty, all CFs are verified. + * + * The entire sstable will be read to ensure each cell validates if extendedVerify is true + */ + public int verify(boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; + + /** * Rewrite all sstables to the latest version. * Unlike scrub, it doesn't skip bad rows and do not snapshot sstables first. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java index 93804a8,ab08e9f..2f27cea --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@@ -53,6 -62,8 +53,7 @@@ import javax.management.remote.JMXConne import javax.management.remote.JMXServiceURL; import javax.rmi.ssl.SslRMIClientSocketFactory; -import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean; + import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStoreMBean; import org.apache.cassandra.db.HintedHandOffManager; import org.apache.cassandra.db.HintedHandOffManagerMBean; @@@ -230,60 -238,53 +231,69 @@@ public class NodeProbe implements AutoC jmxc.close(); } - public int forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - return ssProxy.forceKeyspaceCleanup(keyspaceName, columnFamilies); + return ssProxy.forceKeyspaceCleanup(jobs, keyspaceName, columnFamilies); } - public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, columnFamilies); + return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, jobs, keyspaceName, columnFamilies); } + public int verify(boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { + return ssProxy.verify(extendedVerify, keyspaceName, columnFamilies); + } + - public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies); + return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, columnFamilies); } - public void forceKeyspaceCleanup(PrintStream out, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + private void checkJobs(PrintStream out, int jobs) { - if (forceKeyspaceCleanup(keyspaceName, columnFamilies) != 0) + if (jobs > DatabaseDescriptor.getConcurrentCompactors()) + out.println(String.format("jobs (%d) is bigger than configured concurrent_compactors (%d), using at most %d threads", jobs, DatabaseDescriptor.getConcurrentCompactors(), DatabaseDescriptor.getConcurrentCompactors())); + } + + public void forceKeyspaceCleanup(PrintStream out, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { ++ checkJobs(out, jobs); + if (forceKeyspaceCleanup(jobs, keyspaceName, columnFamilies) != 0) { failed = true; - out.println("Aborted cleaning up atleast one column family in keyspace "+keyspaceName+", check server logs for more information."); + out.println("Aborted cleaning up at least one table in keyspace "+keyspaceName+", check server logs for more information."); } } - public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - if (scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, columnFamilies) != 0) + checkJobs(out, jobs); + if (scrub(disableSnapshot, skipCorrupted, checkData, jobs, keyspaceName, columnFamilies) != 0) { failed = true; - out.println("Aborted scrubbing atleast one column family in keyspace "+keyspaceName+", check server logs for more information."); + out.println("Aborted scrubbing at least one table in keyspace "+keyspaceName+", check server logs for more information."); + } + } + + public void verify(PrintStream out, boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { + if (verify(extendedVerify, keyspaceName, columnFamilies) != 0) + { + failed = true; + out.println("Aborted verifying at least one table in keyspace "+keyspaceName+", check server logs for more information."); } } + - public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - if (upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies) != 0) + checkJobs(out, jobs); + if (upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, columnFamilies) != 0) { failed = true; - out.println("Aborted upgrading sstables for atleast one column family in keyspace "+keyspaceName+", check server logs for more information."); + out.println("Aborted upgrading sstables for atleast one table in keyspace "+keyspaceName+", check server logs for more information."); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tools/nodetool/Cleanup.java index aa415b3,0000000..6c6676d mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java @@@ -1,56 -1,0 +1,62 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tools.nodetool; + +import io.airlift.command.Arguments; +import io.airlift.command.Command; + +import java.util.ArrayList; +import java.util.List; + ++import io.airlift.command.Option; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool.NodeToolCmd; + +@Command(name = "cleanup", description = "Triggers the immediate cleanup of keys no longer belonging to a node. By default, clean all keyspaces") +public class Cleanup extends NodeToolCmd +{ + @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables") + private List<String> args = new ArrayList<>(); + ++ @Option(title = "jobs", ++ name = {"-j", "--jobs"}, ++ description = "Number of sstables to cleanup simultanously, set to 0 to use all available compaction threads") ++ private int jobs = 2; ++ + @Override + public void execute(NodeProbe probe) + { + List<String> keyspaces = parseOptionalKeyspace(args, probe); + String[] cfnames = parseOptionalColumnFamilies(args); + + for (String keyspace : keyspaces) + { + if (SystemKeyspace.NAME.equals(keyspace)) + continue; + + try + { - probe.forceKeyspaceCleanup(System.out, keyspace, cfnames); ++ probe.forceKeyspaceCleanup(System.out, jobs, keyspace, cfnames); + } catch (Exception e) + { + throw new RuntimeException("Error occurred during cleanup", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/src/java/org/apache/cassandra/tools/nodetool/Scrub.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tools/nodetool/Scrub.java index 54f981e,0000000..dafe8d1 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Scrub.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Scrub.java @@@ -1,71 -1,0 +1,76 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tools.nodetool; + +import io.airlift.command.Arguments; +import io.airlift.command.Command; +import io.airlift.command.Option; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool.NodeToolCmd; + +@Command(name = "scrub", description = "Scrub (rebuild sstables for) one or more tables") +public class Scrub extends NodeToolCmd +{ + @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables") + private List<String> args = new ArrayList<>(); + + @Option(title = "disable_snapshot", + name = {"-ns", "--no-snapshot"}, + description = "Scrubbed CFs will be snapshotted first, if disableSnapshot is false. (default false)") + private boolean disableSnapshot = false; + + @Option(title = "skip_corrupted", + name = {"-s", "--skip-corrupted"}, + description = "Skip corrupted partitions even when scrubbing counter tables. (default false)") + private boolean skipCorrupted = false; + + @Option(title = "no_validate", + name = {"-n", "--no-validate"}, + description = "Do not validate columns using column validator") + private boolean noValidation = false; + ++ @Option(title = "jobs", ++ name = {"-j", "--jobs"}, ++ description = "Number of sstables to scrub simultanously, set to 0 to use all available compaction threads") ++ private int jobs = 2; ++ + @Override + public void execute(NodeProbe probe) + { + List<String> keyspaces = parseOptionalKeyspace(args, probe); + String[] cfnames = parseOptionalColumnFamilies(args); + + for (String keyspace : keyspaces) + { + try + { - probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, keyspace, cfnames); ++ probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, jobs, keyspace, cfnames); + } catch (IllegalArgumentException e) + { + throw e; + } catch (Exception e) + { + throw new RuntimeException("Error occurred during scrubbing", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java index 86a2cd5,0000000..596f353 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java +++ b/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java @@@ -1,56 -1,0 +1,61 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tools.nodetool; + +import io.airlift.command.Arguments; +import io.airlift.command.Command; +import io.airlift.command.Option; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool.NodeToolCmd; + +@Command(name = "upgradesstables", description = "Rewrite sstables (for the requested tables) that are not on the current version (thus upgrading them to said current version)") +public class UpgradeSSTable extends NodeToolCmd +{ + @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables") + private List<String> args = new ArrayList<>(); + + @Option(title = "include_all", name = {"-a", "--include-all-sstables"}, description = "Use -a to include all sstables, even those already on the current version") + private boolean includeAll = false; + ++ @Option(title = "jobs", ++ name = {"-j", "--jobs"}, ++ description = "Number of sstables to upgrade simultanously, set to 0 to use all available compaction threads") ++ private int jobs = 2; ++ + @Override + public void execute(NodeProbe probe) + { + List<String> keyspaces = parseOptionalKeyspace(args, probe); + String[] cfnames = parseOptionalColumnFamilies(args); + + for (String keyspace : keyspaces) + { + try + { - probe.upgradeSSTables(System.out, keyspace, !includeAll, cfnames); ++ probe.upgradeSSTables(System.out, keyspace, !includeAll, jobs, cfnames); + } catch (Exception e) + { + throw new RuntimeException("Error occurred during enabling auto-compaction", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/test/unit/org/apache/cassandra/db/CleanupTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java index c0cde40,4efd082..4cca7ff --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@@ -588,14 -499,14 +588,14 @@@ public class ScrubTes @Test public void testScrubColumnValidation() throws InterruptedException, RequestExecutionException, ExecutionException { - QueryProcessor.process("CREATE TABLE \"Keyspace1\".test_compact_static_columns (a bigint, b timeuuid, c boolean static, d text, PRIMARY KEY (a, b))", ConsistencyLevel.ONE); + QueryProcessor.process(String.format("CREATE TABLE \"%s\".test_compact_static_columns (a bigint, b timeuuid, c boolean static, d text, PRIMARY KEY (a, b))", KEYSPACE), ConsistencyLevel.ONE); - Keyspace keyspace = Keyspace.open("Keyspace1"); + Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("test_compact_static_columns"); - QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_static_columns (a, b, c, d) VALUES (123, c3db07e8-b602-11e3-bc6b-e0b9a54a6d93, true, 'foobar')"); + QueryProcessor.executeInternal(String.format("INSERT INTO \"%s\".test_compact_static_columns (a, b, c, d) VALUES (123, c3db07e8-b602-11e3-bc6b-e0b9a54a6d93, true, 'foobar')", KEYSPACE)); cfs.forceBlockingFlush(); - CompactionManager.instance.performScrub(cfs, false, true); + CompactionManager.instance.performScrub(cfs, false, true, 2); QueryProcessor.process("CREATE TABLE \"Keyspace1\".test_scrub_validation (a text primary key, b int)", ConsistencyLevel.ONE); ColumnFamilyStore cfs2 = keyspace.getColumnFamilyStore("test_scrub_validation"); @@@ -614,15 -525,15 +614,15 @@@ @Test public void testColumnNameEqualToDefaultKeyAlias() throws ExecutionException, InterruptedException { - Keyspace keyspace = Keyspace.open("Keyspace1"); - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("UUIDKeys"); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_UUID); - ColumnFamily cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "UUIDKeys"); + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(KEYSPACE, CF_UUID); cf.addColumn(column(CFMetaData.DEFAULT_KEY_ALIAS, "not a uuid", 1L)); - Mutation mutation = new Mutation("Keyspace1", ByteBufferUtil.bytes(UUIDGen.getTimeUUID()), cf); + Mutation mutation = new Mutation(KEYSPACE, ByteBufferUtil.bytes(UUIDGen.getTimeUUID()), cf); mutation.applyUnsafe(); cfs.forceBlockingFlush(); - CompactionManager.instance.performScrub(cfs, false, true); + CompactionManager.instance.performScrub(cfs, false, true, 2); assertEquals(1, cfs.getSSTables().size()); } @@@ -634,19 -545,19 +634,19 @@@ @Test public void testValidationCompactStorage() throws Exception { - QueryProcessor.process("CREATE TABLE \"Keyspace1\".test_compact_dynamic_columns (a int, b text, c text, PRIMARY KEY (a, b)) WITH COMPACT STORAGE", ConsistencyLevel.ONE); + QueryProcessor.process(String.format("CREATE TABLE \"%s\".test_compact_dynamic_columns (a int, b text, c text, PRIMARY KEY (a, b)) WITH COMPACT STORAGE", KEYSPACE), ConsistencyLevel.ONE); - Keyspace keyspace = Keyspace.open("Keyspace1"); + Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("test_compact_dynamic_columns"); - QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'a', 'foo')"); - QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'b', 'bar')"); - QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'c', 'boo')"); + QueryProcessor.executeInternal(String.format("INSERT INTO \"%s\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'a', 'foo')", KEYSPACE)); + QueryProcessor.executeInternal(String.format("INSERT INTO \"%s\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'b', 'bar')", KEYSPACE)); + QueryProcessor.executeInternal(String.format("INSERT INTO \"%s\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'c', 'boo')", KEYSPACE)); cfs.forceBlockingFlush(); - CompactionManager.instance.performScrub(cfs, true, true); + CompactionManager.instance.performScrub(cfs, true, true, 2); // Scrub is silent, but it will remove broken records. So reading everything back to make sure nothing to "scrubbed away" - UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM \"Keyspace1\".test_compact_dynamic_columns"); + UntypedResultSet rs = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".test_compact_dynamic_columns", KEYSPACE)); assertEquals(3, rs.size()); Iterator<UntypedResultSet.Row> iter = rs.iterator(); @@@ -654,129 -565,4 +654,129 @@@ assertEquals("bar", iter.next().getString("c")); assertEquals("boo", iter.next().getString("c")); } + + @Test /* CASSANDRA-5174 */ + public void testScrubKeysIndex_preserveOrder() throws IOException, ExecutionException, InterruptedException + { + //If the partitioner preserves the order then SecondaryIndex uses BytesType comparator, + // otherwise it uses LocalByPartitionerType + setKeyComparator(BytesType.instance); + testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true); + } + + @Test /* CASSANDRA-5174 */ + public void testScrubCompositeIndex_preserveOrder() throws IOException, ExecutionException, InterruptedException + { + setKeyComparator(BytesType.instance); + testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, true); + } + + @Test /* CASSANDRA-5174 */ + public void testScrubKeysIndex() throws IOException, ExecutionException, InterruptedException + { + setKeyComparator(new LocalByPartionerType(StorageService.getPartitioner())); + testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true); + } + + @Test /* CASSANDRA-5174 */ + public void testScrubCompositeIndex() throws IOException, ExecutionException, InterruptedException + { + setKeyComparator(new LocalByPartionerType(StorageService.getPartitioner())); + testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, true); + } + + @Test /* CASSANDRA-5174 */ + public void testFailScrubKeysIndex() throws IOException, ExecutionException, InterruptedException + { + testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, false); + } + + @Test /* CASSANDRA-5174 */ + public void testFailScrubCompositeIndex() throws IOException, ExecutionException, InterruptedException + { + testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, false); + } + + @Test /* CASSANDRA-5174 */ + public void testScrubTwice() throws IOException, ExecutionException, InterruptedException + { + testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true, true); + } + + /** The SecondaryIndex class is used for custom indexes so to avoid + * making a public final field into a private field with getters + * and setters, we resort to this hack in order to test it properly + * since it can have two values which influence the scrubbing behavior. + * @param comparator - the key comparator we want to test + */ + private void setKeyComparator(AbstractType<?> comparator) + { + try + { + Field keyComparator = SecondaryIndex.class.getDeclaredField("keyComparator"); + keyComparator.setAccessible(true); + int modifiers = keyComparator.getModifiers(); + Field modifierField = keyComparator.getClass().getDeclaredField("modifiers"); + modifiers = modifiers & ~Modifier.FINAL; + modifierField.setAccessible(true); + modifierField.setInt(keyComparator, modifiers); + + keyComparator.set(null, comparator); + } + catch (Exception ex) + { + fail("Failed to change key comparator in secondary index : " + ex.getMessage()); + ex.printStackTrace(); + } + } + + private void testScrubIndex(String cfName, String colName, boolean composite, boolean ... scrubs) + throws IOException, ExecutionException, InterruptedException + { + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName); + cfs.clearUnsafe(); + + int numRows = 1000; + long[] colValues = new long [numRows * 2]; // each row has two columns + for (int i = 0; i < colValues.length; i+=2) + { + colValues[i] = (i % 4 == 0 ? 1L : 2L); // index column + colValues[i+1] = 3L; //other column + } + fillIndexCF(cfs, composite, colValues); + + // check index + IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes(colName), Operator.EQ, ByteBufferUtil.bytes(1L)); + List<Row> rows = cfs.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), numRows); + assertNotNull(rows); + assertEquals(numRows / 2, rows.size()); + + // scrub index + Set<ColumnFamilyStore> indexCfss = cfs.indexManager.getIndexesBackedByCfs(); + assertTrue(indexCfss.size() == 1); + for(ColumnFamilyStore indexCfs : indexCfss) + { + for (int i = 0; i < scrubs.length; i++) + { + boolean failure = !scrubs[i]; + if (failure) + { //make sure the next scrub fails + overrideWithGarbage(indexCfs.getSSTables().iterator().next(), ByteBufferUtil.bytes(1L), ByteBufferUtil.bytes(2L)); + } - CompactionManager.AllSSTableOpStatus result = indexCfs.scrub(false, false, true, true); ++ CompactionManager.AllSSTableOpStatus result = indexCfs.scrub(false, false, true, true, 0); + assertEquals(failure ? + CompactionManager.AllSSTableOpStatus.ABORTED : + CompactionManager.AllSSTableOpStatus.SUCCESSFUL, + result); + } + } + + + // check index is still working + rows = cfs.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), numRows); + assertNotNull(rows); + assertEquals(numRows / 2, rows.size()); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ----------------------------------------------------------------------