Repository: cassandra
Updated Branches:
  refs/heads/trunk f8b3a1588 -> ed0a07c38


Improve CompactionStrategyManager concurrency

Patch by marcuse; reviewed by yukim for CASSANDRA-10099


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ed0a07c3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ed0a07c3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ed0a07c3

Branch: refs/heads/trunk
Commit: ed0a07c386658395803886ac5f1cf243cd413cbe
Parents: f8b3a15
Author: Marcus Eriksson <marc...@apache.org>
Authored: Mon Feb 8 15:56:12 2016 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Tue Mar 15 15:30:38 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../compaction/AbstractCompactionStrategy.java  |   6 +
 .../compaction/CompactionStrategyManager.java   | 552 +++++++++++++------
 .../DateTieredCompactionStrategy.java           |   4 +-
 .../compaction/LeveledCompactionStrategy.java   |   2 +-
 .../SizeTieredCompactionStrategy.java           |   4 +-
 .../SSTableRepairStatusChanged.java             |   4 +-
 .../cassandra/tools/StandaloneScrubber.java     |   2 +-
 .../cassandra/db/lifecycle/TrackerTest.java     |   2 +-
 9 files changed, 396 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f1c4a3..4fc03b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.6
+ * Improve concurrency in CompactionStrategyManager (CASSANDRA-10099)
  * (cqlsh) interpret CQL type for formatting blobs (CASSANDRA-11274)
  * Refuse to start and print txn log information in case of disk
    corruption (CASSANDRA-10112)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index cab56bb..b6d623b 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -315,6 +315,12 @@ public abstract class AbstractCompactionStrategy
 
     public abstract void addSSTable(SSTableReader added);
 
+    public synchronized void addSSTables(Iterable<SSTableReader> added)
+    {
+        for (SSTableReader sstable : added)
+            addSSTable(sstable);
+    }
+
     public abstract void removeSSTable(SSTableReader sstable);
 
     public static class ScannerList implements AutoCloseable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index a9d42eb..1d387dc 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.*;
 import java.util.concurrent.Callable;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import org.apache.cassandra.index.Index;
 import com.google.common.primitives.Ints;
@@ -60,11 +62,15 @@ public class CompactionStrategyManager implements INotificationConsumer
 {
     private static final Logger logger = 
LoggerFactory.getLogger(CompactionStrategyManager.class);
     private final ColumnFamilyStore cfs;
-    private volatile List<AbstractCompactionStrategy> repaired = new 
ArrayList<>();
-    private volatile List<AbstractCompactionStrategy> unrepaired = new 
ArrayList<>();
+    private final List<AbstractCompactionStrategy> repaired = new 
ArrayList<>();
+    private final List<AbstractCompactionStrategy> unrepaired = new 
ArrayList<>();
     private volatile boolean enabled = true;
-    public boolean isActive = true;
+    public volatile boolean isActive = true;
     private volatile CompactionParams params;
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+    private final ReentrantReadWriteLock.WriteLock writeLock = 
lock.writeLock();
+
     /*
         We keep a copy of the schema compaction parameters here to be able to 
decide if we
         should update the compaction strategy in 
maybeReloadCompactionStrategy() due to an ALTER.
@@ -72,7 +78,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         If a user changes the local compaction strategy and then later ALTERs 
a compaction parameter,
         we will use the new compaction parameters.
      */
-    private CompactionParams schemaCompactionParams;
+    private volatile CompactionParams schemaCompactionParams;
     private Directories.DataDirectory[] locations;
 
     public CompactionStrategyManager(ColumnFamilyStore cfs)
@@ -92,22 +98,29 @@ public class CompactionStrategyManager implements INotificationConsumer
      * Returns a task for the compaction strategy that needs it the most (most 
estimated remaining tasks)
      *
      */
-    public synchronized AbstractCompactionTask getNextBackgroundTask(int 
gcBefore)
+    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
         if (!isEnabled())
             return null;
 
         maybeReload(cfs.metadata);
-
-        List<AbstractCompactionStrategy> strategies = new 
ArrayList<>(repaired.size() + unrepaired.size());
-        strategies.addAll(repaired);
-        strategies.addAll(unrepaired);
-        Collections.sort(strategies, (o1, o2) -> 
Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks()));
-        for (AbstractCompactionStrategy strategy : strategies)
+        List<AbstractCompactionStrategy> strategies = new ArrayList<>();
+        readLock.lock();
+        try
+        {
+            strategies.addAll(repaired);
+            strategies.addAll(unrepaired);
+            Collections.sort(strategies, (o1, o2) -> 
Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks()));
+            for (AbstractCompactionStrategy strategy : strategies)
+            {
+                AbstractCompactionTask task = 
strategy.getNextBackgroundTask(gcBefore);
+                if (task != null)
+                    return task;
+            }
+        }
+        finally
         {
-            AbstractCompactionTask task = 
strategy.getNextBackgroundTask(gcBefore);
-            if (task != null)
-                return task;
+            readLock.unlock();
         }
         return null;
     }
