Merge branch 'cassandra-2.2' into trunk

Conflicts:
        src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
        src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d26187e5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d26187e5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d26187e5

Branch: refs/heads/trunk
Commit: d26187e5c2ab2cf08d8c874387b4674b860f5e4d
Parents: 3e75d5a 325aeb7
Author: Marcus Eriksson <marc...@apache.org>
Authored: Wed Jul 29 17:04:15 2015 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Jul 29 17:04:15 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compaction/AbstractCompactionStrategy.java  | 11 +----
 .../compaction/CompactionStrategyManager.java   | 49 +++++++++++++-------
 .../DateTieredCompactionStrategy.java           |  5 +-
 .../compaction/LeveledCompactionStrategy.java   |  3 --
 .../SizeTieredCompactionStrategy.java           |  6 ---
 6 files changed, 34 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 6598286,05dffc8..379d3de
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@@ -85,8 -81,8 +85,6 @@@ public abstract class AbstractCompactio
       */
      protected boolean isActive = false;
  
--    protected volatile boolean enabled = true;
--
      protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, 
String> options)
      {
          assert cfs != null;
@@@ -195,19 -191,19 +193,12 @@@
       */
      public abstract long getMaxSSTableBytes();
  
--    public boolean isEnabled()
--    {
--        return this.enabled && this.isActive;
--    }
--
      public void enable()
      {
--        this.enabled = true;
      }
  
      public void disable()
      {
--        this.enabled = false;
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 8ec7071,0000000..766eb1b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -1,448 -1,0 +1,461 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.compaction;
 +
 +
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.HashSet;
 +import java.util.List;
++import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.Callable;
 +
++import com.google.common.collect.ImmutableMap;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Memtable;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.notifications.INotification;
 +import org.apache.cassandra.notifications.INotificationConsumer;
 +import org.apache.cassandra.notifications.SSTableAddedNotification;
 +import org.apache.cassandra.notifications.SSTableDeletingNotification;
 +import org.apache.cassandra.notifications.SSTableListChangedNotification;
 +import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
 +
 +/**
 + * Manages the compaction strategies.
 + *
 + * Currently has two instances of actual compaction strategies - one for 
repaired data and one for
 + * unrepaired data. This is done to be able to totally separate the different 
sets of sstables.
 + */
 +public class CompactionStrategyManager implements INotificationConsumer
 +{
 +    protected static final String COMPACTION_ENABLED = "enabled";
 +    private static final Logger logger = 
LoggerFactory.getLogger(CompactionStrategyManager.class);
 +    private final ColumnFamilyStore cfs;
 +    private volatile AbstractCompactionStrategy repaired;
 +    private volatile AbstractCompactionStrategy unrepaired;
 +    private volatile boolean enabled = true;
 +    public boolean isActive = true;
++    private Map<String, String> options;
 +
 +    public CompactionStrategyManager(ColumnFamilyStore cfs)
 +    {
 +        cfs.getTracker().subscribe(this);
 +        logger.debug("{} subscribed to the data tracker.", this);
 +        this.cfs = cfs;
 +        reload(cfs.metadata);
 +        String optionValue = 
cfs.metadata.compactionStrategyOptions.get(COMPACTION_ENABLED);
 +        enabled = optionValue == null || Boolean.parseBoolean(optionValue);
++        options = ImmutableMap.copyOf(cfs.metadata.compactionStrategyOptions);
 +    }
 +
 +    /**
 +     * Return the next background task
 +     *
 +     * Returns a task for the compaction strategy that needs it the most 
(most estimated remaining tasks)
 +     *
 +     */
 +    public synchronized AbstractCompactionTask getNextBackgroundTask(int 
gcBefore)
 +    {
 +        if (!isEnabled())
 +            return null;
 +
 +        maybeReload(cfs.metadata);
 +
 +        if (repaired.getEstimatedRemainingTasks() > 
unrepaired.getEstimatedRemainingTasks())
 +        {
 +            AbstractCompactionTask repairedTask = 
repaired.getNextBackgroundTask(gcBefore);
 +            if (repairedTask != null)
 +                return repairedTask;
 +            return unrepaired.getNextBackgroundTask(gcBefore);
 +        }
 +        else
 +        {
 +            AbstractCompactionTask unrepairedTask = 
unrepaired.getNextBackgroundTask(gcBefore);
 +            if (unrepairedTask != null)
 +                return unrepairedTask;
 +            return repaired.getNextBackgroundTask(gcBefore);
 +        }
 +    }
 +
-     /**
-      * Disable compaction - used with nodetool disableautocompaction for 
example
-      */
-     public void disable()
-     {
-         enabled = false;
-     }
- 
-     /**
-      * re-enable disabled compaction.
-      */
-     public void enable()
-     {
-         enabled = true;
-     }
- 
 +    public boolean isEnabled()
 +    {
 +        return enabled && isActive;
 +    }
 +
 +    public synchronized void resume()
 +    {
 +        isActive = true;
 +    }
 +
 +    /**
 +     * pause compaction while we cancel all ongoing compactions
 +     *
 +     * Separate call from enable/disable to not have to save the 
enabled-state externally
 +      */
 +    public synchronized void pause()
 +    {
 +        isActive = false;
 +    }
 +
 +
 +    private void startup()
 +    {
 +        for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
 +        {
 +            if (sstable.openReason != SSTableReader.OpenReason.EARLY)
 +                getCompactionStrategyFor(sstable).addSSTable(sstable);
 +        }
 +        repaired.startup();
 +        unrepaired.startup();
 +    }
 +
 +    /**
 +     * return the compaction strategy for the given sstable
 +     *
 +     * returns differently based on the repaired status
 +     * @param sstable
 +     * @return
 +     */
 +    private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader 
sstable)
 +    {
 +        if (sstable.isRepaired())
 +            return repaired;
 +        else
 +            return unrepaired;
 +    }
 +
 +    public void shutdown()
 +    {
 +        isActive = false;
 +        repaired.shutdown();
 +        unrepaired.shutdown();
 +    }
 +
 +
 +    public synchronized void maybeReload(CFMetaData metadata)
 +    {
 +        if (repaired != null && 
repaired.getClass().equals(metadata.compactionStrategyClass)
 +                && unrepaired != null && 
unrepaired.getClass().equals(metadata.compactionStrategyClass)
 +                && 
repaired.options.equals(metadata.compactionStrategyOptions) // todo: assumes 
all have the same options
 +                && 
unrepaired.options.equals(metadata.compactionStrategyOptions))
 +            return;
- 
 +        reload(metadata);
 +    }
 +
 +    /**
 +     * Reload the compaction strategies
 +     *
 +     * Called after changing configuration and at startup.
 +     * @param metadata
 +     */
 +    public synchronized void reload(CFMetaData metadata)
 +    {
++        boolean disabledWithJMX = !isEnabled() && shouldBeEnabled();
 +        if (repaired != null)
 +            repaired.shutdown();
 +        if (unrepaired != null)
 +            unrepaired.shutdown();
 +        repaired = metadata.createCompactionStrategyInstance(cfs);
 +        unrepaired = metadata.createCompactionStrategyInstance(cfs);
++        options = ImmutableMap.copyOf(metadata.compactionStrategyOptions);
++        if (disabledWithJMX || !shouldBeEnabled())
++            disable();
++        else
++            enable();
 +        startup();
 +    }
 +
 +    public void replaceFlushed(Memtable memtable, SSTableReader sstable)
 +    {
 +        cfs.getTracker().replaceFlushed(memtable, sstable);
 +        if (sstable != null)
 +            CompactionManager.instance.submitBackground(cfs);
 +    }
 +
 +    public int getUnleveledSSTables()
 +    {
 +        if (repaired instanceof LeveledCompactionStrategy && unrepaired 
instanceof LeveledCompactionStrategy)
 +        {
 +            int count = 0;
 +            count += ((LeveledCompactionStrategy)repaired).getLevelSize(0);
 +            count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
 +            return count;
 +        }
 +        return 0;
 +    }
 +
 +    public synchronized int[] getSSTableCountPerLevel()
 +    {
 +        if (repaired instanceof LeveledCompactionStrategy && unrepaired 
instanceof LeveledCompactionStrategy)
 +        {
 +            int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
 +            int[] repairedCountPerLevel = ((LeveledCompactionStrategy) 
repaired).getAllLevelSize();
 +            res = sumArrays(res, repairedCountPerLevel);
 +            int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) 
unrepaired).getAllLevelSize();
 +            res = sumArrays(res, unrepairedCountPerLevel);
 +            return res;
 +        }
 +        return null;
 +    }
 +
 +    private static int[] sumArrays(int[] a, int[] b)
 +    {
 +        int[] res = new int[Math.max(a.length, b.length)];
 +        for (int i = 0; i < res.length; i++)
 +        {
 +            if (i < a.length && i < b.length)
 +                res[i] = a[i] + b[i];
 +            else if (i < a.length)
 +                res[i] = a[i];
 +            else
 +                res[i] = b[i];
 +        }
 +        return res;
 +    }
 +
 +    public boolean shouldDefragment()
 +    {
 +        assert repaired.getClass().equals(unrepaired.getClass());
 +        return repaired.shouldDefragment();
 +    }
 +
 +
 +    public synchronized void handleNotification(INotification notification, 
Object sender)
 +    {
 +        if (notification instanceof SSTableAddedNotification)
 +        {
 +            SSTableAddedNotification flushedNotification = 
(SSTableAddedNotification) notification;
 +            if (flushedNotification.added.isRepaired())
 +                repaired.addSSTable(flushedNotification.added);
 +            else
 +                unrepaired.addSSTable(flushedNotification.added);
 +        }
 +        else if (notification instanceof SSTableListChangedNotification)
 +        {
 +            SSTableListChangedNotification listChangedNotification = 
(SSTableListChangedNotification) notification;
 +            Set<SSTableReader> repairedRemoved = new HashSet<>();
 +            Set<SSTableReader> repairedAdded = new HashSet<>();
 +            Set<SSTableReader> unrepairedRemoved = new HashSet<>();
 +            Set<SSTableReader> unrepairedAdded = new HashSet<>();
 +
 +            for (SSTableReader sstable : listChangedNotification.removed)
 +            {
 +                if (sstable.isRepaired())
 +                    repairedRemoved.add(sstable);
 +                else
 +                    unrepairedRemoved.add(sstable);
 +            }
 +            for (SSTableReader sstable : listChangedNotification.added)
 +            {
 +                if (sstable.isRepaired())
 +                    repairedAdded.add(sstable);
 +                else
 +                    unrepairedAdded.add(sstable);
 +            }
 +            if (!repairedRemoved.isEmpty())
 +            {
 +                repaired.replaceSSTables(repairedRemoved, repairedAdded);
 +            }
 +            else
 +            {
 +                for (SSTableReader sstable : repairedAdded)
 +                    repaired.addSSTable(sstable);
 +            }
 +
 +            if (!unrepairedRemoved.isEmpty())
 +            {
 +                unrepaired.replaceSSTables(unrepairedRemoved, 
unrepairedAdded);
 +            }
 +            else
 +            {
 +                for (SSTableReader sstable : unrepairedAdded)
 +                    unrepaired.addSSTable(sstable);
 +            }
 +        }
 +        else if (notification instanceof SSTableRepairStatusChanged)
 +        {
 +            for (SSTableReader sstable : ((SSTableRepairStatusChanged) 
notification).sstable)
 +            {
 +                if (sstable.isRepaired())
 +                {
 +                    unrepaired.removeSSTable(sstable);
 +                    repaired.addSSTable(sstable);
 +                }
 +                else
 +                {
 +                    repaired.removeSSTable(sstable);
 +                    unrepaired.addSSTable(sstable);
 +                }
 +            }
 +        }
 +        else if (notification instanceof SSTableDeletingNotification)
 +        {
 +            SSTableReader sstable = 
((SSTableDeletingNotification)notification).deleting;
 +            if (sstable.isRepaired())
 +                repaired.removeSSTable(sstable);
 +            else
 +                unrepaired.removeSSTable(sstable);
 +        }
 +    }
 +
