Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bba0d03e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bba0d03e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bba0d03e Branch: refs/heads/cassandra-3.11 Commit: bba0d03e9c5e62c222734839a9adc83f1aec6f95 Parents: ea62d88 489c2f6 Author: Mick Semb Wever <m...@apache.org> Authored: Fri Jun 29 16:58:26 2018 +1000 Committer: Mick Semb Wever <m...@apache.org> Committed: Fri Jun 29 17:00:02 2018 +1000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 67 +++++++----- .../db/compaction/AntiCompactionTest.java | 109 ++++++++++++++++++- 3 files changed, 147 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bba0d03e/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bba0d03e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index f0a4de5,f033bf2..fa6b03e --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -478,115 -474,17 +477,126 @@@ public class CompactionManager implemen }, jobs, OperationType.CLEANUP); } + public AllSSTableOpStatus performGarbageCollection(final ColumnFamilyStore cfStore, TombstoneOption tombstoneOption, int jobs) throws InterruptedException, ExecutionException + { + assert !cfStore.isIndex(); + + return parallelAllSSTableOperation(cfStore, new OneSSTableOperation() + { + @Override + public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction) + { + Iterable<SSTableReader> originals = transaction.originals(); + if (cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones()) + originals = Iterables.filter(originals, SSTableReader::isRepaired); + List<SSTableReader> sortedSSTables = Lists.newArrayList(originals); + Collections.sort(sortedSSTables, SSTableReader.maxTimestampComparator); + return sortedSSTables; + } + + @Override + public void execute(LifecycleTransaction txn) throws IOException + { + logger.debug("Garbage collecting {}", txn.originals()); + CompactionTask task = new CompactionTask(cfStore, txn, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds())) + { + @Override + protected CompactionController getCompactionController(Set<SSTableReader> toCompact) + { + return new CompactionController(cfStore, toCompact, gcBefore, null, tombstoneOption); + } + }; + task.setUserDefined(true); + task.setCompactionType(OperationType.GARBAGE_COLLECT); + task.execute(metrics); + } + }, jobs, OperationType.GARBAGE_COLLECT); + } + + public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs, int jobs) throws ExecutionException, InterruptedException + { + if (!cfs.getPartitioner().splitter().isPresent()) + { + logger.info("Partitioner does not support splitting"); + return AllSSTableOpStatus.ABORTED; + } + final Collection<Range<Token>> r = StorageService.instance.getLocalRanges(cfs.keyspace.getName()); + + if (r.isEmpty()) + { + logger.info("Relocate cannot run before a node has joined the ring"); + return AllSSTableOpStatus.ABORTED; + } + + final DiskBoundaries diskBoundaries = cfs.getDiskBoundaries(); + + return parallelAllSSTableOperation(cfs, new OneSSTableOperation() + { + @Override + public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction) + { + Set<SSTableReader> originals = Sets.newHashSet(transaction.originals()); + Set<SSTableReader> needsRelocation = originals.stream().filter(s -> !inCorrectLocation(s)).collect(Collectors.toSet()); + transaction.cancel(Sets.difference(originals, needsRelocation)); + + Map<Integer, List<SSTableReader>> groupedByDisk = groupByDiskIndex(needsRelocation); + + int maxSize = 0; + for (List<SSTableReader> diskSSTables : groupedByDisk.values()) + maxSize = Math.max(maxSize, diskSSTables.size()); + + List<SSTableReader> mixedSSTables = new ArrayList<>(); + + for (int i = 0; i < maxSize; i++) + for (List<SSTableReader> diskSSTables : groupedByDisk.values()) + if (i < diskSSTables.size()) + mixedSSTables.add(diskSSTables.get(i)); + + return mixedSSTables; + } + + public Map<Integer, List<SSTableReader>> groupByDiskIndex(Set<SSTableReader> needsRelocation) + { + return needsRelocation.stream().collect(Collectors.groupingBy((s) -> diskBoundaries.getDiskIndex(s))); + } + + private boolean inCorrectLocation(SSTableReader sstable) + { + if (!cfs.getPartitioner().splitter().isPresent()) + return true; + + int diskIndex = diskBoundaries.getDiskIndex(sstable); + File diskLocation = diskBoundaries.directories.get(diskIndex).location; + PartitionPosition diskLast = diskBoundaries.positions.get(diskIndex); + + // the location we get from directoryIndex is based on the first key in the sstable + // now we need to make sure the last key is less than the boundary as well: + return sstable.descriptor.directory.getAbsolutePath().startsWith(diskLocation.getAbsolutePath()) && sstable.last.compareTo(diskLast) <= 0; + } + + @Override + public void execute(LifecycleTransaction txn) + { + logger.debug("Relocating {}", txn.originals()); + AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE); + task.setUserDefined(true); + task.setCompactionType(OperationType.RELOCATE); + task.execute(metrics); + } + }, jobs, OperationType.RELOCATE); + } + + /** + * Submit anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables + * as repaired. + * + * @param cfs Column family for anti-compaction + * @param ranges Repaired ranges to be anti-compacted into separate SSTables. + * @param sstables {@link Refs} of SSTables within CF to anti-compact. + * @param repairedAt Unix timestamp of when repair was completed. + * @param parentRepairSession Corresponding repair session + * @return Futures executing anti-compaction. + */ public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs, final Collection<Range<Token>> ranges, final Refs<SSTableReader> sstables, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org