@@ -117,7 +130,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         return enabled && isActive;
     }
 
-    public synchronized void resume()
+    public void resume()
     {
         isActive = true;
     }
@@ -127,21 +140,28 @@ public class CompactionStrategyManager implements INotificationConsumer
      *
      * Separate call from enable/disable to not have to save the enabled-state 
externally
       */
-    public synchronized void pause()
+    public void pause()
     {
         isActive = false;
     }
 
-
     private void startup()
     {
-        for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
+        writeLock.lock();
+        try
         {
-            if (sstable.openReason != SSTableReader.OpenReason.EARLY)
-                getCompactionStrategyFor(sstable).addSSTable(sstable);
+            for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
+            {
+                if (sstable.openReason != SSTableReader.OpenReason.EARLY)
+                    getCompactionStrategyFor(sstable).addSSTable(sstable);
+            }
+            repaired.forEach(AbstractCompactionStrategy::startup);
+            unrepaired.forEach(AbstractCompactionStrategy::startup);
+        }
+        finally
+        {
+            writeLock.unlock();
         }
-        repaired.forEach(AbstractCompactionStrategy::startup);
-        unrepaired.forEach(AbstractCompactionStrategy::startup);
     }
 
     /**
@@ -154,10 +174,18 @@ public class CompactionStrategyManager implements INotificationConsumer
     private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader 
sstable)
     {
         int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
-        if (sstable.isRepaired())
-            return repaired.get(index);
-        else
-            return unrepaired.get(index);
+        readLock.lock();
+        try
+        {
+            if (sstable.isRepaired())
+                return repaired.get(index);
+            else
+                return unrepaired.get(index);
+        }
+        finally
+        {
+            readLock.unlock();
+        }
     }
 
     /**
@@ -200,18 +228,35 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public void shutdown()
     {
-        isActive = false;
-        repaired.forEach(AbstractCompactionStrategy::shutdown);
-        unrepaired.forEach(AbstractCompactionStrategy::shutdown);
+        writeLock.lock();
+        try
+        {
+            isActive = false;
+            repaired.forEach(AbstractCompactionStrategy::shutdown);
+            unrepaired.forEach(AbstractCompactionStrategy::shutdown);
+        }
+        finally
+        {
+            writeLock.unlock();
+        }
     }
 
-    public synchronized void maybeReload(CFMetaData metadata)
+    public void maybeReload(CFMetaData metadata)
     {
         // compare the old schema configuration to the new one, ignore any 
locally set changes.
         if (metadata.params.compaction.equals(schemaCompactionParams) &&
             Arrays.equals(locations, 
cfs.getDirectories().getWriteableLocations())) // any drives broken?
             return;
-        reload(metadata);
+
+        writeLock.lock();
+        try
+        {
+            reload(metadata);
+        }
+        finally
+        {
+            writeLock.unlock();
+        }
     }
 
     /**
@@ -220,7 +265,7 @@ public class CompactionStrategyManager implements INotificationConsumer
      * Called after changing configuration and at startup.
      * @param metadata
      */
