Merge branch 'cassandra-3.0' into cassandra-3.5
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a250854 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a250854 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a250854 Branch: refs/heads/trunk Commit: 0a2508544069f035dc43a81b8700c0b7bf2d28e3 Parents: c7ef7c9 0c91977 Author: Marcus Eriksson <marc...@apache.org> Authored: Tue Mar 29 11:05:46 2016 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Tue Mar 29 11:05:46 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/cassandra/db/ColumnFamilyStore.java | 20 ++++----- .../db/compaction/CompactionManager.java | 44 +++++++++++++------- .../cassandra/service/StorageService.java | 30 ++++++++++--- .../cassandra/service/StorageServiceMBean.java | 8 ++++ .../org/apache/cassandra/tools/NodeProbe.java | 38 ++++++++++------- .../cassandra/tools/nodetool/Cleanup.java | 9 +++- .../tools/nodetool/RelocateSSTables.java | 8 +++- .../apache/cassandra/tools/nodetool/Scrub.java | 13 ++++-- .../tools/nodetool/UpgradeSSTable.java | 10 ++++- .../org/apache/cassandra/db/CleanupTest.java | 6 +-- .../unit/org/apache/cassandra/db/ScrubTest.java | 16 +++---- 12 files changed, 141 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 2907df9,fdc873e..3e69959 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -25,43 -23,14 +25,45 @@@ Merged from 2.2 * Only log yaml config once, at startup (CASSANDRA-11217) * Reference leak with parallel repairs on the same table (CASSANDRA-11215) Merged from 2.1: + * Add a -j parameter to scrub/cleanup/upgradesstables to state how + many threads to use (CASSANDRA-11179) - * Backport CASSANDRA-10679 (CASSANDRA-9598) - * InvalidateKeys should have a weak ref to key cache (CASSANDRA-11176) * COPY FROM on large datasets: fix progress report and debug performance (CASSANDRA-11053) + * InvalidateKeys should have a weak ref to key cache (CASSANDRA-11176) -3.0.4 - * Preserve order for preferred SSL cipher suites (CASSANDRA-11164) + +3.4 + * (cqlsh) add cqlshrc option to always connect using ssl (CASSANDRA-10458) + * Cleanup a few resource warnings (CASSANDRA-11085) + * Allow custom tracing implementations (CASSANDRA-10392) + * Extract LoaderOptions to be able to be used from outside (CASSANDRA-10637) + * fix OnDiskIndexTest to properly treat empty ranges (CASSANDRA-11205) + * fix TrackerTest to handle new notifications (CASSANDRA-11178) + * add SASI validation for partitioner and complex columns (CASSANDRA-11169) + * Add caching of encrypted credentials in PasswordAuthenticator (CASSANDRA-7715) + * fix SASI memtable switching on flush (CASSANDRA-11159) + * Remove duplicate offline compaction tracking (CASSANDRA-11148) + * fix EQ semantics of analyzed SASI indexes (CASSANDRA-11130) + * Support long name output for nodetool commands (CASSANDRA-7950) + * Encrypted hints (CASSANDRA-11040) + * SASI index options validation (CASSANDRA-11136) + * Optimize disk seek using min/max column name meta data when the LIMIT clause is used + (CASSANDRA-8180) + * Add LIKE support to CQL3 (CASSANDRA-11067) + * Generic Java UDF types (CASSANDRA-10819) + * cqlsh: Include sub-second precision in timestamps by default (CASSANDRA-10428) + * Set javac encoding to utf-8 (CASSANDRA-11077) + * Integrate SASI index into Cassandra (CASSANDRA-10661) + * Add --skip-flush option to nodetool snapshot + * Skip values for non-queried columns (CASSANDRA-10657) + * Add support for secondary indexes on static columns (CASSANDRA-8103) + * CommitLogUpgradeTestMaker creates broken commit logs (CASSANDRA-11051) + * Add metric for number of dropped mutations (CASSANDRA-10866) + * Simplify row cache invalidation code (CASSANDRA-10396) + * Support user-defined compaction through nodetool (CASSANDRA-10660) + * Stripe view locks by key and table ID to reduce contention (CASSANDRA-10981) + * Add nodetool gettimeout and settimeout commands (CASSANDRA-10953) + * Add 3.0 metadata to sstablemetadata output (CASSANDRA-10838) +Merged from 3.0: * MV should only query complex columns included in the view (CASSANDRA-11069) * Failed aggregate creation breaks server permanently (CASSANDRA-11064) * Add sstabledump tool (CASSANDRA-7464) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index fbfb7ee,a1db5b5..f193c4d --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -1436,16 -1363,11 +1436,16 @@@ public class ColumnFamilyStore implemen return CompactionManager.instance.performVerify(ColumnFamilyStore.this, extendedVerify); } - public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion) throws ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion, int jobs) throws ExecutionException, InterruptedException { - return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion); + return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion, jobs); } - public CompactionManager.AllSSTableOpStatus relocateSSTables() throws ExecutionException, InterruptedException ++ public CompactionManager.AllSSTableOpStatus relocateSSTables(int jobs) throws ExecutionException, InterruptedException + { - return CompactionManager.instance.relocateSSTables(this); ++ return CompactionManager.instance.relocateSSTables(this, jobs); + } + public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType) { assert !sstables.isEmpty(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 627ae32,f339660..f9b0997 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -438,80 -451,9 +452,80 @@@ public class CompactionManager implemen CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges, FBUtilities.nowInSeconds()); doCleanupOne(cfStore, txn, cleanupStrategy, ranges, hasIndexes); } - }, OperationType.CLEANUP); + }, jobs, OperationType.CLEANUP); } - public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs) throws ExecutionException, InterruptedException ++ public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs, int jobs) throws ExecutionException, InterruptedException + { + if (!cfs.getPartitioner().splitter().isPresent()) + { + logger.info("Partitioner does not support splitting"); + return AllSSTableOpStatus.ABORTED; + } + final Collection<Range<Token>> r = StorageService.instance.getLocalRanges(cfs.keyspace.getName()); + + if (r.isEmpty()) + { + logger.info("Relocate cannot run before a node has joined the ring"); + return AllSSTableOpStatus.ABORTED; + } + + final List<Range<Token>> localRanges = Range.sort(r); + final Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations(); + final List<PartitionPosition> diskBoundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations); + + return parallelAllSSTableOperation(cfs, new OneSSTableOperation() + { + @Override + public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction) + { + Set<SSTableReader> originals = Sets.newHashSet(transaction.originals()); + Set<SSTableReader> needsRelocation = originals.stream().filter(s -> !inCorrectLocation(s)).collect(Collectors.toSet()); + transaction.cancel(Sets.difference(originals, needsRelocation)); + + Map<Integer, List<SSTableReader>> groupedByDisk = needsRelocation.stream().collect(Collectors.groupingBy((s) -> + CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), s))); + + int maxSize = 0; + for (List<SSTableReader> diskSSTables : groupedByDisk.values()) + maxSize = Math.max(maxSize, diskSSTables.size()); + + List<SSTableReader> mixedSSTables = new ArrayList<>(); + + for (int i = 0; i < maxSize; i++) + for (List<SSTableReader> diskSSTables : groupedByDisk.values()) + if (i < diskSSTables.size()) + mixedSSTables.add(diskSSTables.get(i)); + + return mixedSSTables; + } + + private boolean inCorrectLocation(SSTableReader sstable) + { + if (!cfs.getPartitioner().splitter().isPresent()) + return true; + int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable); + Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations(); + + Directories.DataDirectory location = locations[directoryIndex]; + PartitionPosition diskLast = diskBoundaries.get(directoryIndex); + // the location we get from directoryIndex is based on the first key in the sstable + // now we need to make sure the last key is less than the boundary as well: + return sstable.descriptor.directory.getAbsolutePath().startsWith(location.location.getAbsolutePath()) && sstable.last.compareTo(diskLast) <= 0; + } + + @Override + public void execute(LifecycleTransaction txn) throws IOException + { + logger.debug("Relocating {}", txn.originals()); + AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE); + task.setUserDefined(true); + task.setCompactionType(OperationType.RELOCATE); + task.execute(metrics); + } - }, OperationType.RELOCATE); ++ }, jobs, OperationType.RELOCATE); + } + public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs, final Collection<Range<Token>> ranges, final Refs<SSTableReader> sstables, http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageService.java index 22d67e5,75573ac..52dcb97 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -2789,59 -2712,6 +2804,64 @@@ public class StorageService extends Not } } + public int relocateSSTables(String keyspaceName, String ... columnFamilies) throws IOException, ExecutionException, InterruptedException + { ++ return relocateSSTables(0, keyspaceName, columnFamilies); ++ } ++ ++ public int relocateSSTables(int jobs, String keyspaceName, String ... columnFamilies) throws IOException, ExecutionException, InterruptedException ++ { + CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; + for (ColumnFamilyStore cfs : getValidColumnFamilies(false, false, keyspaceName, columnFamilies)) + { - CompactionManager.AllSSTableOpStatus oneStatus = cfs.relocateSSTables(); ++ CompactionManager.AllSSTableOpStatus oneStatus = cfs.relocateSSTables(jobs); + if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) + status = oneStatus; + } + return status.statusCode; + } + + /** + * Takes the snapshot of a multiple column family from different keyspaces. A snapshot name must be specified. + * + * @param tag + * the tag given to the snapshot; may not be null or empty + * @param options + * Map of options (skipFlush is the only supported option for now) + * @param entities + * list of keyspaces / tables in the form of empty | ks1 ks2 ... | ks1.cf1,ks2.cf2,... + */ + @Override + public void takeSnapshot(String tag, Map<String, String> options, String... entities) throws IOException + { + boolean skipFlush = Boolean.parseBoolean(options.getOrDefault("skipFlush", "false")); + + if (entities != null && entities.length > 0 && entities[0].contains(".")) + { + takeMultipleTableSnapshot(tag, skipFlush, entities); + } + else + { + takeSnapshot(tag, skipFlush, entities); + } + } + + /** + * Takes the snapshot of a specific table. A snapshot name must be + * specified. + * + * @param keyspaceName + * the keyspace which holds the specified table + * @param tableName + * the table to snapshot + * @param tag + * the tag given to the snapshot; may not be null or empty + */ + public void takeTableSnapshot(String keyspaceName, String tableName, String tag) + throws IOException { + takeMultipleTableSnapshot(tag, false, keyspaceName + "." + tableName); + } + /** * Takes the snapshot for the given keyspaces. A snapshot name must be specified. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java index dc12253,5b7331d..2238f33 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@@ -251,7 -248,6 +251,9 @@@ public interface StorageServiceMBean ex */ public void forceKeyspaceCompaction(boolean splitOutput, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException; ++ @Deprecated + public int relocateSSTables(String keyspace, String ... cfnames) throws IOException, ExecutionException, InterruptedException; ++ public int relocateSSTables(int jobs, String keyspace, String ... cfnames) throws IOException, ExecutionException, InterruptedException; /** * Trigger a cleanup of keys on a single keyspace */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java index 405f70c,a9d71d8..7d15926 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@@ -304,11 -309,6 +314,11 @@@ public class NodeProbe implements AutoC ssProxy.forceKeyspaceCompaction(splitOutput, keyspaceName, tableNames); } - public void relocateSSTables(String keyspace, String[] cfnames) throws IOException, ExecutionException, InterruptedException ++ public void relocateSSTables(int jobs, String keyspace, String[] cfnames) throws IOException, ExecutionException, InterruptedException + { - ssProxy.relocateSSTables(keyspace, cfnames); ++ ssProxy.relocateSSTables(jobs, keyspace, cfnames); + } + public void forceKeyspaceFlush(String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException { ssProxy.forceKeyspaceFlush(keyspaceName, tableNames); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java index 8522bc4,0000000..7c3066c mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java +++ b/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java @@@ -1,49 -1,0 +1,55 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tools.nodetool; + +import java.util.ArrayList; +import java.util.List; + +import io.airlift.command.Arguments; +import io.airlift.command.Command; ++import io.airlift.command.Option; +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool; + +@Command(name = "relocatesstables", description = "Relocates sstables to the correct disk") +public class RelocateSSTables extends NodeTool.NodeToolCmd +{ + @Arguments(usage = "<keyspace> <table>", description = "The keyspace and table name") + private List<String> args = new ArrayList<>(); + ++ @Option(title = "jobs", ++ name = {"-j", "--jobs"}, ++ description = "Number of sstables to relocate simultanously, set to 0 to use all available compaction threads") ++ private int jobs = 2; ++ + @Override + public void execute(NodeProbe probe) + { + List<String> keyspaces = parseOptionalKeyspace(args, probe); + String[] cfnames = parseOptionalTables(args); + try + { + for (String keyspace : keyspaces) - probe.relocateSSTables(keyspace, cfnames); ++ probe.relocateSSTables(jobs, keyspace, cfnames); + } + catch (Exception e) + { + throw new RuntimeException("Got error while relocating", e); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/test/unit/org/apache/cassandra/db/CleanupTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/test/unit/org/apache/cassandra/db/ScrubTest.java ----------------------------------------------------------------------