Merge branch 'cassandra-3.0' into cassandra-3.11

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bba0d03e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bba0d03e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bba0d03e

Branch: refs/heads/cassandra-3.11
Commit: bba0d03e9c5e62c222734839a9adc83f1aec6f95
Parents: ea62d88 489c2f6
Author: Mick Semb Wever <m...@apache.org>
Authored: Fri Jun 29 16:58:26 2018 +1000
Committer: Mick Semb Wever <m...@apache.org>
Committed: Fri Jun 29 17:00:02 2018 +1000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  67 +++++++-----
 .../db/compaction/AntiCompactionTest.java       | 109 ++++++++++++++++++-
 3 files changed, 147 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bba0d03e/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bba0d03e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index f0a4de5,f033bf2..fa6b03e
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -478,115 -474,17 +477,126 @@@ public class CompactionManager implemen
          }, jobs, OperationType.CLEANUP);
      }
  
 +    public AllSSTableOpStatus performGarbageCollection(final 
ColumnFamilyStore cfStore, TombstoneOption tombstoneOption, int jobs) throws 
InterruptedException, ExecutionException
 +    {
 +        assert !cfStore.isIndex();
 +
 +        return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
 +        {
 +            @Override
 +            public Iterable<SSTableReader> 
filterSSTables(LifecycleTransaction transaction)
 +            {
 +                Iterable<SSTableReader> originals = transaction.originals();
 +                if 
(cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
 +                    originals = Iterables.filter(originals, 
SSTableReader::isRepaired);
 +                List<SSTableReader> sortedSSTables = 
Lists.newArrayList(originals);
 +                Collections.sort(sortedSSTables, 
SSTableReader.maxTimestampComparator);
 +                return sortedSSTables;
 +            }
 +
 +            @Override
 +            public void execute(LifecycleTransaction txn) throws IOException
 +            {
 +                logger.debug("Garbage collecting {}", txn.originals());
 +                CompactionTask task = new CompactionTask(cfStore, txn, 
getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()))
 +                {
 +                    @Override
 +                    protected CompactionController 
getCompactionController(Set<SSTableReader> toCompact)
 +                    {
 +                        return new CompactionController(cfStore, toCompact, 
gcBefore, null, tombstoneOption);
 +                    }
 +                };
 +                task.setUserDefined(true);
 +                task.setCompactionType(OperationType.GARBAGE_COLLECT);
 +                task.execute(metrics);
 +            }
 +        }, jobs, OperationType.GARBAGE_COLLECT);
 +    }
 +
 +    public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs, 
int jobs) throws ExecutionException, InterruptedException
 +    {
 +        if (!cfs.getPartitioner().splitter().isPresent())
 +        {
 +            logger.info("Partitioner does not support splitting");
 +            return AllSSTableOpStatus.ABORTED;
 +        }
 +        final Collection<Range<Token>> r = 
StorageService.instance.getLocalRanges(cfs.keyspace.getName());
 +
 +        if (r.isEmpty())
 +        {
 +            logger.info("Relocate cannot run before a node has joined the 
ring");
 +            return AllSSTableOpStatus.ABORTED;
 +        }
 +
 +        final DiskBoundaries diskBoundaries = cfs.getDiskBoundaries();
 +
 +        return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
 +        {
 +            @Override
 +            public Iterable<SSTableReader> 
filterSSTables(LifecycleTransaction transaction)
 +            {
 +                Set<SSTableReader> originals = 
Sets.newHashSet(transaction.originals());
 +                Set<SSTableReader> needsRelocation = 
originals.stream().filter(s -> 
!inCorrectLocation(s)).collect(Collectors.toSet());
 +                transaction.cancel(Sets.difference(originals, 
needsRelocation));
 +
 +                Map<Integer, List<SSTableReader>> groupedByDisk = 
groupByDiskIndex(needsRelocation);
 +
 +                int maxSize = 0;
 +                for (List<SSTableReader> diskSSTables : 
groupedByDisk.values())
 +                    maxSize = Math.max(maxSize, diskSSTables.size());
 +
 +                List<SSTableReader> mixedSSTables = new ArrayList<>();
 +
 +                for (int i = 0; i < maxSize; i++)
 +                    for (List<SSTableReader> diskSSTables : 
groupedByDisk.values())
 +                        if (i < diskSSTables.size())
 +                            mixedSSTables.add(diskSSTables.get(i));
 +
 +                return mixedSSTables;
 +            }
 +
 +            public Map<Integer, List<SSTableReader>> 
groupByDiskIndex(Set<SSTableReader> needsRelocation)
 +            {
 +                return 
needsRelocation.stream().collect(Collectors.groupingBy((s) -> 
diskBoundaries.getDiskIndex(s)));
 +            }
 +
 +            private boolean inCorrectLocation(SSTableReader sstable)
 +            {
 +                if (!cfs.getPartitioner().splitter().isPresent())
 +                    return true;
 +
 +                int diskIndex = diskBoundaries.getDiskIndex(sstable);
 +                File diskLocation = 
diskBoundaries.directories.get(diskIndex).location;
 +                PartitionPosition diskLast = 
diskBoundaries.positions.get(diskIndex);
 +
 +                // the disk index we get from diskBoundaries is based on the 
first key in the sstable;
 +                // now we need to make sure the last key does not exceed the 
boundary as well:
 +                return 
sstable.descriptor.directory.getAbsolutePath().startsWith(diskLocation.getAbsolutePath())
 && sstable.last.compareTo(diskLast) <= 0;
 +            }
 +
 +            @Override
 +            public void execute(LifecycleTransaction txn)
 +            {
 +                logger.debug("Relocating {}", txn.originals());
 +                AbstractCompactionTask task = 
cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, 
Long.MAX_VALUE);
 +                task.setUserDefined(true);
 +                task.setCompactionType(OperationType.RELOCATE);
 +                task.execute(metrics);
 +            }
 +        }, jobs, OperationType.RELOCATE);
 +    }
 +
+     /**
+      * Submits anti-compactions for a collection of SSTables over a set of 
repaired ranges and marks the corresponding SSTables
+      * as repaired.
+      *
+      * @param cfs Column family for anti-compaction
+      * @param ranges Repaired ranges to be anti-compacted into separate 
SSTables.
+      * @param sstables {@link Refs} of SSTables within CF to anti-compact.
+      * @param repairedAt Unix timestamp of when repair was completed.
+      * @param parentRepairSession Corresponding repair session
+      * @return Future executing the anti-compaction.
+      */
      public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore 
cfs,
                                            final Collection<Range<Token>> 
ranges,
                                            final Refs<SSTableReader> sstables,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to