-    public synchronized void reload(CFMetaData metadata)
+    private void reload(CFMetaData metadata)
     {
         boolean disabledWithJMX = !enabled && shouldBeEnabled();
         if (!metadata.params.compaction.equals(schemaCompactionParams))
@@ -247,34 +292,50 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public int getUnleveledSSTables()
     {
-        if (repaired.get(0) instanceof LeveledCompactionStrategy && 
unrepaired.get(0) instanceof LeveledCompactionStrategy)
+        readLock.lock();
+        try
         {
-            int count = 0;
-            for (AbstractCompactionStrategy strategy : repaired)
-                count += ((LeveledCompactionStrategy)strategy).getLevelSize(0);
-            for (AbstractCompactionStrategy strategy : unrepaired)
-                count += ((LeveledCompactionStrategy)strategy).getLevelSize(0);
-            return count;
+            if (repaired.get(0) instanceof LeveledCompactionStrategy && 
unrepaired.get(0) instanceof LeveledCompactionStrategy)
+            {
+                int count = 0;
+                for (AbstractCompactionStrategy strategy : repaired)
+                    count += ((LeveledCompactionStrategy) 
strategy).getLevelSize(0);
+                for (AbstractCompactionStrategy strategy : unrepaired)
+                    count += ((LeveledCompactionStrategy) 
strategy).getLevelSize(0);
+                return count;
+            }
+        }
+        finally
+        {
+            readLock.unlock();
         }
         return 0;
     }
 
-    public synchronized int[] getSSTableCountPerLevel()
+    public int[] getSSTableCountPerLevel()
     {
-        if (repaired.get(0) instanceof LeveledCompactionStrategy && 
unrepaired.get(0) instanceof LeveledCompactionStrategy)
+        readLock.lock();
+        try
         {
-            int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
-            for (AbstractCompactionStrategy strategy : repaired)
+            if (repaired.get(0) instanceof LeveledCompactionStrategy && 
unrepaired.get(0) instanceof LeveledCompactionStrategy)
             {
-                int[] repairedCountPerLevel = ((LeveledCompactionStrategy) 
strategy).getAllLevelSize();
-                res = sumArrays(res, repairedCountPerLevel);
-            }
-            for (AbstractCompactionStrategy strategy : unrepaired)
-            {
-                int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) 
strategy).getAllLevelSize();
-                res = sumArrays(res, unrepairedCountPerLevel);
+                int[] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
+                for (AbstractCompactionStrategy strategy : repaired)
+                {
+                    int[] repairedCountPerLevel = ((LeveledCompactionStrategy) 
strategy).getAllLevelSize();
+                    res = sumArrays(res, repairedCountPerLevel);
+                }
+                for (AbstractCompactionStrategy strategy : unrepaired)
+                {
+                    int[] unrepairedCountPerLevel = 
((LeveledCompactionStrategy) strategy).getAllLevelSize();
+                    res = sumArrays(res, unrepairedCountPerLevel);
+                }
+                return res;
             }
-            return res;
+        }
+        finally
+        {
+            readLock.unlock();
         }
         return null;
     }
@@ -296,85 +357,112 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public boolean shouldDefragment()
     {
-        assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
-        return repaired.get(0).shouldDefragment();
+        readLock.lock();
+        try
+        {
+            assert 
repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
+            return repaired.get(0).shouldDefragment();
+        }
+        finally
+        {
+            readLock.unlock();
+        }
     }
 
     public Directories getDirectories()
     {
-        assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
-        return repaired.get(0).getDirectories();
+        readLock.lock();
+        try
+        {
+            assert 
repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
+            return repaired.get(0).getDirectories();
+        }
+        finally
+        {
+            readLock.unlock();
+        }
     }
 
-    public synchronized void handleNotification(INotification notification, 
Object sender)
+    private void handleFlushNotification(Iterable<SSTableReader> added)
     {
-        maybeReload(cfs.metadata);
-        if (notification instanceof SSTableAddedNotification)
+        readLock.lock();
+        try
         {
-            SSTableAddedNotification flushedNotification = 
(SSTableAddedNotification) notification;
-            for (SSTableReader sstable : flushedNotification.added)
+            for (SSTableReader sstable : added)
                 getCompactionStrategyFor(sstable).addSSTable(sstable);
         }
-        else if (notification instanceof SSTableListChangedNotification)
+        finally
         {
-            // a bit of gymnastics to be able to replace sstables in 
compaction strategies
-            // we use this to know that a compaction finished and where to 
start the next compaction in LCS
-            SSTableListChangedNotification listChangedNotification = 
(SSTableListChangedNotification) notification;
+            readLock.unlock();
+        }
+    }
 