++    public void enable()
++    {
++        if (repaired != null)
++            repaired.enable();
++        if (unrepaired != null)
++            unrepaired.enable();
++        // enable this last to make sure the strategies are ready to get 
calls.
++        enabled = true;
++    }
++
++    public void disable()
++    {
++        // disable this first to avoid asking disabled strategies for compaction 
tasks
++        enabled = false;
++        if (repaired != null)
++            repaired.disable();
++        if (unrepaired != null)
++            unrepaired.disable();
++    }
++
 +    /**
 +     * Create ISSTableScanner from the given sstables
 +     *
 +     * Delegates the call to the compaction strategies to allow LCS to create 
a scanner
 +     * @param sstables
 +     * @param range
 +     * @return
 +     */
 +    @SuppressWarnings("resource")
 +    public synchronized AbstractCompactionStrategy.ScannerList 
getScanners(Collection<SSTableReader> sstables, Range<Token> range)
 +    {
 +        List<SSTableReader> repairedSSTables = new ArrayList<>();
 +        List<SSTableReader> unrepairedSSTables = new ArrayList<>();
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (sstable.isRepaired())
 +                repairedSSTables.add(sstable);
 +            else
 +                unrepairedSSTables.add(sstable);
 +        }
 +
 +
 +        AbstractCompactionStrategy.ScannerList repairedScanners = 