-            Directories.DataDirectory [] locations = 
cfs.getDirectories().getWriteableLocations();
-            int locationSize = cfs.getPartitioner().splitter().isPresent() ? 
locations.length : 1;
+    private void handleListChangedNotification(Iterable<SSTableReader> added, 
Iterable<SSTableReader> removed)
+    {
+        // a bit of gymnastics to be able to replace sstables in compaction 
strategies
+        // we use this to know that a compaction finished and where to start 
the next compaction in LCS
+        Directories.DataDirectory [] locations = 
cfs.getDirectories().getWriteableLocations();
+        int locationSize = cfs.getPartitioner().splitter().isPresent() ? 
locations.length : 1;
 
-            List<Set<SSTableReader>> repairedRemoved = new 
ArrayList<>(locationSize);
-            List<Set<SSTableReader>> repairedAdded = new 
ArrayList<>(locationSize);
-            List<Set<SSTableReader>> unrepairedRemoved = new 
ArrayList<>(locationSize);
-            List<Set<SSTableReader>> unrepairedAdded = new 
ArrayList<>(locationSize);
+        List<Set<SSTableReader>> repairedRemoved = new 
ArrayList<>(locationSize);
+        List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
+        List<Set<SSTableReader>> unrepairedRemoved = new 
ArrayList<>(locationSize);
+        List<Set<SSTableReader>> unrepairedAdded = new 
ArrayList<>(locationSize);
 
-            for (int i = 0; i < locationSize; i++)
-            {
-                repairedRemoved.add(new HashSet<>());
-                repairedAdded.add(new HashSet<>());
-                unrepairedRemoved.add(new HashSet<>());
-                unrepairedAdded.add(new HashSet<>());
-            }
-
-            for (SSTableReader sstable : listChangedNotification.removed)
-            {
-                int i = getCompactionStrategyIndex(cfs, getDirectories(), 
sstable);
-                if (sstable.isRepaired())
-                    repairedRemoved.get(i).add(sstable);
-                else
-                    unrepairedRemoved.get(i).add(sstable);
-            }
-            for (SSTableReader sstable : listChangedNotification.added)
-            {
-                int i = getCompactionStrategyIndex(cfs, getDirectories(), 
sstable);
-                if (sstable.isRepaired())
-                    repairedAdded.get(i).add(sstable);
-                else
-                    unrepairedAdded.get(i).add(sstable);
-            }
+        for (int i = 0; i < locationSize; i++)
+        {
+            repairedRemoved.add(new HashSet<>());
+            repairedAdded.add(new HashSet<>());
+            unrepairedRemoved.add(new HashSet<>());
+            unrepairedAdded.add(new HashSet<>());
+        }
 
+        for (SSTableReader sstable : removed)
+        {
+            int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+            if (sstable.isRepaired())
+                repairedRemoved.get(i).add(sstable);
+            else
+                unrepairedRemoved.get(i).add(sstable);
+        }
+        for (SSTableReader sstable : added)
+        {
+            int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+            if (sstable.isRepaired())
+                repairedAdded.get(i).add(sstable);
+            else
+                unrepairedAdded.get(i).add(sstable);
+        }
+        // we need write lock here since we might be moving sstables between 
strategies
+        writeLock.lock();
+        try
+        {
             for (int i = 0; i < locationSize; i++)
             {
                 if (!repairedRemoved.get(i).isEmpty())
                     repaired.get(i).replaceSSTables(repairedRemoved.get(i), 
repairedAdded.get(i));
                 else
-                {
-                    for (SSTableReader sstable : repairedAdded.get(i))
-                        repaired.get(i).addSSTable(sstable);
-                }
+                    repaired.get(i).addSSTables(repairedAdded.get(i));
+
                 if (!unrepairedRemoved.get(i).isEmpty())
                     
unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), 
unrepairedAdded.get(i));
                 else
-                {
-                    for (SSTableReader sstable : unrepairedAdded.get(i))
-                        unrepaired.get(i).addSSTable(sstable);
-                }
+                    unrepaired.get(i).addSSTables(unrepairedAdded.get(i));
             }
         }
-        else if (notification instanceof SSTableRepairStatusChanged)
+        finally
+        {
+            writeLock.unlock();
+        }
+    }
+
+    private void handleRepairStatusChangedNotification(Iterable<SSTableReader> 
sstables)
+    {
+        // we need a write lock here since we move sstables from one strategy 
instance to another
+        writeLock.lock();
+        try
         {
-            for (SSTableReader sstable : ((SSTableRepairStatusChanged) 
notification).sstable)
+            for (SSTableReader sstable : sstables)
             {
                 int index = getCompactionStrategyIndex(cfs, getDirectories(), 
sstable);
                 if (sstable.isRepaired())
@@ -389,31 +477,81 @@ public class CompactionStrategyManager implements INotificationConsumer
                 }
             }
         }
+        finally
+        {
+            writeLock.unlock();
+        }
+    }
+
+    private void handleDeletingNotification(SSTableReader deleted)
+    {
+        readLock.lock();
+        try
+        {
+            getCompactionStrategyFor(deleted).removeSSTable(deleted);
+        }
+        finally
+        {
+            readLock.unlock();
+        }
+    }
+
+    public void handleNotification(INotification notification, Object sender)
+    {
+        maybeReload(cfs.metadata);
+        if (notification instanceof SSTableAddedNotification)
+        {
+            handleFlushNotification(((SSTableAddedNotification) 
notification).added);
+        }
+        else if (notification instanceof SSTableListChangedNotification)
+        {
+            SSTableListChangedNotification listChangedNotification = 
(SSTableListChangedNotification) notification;
+            handleListChangedNotification(listChangedNotification.added, 
listChangedNotification.removed);
+        }
+        else if (notification instanceof SSTableRepairStatusChanged)
+        {
+            
handleRepairStatusChangedNotification(((SSTableRepairStatusChanged) 
notification).sstables);
+        }
         else if (notification instanceof SSTableDeletingNotification)
         {
-            SSTableReader sstable = ((SSTableDeletingNotification) 
notification).deleting;
-            getCompactionStrategyFor(sstable).removeSSTable(sstable);
+            handleDeletingNotification(((SSTableDeletingNotification) 
notification).deleting);
         }
     }
 
     public void enable()
     {
-        if (repaired != null)
-            repaired.forEach(AbstractCompactionStrategy::enable);
-        if (unrepaired != null)
-            unrepaired.forEach(AbstractCompactionStrategy::enable);
-        // enable this last to make sure the strategies are ready to get calls.
-        enabled = true;
+        writeLock.lock();
+        try
+        {
+            if (repaired != null)
+                repaired.forEach(AbstractCompactionStrategy::enable);
+            if (unrepaired != null)
+                unrepaired.forEach(AbstractCompactionStrategy::enable);
+            // enable this last to make sure the strategies are ready to get 
calls.
+            enabled = true;
+        }
+        finally
+        {
+            writeLock.unlock();
+        }
     }
 
     public void disable()
     {
-        // disable this first avoid asking disabled strategies for compaction 
tasks
-        enabled = false;
-        if (repaired != null)
-            repaired.forEach(AbstractCompactionStrategy::disable);
-        if (unrepaired != null)
-            unrepaired.forEach(AbstractCompactionStrategy::disable);
+        writeLock.lock();
+        try
+        {
+            // disable this first avoid asking disabled strategies for 
compaction tasks
+            enabled = false;
+            if (repaired != null)
+                repaired.forEach(AbstractCompactionStrategy::disable);
+            if (unrepaired != null)
+                unrepaired.forEach(AbstractCompactionStrategy::disable);
+        }
+        finally
+        {
+            writeLock.unlock();
+        }
     }
 
     /**
@@ -425,7 +563,7 @@ public class CompactionStrategyManager implements INotificationConsumer
      * @return
      */
     @SuppressWarnings("resource")