repaired.getScanners(repairedSSTables, range);
 +        AbstractCompactionStrategy.ScannerList unrepairedScanners = 
unrepaired.getScanners(unrepairedSSTables, range);
 +
 +        List<ISSTableScanner> scanners = new 
ArrayList<>(repairedScanners.scanners.size() + 
unrepairedScanners.scanners.size());
 +        scanners.addAll(repairedScanners.scanners);
 +        scanners.addAll(unrepairedScanners.scanners);
 +        return new AbstractCompactionStrategy.ScannerList(scanners);
 +    }
 +
 +    public synchronized AbstractCompactionStrategy.ScannerList 
getScanners(Collection<SSTableReader> sstables)
 +    {
 +        return getScanners(sstables, null);
 +    }
 +
 +    public Collection<Collection<SSTableReader>> 
groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
 +    {
 +        return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
 +    }
 +
 +    public long getMaxSSTableBytes()
 +    {
 +        return unrepaired.getMaxSSTableBytes();
 +    }
 +
 +    public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, 
int gcBefore, long maxSSTableBytes)
 +    {
 +        return 
getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn,
 gcBefore, maxSSTableBytes);
 +    }
 +
 +    public Collection<AbstractCompactionTask> getMaximalTasks(final int 
gcBefore, final boolean splitOutput)
 +    {
 +        // runWithCompactionsDisabled cancels active compactions and disables 
them, then we are able
 +        // to make the repaired/unrepaired strategies mark their own sstables 
as compacting. Once the
 +        // sstables are marked the compactions are re-enabled
 +        return cfs.runWithCompactionsDisabled(new 
Callable<Collection<AbstractCompactionTask>>()
 +        {
 +            @Override
 +            public Collection<AbstractCompactionTask> call() throws Exception
 +            {
 +                synchronized (CompactionStrategyManager.this)
 +                {
 +                    Collection<AbstractCompactionTask> repairedTasks = 
repaired.getMaximalTask(gcBefore, splitOutput);
 +                    Collection<AbstractCompactionTask> unrepairedTasks = 
unrepaired.getMaximalTask(gcBefore, splitOutput);
 +
 +                    if (repairedTasks == null && unrepairedTasks == null)
 +                        return null;
 +
 +                    if (repairedTasks == null)
 +                        return unrepairedTasks;
 +                    if (unrepairedTasks == null)
 +                        return repairedTasks;
 +
 +                    List<AbstractCompactionTask> tasks = new ArrayList<>();
 +                    tasks.addAll(repairedTasks);
 +                    tasks.addAll(unrepairedTasks);
 +                    return tasks;
 +                }
 +            }
 +        }, false);
 +    }
 +
 +    public AbstractCompactionTask 
getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
 +    {
 +        return 
getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables,
 gcBefore);
 +    }
 +
 +    public int getEstimatedRemainingTasks()
 +    {
 +        int tasks = 0;
 +        tasks += repaired.getEstimatedRemainingTasks();
 +        tasks += unrepaired.getEstimatedRemainingTasks();
 +
 +        return tasks;
 +    }
 +
 +    public boolean shouldBeEnabled()
 +    {
-         String optionValue = 
cfs.metadata.compactionStrategyOptions.get(COMPACTION_ENABLED);
++        String optionValue = options.get(COMPACTION_ENABLED);
 +        return optionValue == null || Boolean.parseBoolean(optionValue);
 +    }
 +
 +    public String getName()
 +    {
 +        return unrepaired.getName();
 +    }
 +
 +    public List<AbstractCompactionStrategy> getStrategies()
 +    {
 +        return Arrays.asList(repaired, unrepaired);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 30d38a1,0d06f67..f5cb2a3
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@@ -87,13 -79,13 +84,13 @@@ public class DateTieredCompactionStrate
       */
      private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
      {
-         if (!isEnabled() || 
Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
 -        if (cfs.getSSTables().isEmpty())
++        if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
              return Collections.emptyList();
  
 -        Set<SSTableReader> uncompacting = Sets.intersection(sstables, 
cfs.getUncompactingSSTables());
 +        Set<SSTableReader> uncompacting = 
ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
  
          // Find fully expired SSTables. Those will be included no matter what.
 -        Set<SSTableReader> expired = 
CompactionController.getFullyExpiredSSTables(cfs, uncompacting, 
cfs.getOverlappingSSTables(uncompacting), gcBefore);
 +        Set<SSTableReader> expired = 
CompactionController.getFullyExpiredSSTables(cfs, uncompacting, 
cfs.getOverlappingSSTables(SSTableSet.CANONICAL, uncompacting), gcBefore);
          Set<SSTableReader> candidates = 
Sets.newHashSet(filterSuspectSSTables(uncompacting));
  
          List<SSTableReader> compactionCandidates = new 
ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), 
gcBefore));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------

Reply via email to