-    public synchronized AbstractCompactionStrategy.ScannerList 
getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> 
ranges)
+    public AbstractCompactionStrategy.ScannerList 
getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> 
ranges)
     {
         assert repaired.size() == unrepaired.size();
         List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
@@ -447,68 +585,92 @@ public class CompactionStrategyManager implements INotificationConsumer
 
         List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
 
-        for (Range<Token> range : ranges)
+        readLock.lock();
+        try
         {
-            List<ISSTableScanner> repairedScanners = new ArrayList<>();
-            List<ISSTableScanner> unrepairedScanners = new ArrayList<>();
-
-            for (int i = 0; i < repairedSSTables.size(); i++)
-            {
-                if (!repairedSSTables.get(i).isEmpty())
-                    
repairedScanners.addAll(repaired.get(i).getScanners(repairedSSTables.get(i), 
range).scanners);
-            }
-            for (int i = 0; i < unrepairedSSTables.size(); i++)
+            for (Range<Token> range : ranges)
             {
-                if (!unrepairedSSTables.get(i).isEmpty())
-                    
scanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), 
range).scanners);
-            }
-            for (ISSTableScanner scanner : Iterables.concat(repairedScanners, 
unrepairedScanners))
-            {
-                if (!scanners.add(scanner))
-                    scanner.close();
+                List<ISSTableScanner> repairedScanners = new ArrayList<>();
+                List<ISSTableScanner> unrepairedScanners = new ArrayList<>();
+
+                for (int i = 0; i < repairedSSTables.size(); i++)
+                {
+                    if (!repairedSSTables.get(i).isEmpty())
+                        
repairedScanners.addAll(repaired.get(i).getScanners(repairedSSTables.get(i), 
range).scanners);
+                }
+                for (int i = 0; i < unrepairedSSTables.size(); i++)
+                {
+                    if (!unrepairedSSTables.get(i).isEmpty())
+                        
scanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), 
range).scanners);
+                }
+                for (ISSTableScanner scanner : 
Iterables.concat(repairedScanners, unrepairedScanners))
+                {
+                    if (!scanners.add(scanner))
+                        scanner.close();
+                }
             }
+            return new AbstractCompactionStrategy.ScannerList(scanners);
+        }
+        finally
+        {
+            readLock.unlock();
         }
-        return new AbstractCompactionStrategy.ScannerList(scanners);
     }
 
-    public synchronized AbstractCompactionStrategy.ScannerList 
getScanners(Collection<SSTableReader> sstables)
+    public AbstractCompactionStrategy.ScannerList 
getScanners(Collection<SSTableReader> sstables)
     {
         return getScanners(sstables, Collections.singleton(null));
     }
 
     public Collection<Collection<SSTableReader>> 
groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
     {
-        Map<Integer, List<SSTableReader>> groups = 
sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> 
getCompactionStrategyIndex(cfs, getDirectories(), s)));
-        Collection<Collection<SSTableReader>> anticompactionGroups = new 
ArrayList<>();
+        readLock.lock();
+        try
+        {
+            Map<Integer, List<SSTableReader>> groups = 
sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> 
getCompactionStrategyIndex(cfs, getDirectories(), s)));
+            Collection<Collection<SSTableReader>> anticompactionGroups = new 
ArrayList<>();
 
-        for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
-            
anticompactionGroups.addAll(unrepaired.get(group.getKey()).groupSSTablesForAntiCompaction(group.getValue()));
-        return anticompactionGroups;
+            for (Map.Entry<Integer, List<SSTableReader>> group : 
groups.entrySet())
+                
anticompactionGroups.addAll(unrepaired.get(group.getKey()).groupSSTablesForAntiCompaction(group.getValue()));
+            return anticompactionGroups;
+        }
+        finally
+        {
+            readLock.unlock();
+        }
     }
 
     public long getMaxSSTableBytes()
     {
-        return unrepaired.get(0).getMaxSSTableBytes();
+        readLock.lock();
+        try
+        {
+            return unrepaired.get(0).getMaxSSTableBytes();
+        }
+        finally
+        {
+            readLock.unlock();
+        }
     }
 
     public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, 
int gcBefore, long maxSSTableBytes)
     {
         maybeReload(cfs.metadata);
-        validateForCompaction(txn.originals());
+        validateForCompaction(txn.originals(), cfs, getDirectories());
         return 
getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn,
 gcBefore, maxSSTableBytes);
     }
 
-    private void validateForCompaction(Iterable<SSTableReader> input)
+    private static void validateForCompaction(Iterable<SSTableReader> input, 
ColumnFamilyStore cfs, Directories directories)
     {
         SSTableReader firstSSTable = Iterables.getFirst(input, null);
         assert firstSSTable != null;
         boolean repaired = firstSSTable.isRepaired();
-        int firstIndex = getCompactionStrategyIndex(cfs, getDirectories(), 
firstSSTable);
+        int firstIndex = getCompactionStrategyIndex(cfs, directories, 
firstSSTable);
         for (SSTableReader sstable : input)
         {
             if (sstable.isRepaired() != repaired)
                 throw new UnsupportedOperationException("You can't mix 
repaired and unrepaired data in a compaction");
-            if (firstIndex != getCompactionStrategyIndex(cfs, 
getDirectories(), sstable))
+            if (firstIndex != getCompactionStrategyIndex(cfs, directories, 
sstable))
                 throw new UnsupportedOperationException("You can't mix 
sstables from different directories in a compaction");
         }
     }
@@ -524,9 +686,10 @@ public class CompactionStrategyManager implements INotificationConsumer
             @Override
             public Collection<AbstractCompactionTask> call() throws Exception
             {
-                synchronized (CompactionStrategyManager.this)
+                List<AbstractCompactionTask> tasks = new ArrayList<>();
+                readLock.lock();
+                try
                 {
-                    List<AbstractCompactionTask> tasks = new ArrayList<>();
                     for (AbstractCompactionStrategy strategy : repaired)
                     {
                         Collection<AbstractCompactionTask> task = 
strategy.getMaximalTask(gcBefore, splitOutput);
@@ -539,10 +702,14 @@ public class CompactionStrategyManager implements 
INotificationConsumer
                         if (task != null)
                             tasks.addAll(task);
                     }
-                    if (tasks.isEmpty())
-                        return null;
-                    return tasks;
                 }
+                finally
+                {
+                    readLock.unlock();
+                }
+                if (tasks.isEmpty())
+                    return null;
+                return tasks;
             }
         }, false, false);
     }
@@ -550,18 +717,34 @@ public class CompactionStrategyManager implements 
INotificationConsumer
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> 
sstables, int gcBefore)
     {
         maybeReload(cfs.metadata);
-        validateForCompaction(sstables);
-        return 
getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables,
 gcBefore);
+        validateForCompaction(sstables, cfs, getDirectories());
+        readLock.lock();
+        try
+        {
+            return 
getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables,
 gcBefore);
+        }
+        finally
+        {
+            readLock.unlock();
+        }
     }
 
     public int getEstimatedRemainingTasks()
     {
         int tasks = 0;
-        for (AbstractCompactionStrategy strategy : repaired)
-            tasks += strategy.getEstimatedRemainingTasks();
-        for (AbstractCompactionStrategy strategy : unrepaired)
-            tasks += strategy.getEstimatedRemainingTasks();
+        readLock.lock();
+        try
+        {
 
+            for (AbstractCompactionStrategy strategy : repaired)
+                tasks += strategy.getEstimatedRemainingTasks();
+            for (AbstractCompactionStrategy strategy : unrepaired)
+                tasks += strategy.getEstimatedRemainingTasks();
+        }
+        finally
+        {
+            readLock.unlock();
+        }
         return tasks;
     }
 
@@ -572,23 +755,40 @@ public class CompactionStrategyManager implements 
INotificationConsumer
 
     public String getName()
     {
-        return unrepaired.get(0).getName();
+        readLock.lock();
+        try
+        {
+            return unrepaired.get(0).getName();
+        }
+        finally
+        {
+            readLock.unlock();
+        }
     }
 
-    public List<List<AbstractCompactionStrategy>> getStrategies()
+    @VisibleForTesting
+    List<List<AbstractCompactionStrategy>> getStrategies()
     {
         return Arrays.asList(repaired, unrepaired);
     }
 
-    public synchronized void setNewLocalCompactionStrategy(CompactionParams 
params)
+    public void setNewLocalCompactionStrategy(CompactionParams params)
     {
         logger.info("Switching local compaction strategy from {} to {}}", 
this.params, params);
-        setStrategy(params);
-        if (shouldBeEnabled())
-            enable();
-        else
-            disable();
-        startup();
+        writeLock.lock();
+        try
+        {
+            setStrategy(params);
+            if (shouldBeEnabled())
+                enable();
+            else
+                disable();
+            startup();
+        }
+        finally
+        {
+            writeLock.unlock();
+        }
     }
 
     private void setStrategy(CompactionParams params)
@@ -633,13 +833,21 @@ public class CompactionStrategyManager implements 
INotificationConsumer
                                                        Collection<Index> 
indexes,
                                                        LifecycleTransaction 
txn)
     {
-        if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+        readLock.lock();
+        try
         {
-            return unrepaired.get(0).createSSTableMultiWriter(descriptor, 
keyCount, repairedAt, collector, header, indexes, txn);
+            if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+            {
+                return unrepaired.get(0).createSSTableMultiWriter(descriptor, 
keyCount, repairedAt, collector, header, indexes, txn);
+            }
+            else
+            {
+                return repaired.get(0).createSSTableMultiWriter(descriptor, 
keyCount, repairedAt, collector, header, indexes, txn);
+            }
         }
-        else
+        finally
         {
-            return repaired.get(0).createSSTableMultiWriter(descriptor, 
keyCount, repairedAt, collector, header, indexes, txn);
+            readLock.unlock();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 248473c..9a17e06 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -63,7 +63,7 @@ public class DateTieredCompactionStrategy extends 
AbstractCompactionStrategy
 
     @Override
     @SuppressWarnings("resource")
-    public synchronized AbstractCompactionTask getNextBackgroundTask(int 
gcBefore)
+    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
         while (true)
         {
@@ -83,7 +83,7 @@ public class DateTieredCompactionStrategy extends 
AbstractCompactionStrategy
      * @param gcBefore
      * @return
      */
-    private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
+    private synchronized List<SSTableReader> getNextBackgroundSSTables(final 
int gcBefore)
     {
         if (sstables.isEmpty())
             return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 953971a..84ddbbc 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -90,7 +90,7 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy
      * (by explicit user request) even when compaction is disabled.
      */
     @SuppressWarnings("resource")
-    public synchronized AbstractCompactionTask getNextBackgroundTask(int 
gcBefore)
+    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
         while (true)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index e36adf2..28bdf5c 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -75,7 +75,7 @@ public class SizeTieredCompactionStrategy extends 
AbstractCompactionStrategy
         this.sizeTieredOptions = new 
SizeTieredCompactionStrategyOptions(options);
     }
 
-    private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
+    private synchronized List<SSTableReader> getNextBackgroundSSTables(final 
int gcBefore)
     {
         // make local copies so they can't be changed out from under us 
mid-method
         int minThreshold = cfs.getMinimumCompactionThreshold();
@@ -175,7 +175,7 @@ public class SizeTieredCompactionStrategy extends 
AbstractCompactionStrategy
     }
 
     @SuppressWarnings("resource")
-    public synchronized AbstractCompactionTask getNextBackgroundTask(int 
gcBefore)
+    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
         while (true)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java 
b/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java
index d1398bc..8c48fa8 100644
--- 
a/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java
+++ 
b/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java
@@ -24,10 +24,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 
 public class SSTableRepairStatusChanged implements INotification
 {
-    public final Collection<SSTableReader> sstable;
+    public final Collection<SSTableReader> sstables;
 
     public SSTableRepairStatusChanged(Collection<SSTableReader> 
repairStatusChanged)
     {
-        this.sstable = repairStatusChanged;
+        this.sstables = repairStatusChanged;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java 
b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 4249430..42772ef 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -161,7 +161,7 @@ public class StandaloneScrubber
     private static void checkManifest(CompactionStrategyManager 
strategyManager, ColumnFamilyStore cfs, Collection<SSTableReader> sstables)
     {
         int maxSizeInMB = 
(int)((cfs.getCompactionStrategyManager().getMaxSSTableBytes()) / (1024L * 
1024L));
-        if (strategyManager.getStrategies().size() == 2 && 
strategyManager.getStrategies().get(0) instanceof LeveledCompactionStrategy)
+        if 
(strategyManager.getCompactionParams().klass().equals(LeveledCompactionStrategy.class))
         {
             System.out.println("Checking leveled manifest");
             Predicate<SSTableReader> repairedPredicate = new 
Predicate<SSTableReader>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java 
b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index b3dc3d9..0294115 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -349,7 +349,7 @@ public class TrackerTest
         Assert.assertEquals(singleton(r2), ((SSTableListChangedNotification) 
listener.received.get(0)).added);
         listener.received.clear();
         tracker.notifySSTableRepairedStatusChanged(singleton(r1));
-        Assert.assertEquals(singleton(r1), ((SSTableRepairStatusChanged) 
listener.received.get(0)).sstable);
+        Assert.assertEquals(singleton(r1), ((SSTableRepairStatusChanged) 
listener.received.get(0)).sstables);
         listener.received.clear();
         Memtable memtable = MockSchema.memtable(cfs);
         tracker.notifyRenewed(memtable);

Reply via email to