Fix consistency of incrementally repaired data

patch by Blake Eggleston, reviewed by Marcus Eriksson for CASSANDRA-9143


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/98d74ed9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/98d74ed9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/98d74ed9

Branch: refs/heads/trunk
Commit: 98d74ed998706e9e047dc0f7886a1e9b18df3ce9
Parents: 1757e13
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Wed Aug 31 15:48:43 2016 -0700
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Feb 6 19:11:18 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   7 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  10 +-
 src/java/org/apache/cassandra/db/Memtable.java  |   1 +
 .../org/apache/cassandra/db/SystemKeyspace.java |  19 +-
 .../compaction/AbstractCompactionStrategy.java  |  10 +-
 .../db/compaction/CompactionManager.java        |  96 +-
 .../compaction/CompactionStrategyManager.java   | 258 +++++-
 .../cassandra/db/compaction/CompactionTask.java |  17 +
 .../DateTieredCompactionStrategy.java           |   6 +
 .../compaction/LeveledCompactionStrategy.java   |   6 +
 .../db/compaction/LeveledManifest.java          |  11 +
 .../db/compaction/PendingRepairManager.java     | 432 +++++++++
 .../cassandra/db/compaction/Scrubber.java       |   8 +-
 .../SizeTieredCompactionStrategy.java           |   7 +
 .../TimeWindowCompactionStrategy.java           |   6 +
 .../cassandra/db/compaction/Upgrader.java       |   7 +-
 .../cassandra/db/compaction/Verifier.java       |   2 +-
 .../writers/CompactionAwareWriter.java          |   3 +
 .../writers/DefaultCompactionWriter.java        |   1 +
 .../writers/MajorLeveledCompactionWriter.java   |   2 +-
 .../writers/MaxSSTableSizeWriter.java           |   1 +
 .../SplittingSizeTieredCompactionWriter.java    |   1 +
 .../org/apache/cassandra/dht/RangeStreamer.java |   2 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |   3 +-
 .../cassandra/io/sstable/SSTableLoader.java     |   2 +-
 .../cassandra/io/sstable/SSTableTxnWriter.java  |  15 +-
 .../io/sstable/SimpleSSTableMultiWriter.java    |   4 +-
 .../sstable/format/RangeAwareSSTableWriter.java |   9 +-
 .../io/sstable/format/SSTableReader.java        |  11 +
 .../io/sstable/format/SSTableWriter.java        |  17 +-
 .../cassandra/io/sstable/format/Version.java    |   2 +
 .../io/sstable/format/big/BigFormat.java        |  14 +-
 .../io/sstable/format/big/BigTableWriter.java   |  11 +-
 .../sstable/metadata/IMetadataSerializer.java   |   5 +-
 .../io/sstable/metadata/MetadataCollector.java  |   9 +-
 .../io/sstable/metadata/MetadataSerializer.java |   9 +-
 .../io/sstable/metadata/StatsMetadata.java      |  67 +-
 .../net/IncomingStreamingConnection.java        |   2 +-
 .../cassandra/repair/AnticompactionTask.java    | 174 ----
 .../apache/cassandra/repair/LocalSyncTask.java  |   7 +-
 .../org/apache/cassandra/repair/RepairJob.java  |  30 +-
 .../repair/RepairMessageVerbHandler.java        |  61 +-
 .../apache/cassandra/repair/RepairRunnable.java | 306 ++++---
 .../apache/cassandra/repair/RepairSession.java  |   5 +-
 .../cassandra/repair/StreamingRepairTask.java   |   6 +-
 .../repair/SystemDistributedKeyspace.java       |   3 +-
 .../org/apache/cassandra/repair/Validator.java  |  11 +-
 .../repair/consistent/ConsistentSession.java    | 325 +++++++
 .../repair/consistent/CoordinatorSession.java   | 312 +++++++
 .../repair/consistent/CoordinatorSessions.java  |  95 ++
 .../repair/consistent/LocalSession.java         | 129 +++
 .../repair/consistent/LocalSessionInfo.java     |  67 ++
 .../repair/consistent/LocalSessions.java        | 703 +++++++++++++++
 .../consistent/PendingAntiCompaction.java       | 195 ++++
 .../repair/messages/AnticompactionRequest.java  | 107 ---
 .../cassandra/repair/messages/FailSession.java  |  71 ++
 .../repair/messages/FinalizeCommit.java         |  78 ++
 .../repair/messages/FinalizePromise.java        |  95 ++
 .../repair/messages/FinalizePropose.java        |  78 ++
 .../messages/PrepareConsistentRequest.java      | 124 +++
 .../messages/PrepareConsistentResponse.java     |  94 ++
 .../repair/messages/RepairMessage.java          |  14 +-
 .../cassandra/repair/messages/RepairOption.java |   5 +
 .../repair/messages/StatusRequest.java          |  77 ++
 .../repair/messages/StatusResponse.java         |  90 ++
 .../cassandra/service/ActiveRepairService.java  | 329 ++-----
 .../service/ActiveRepairServiceMBean.java       |  30 +
 .../cassandra/service/CassandraDaemon.java      |   1 +
 .../cassandra/streaming/ConnectionHandler.java  |   3 +-
 .../cassandra/streaming/StreamCoordinator.java  |   8 +-
 .../apache/cassandra/streaming/StreamPlan.java  |   8 +-
 .../cassandra/streaming/StreamReader.java       |  11 +-
 .../cassandra/streaming/StreamResultFuture.java |  10 +-
 .../cassandra/streaming/StreamSession.java      |   8 +-
 .../compress/CompressedStreamReader.java        |   4 +-
 .../streaming/messages/StreamInitMessage.java   |  19 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |   9 +
 .../org/apache/cassandra/tools/NodeTool.java    |   1 +
 .../cassandra/tools/SSTableMetadataViewer.java  |   1 +
 .../tools/SSTableRepairedAtSetter.java          |   5 +-
 .../cassandra/tools/nodetool/RepairAdmin.java   | 147 +++
 .../db/RepairedDataTombstonesTest.java          |   2 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |   6 +-
 .../compaction/AbstractPendingRepairTest.java   | 139 +++
 .../db/compaction/AntiCompactionTest.java       |  63 +-
 ...tionManagerGetSSTablesForValidationTest.java | 177 ++++
 ...pactionStrategyManagerPendingRepairTest.java | 291 ++++++
 .../LeveledCompactionStrategyTest.java          |   4 +-
 .../db/compaction/PendingRepairManagerTest.java | 213 +++++
 .../db/lifecycle/LogTransactionTest.java        |   2 +-
 .../db/lifecycle/RealTransactionsTest.java      |   1 +
 .../cassandra/dht/StreamStateStoreTest.java     |   4 +-
 .../io/sstable/BigTableWriterTest.java          |   2 +-
 .../io/sstable/SSTableRewriterTest.java         |   2 +-
 .../cassandra/io/sstable/SSTableUtils.java      |   5 +-
 .../io/sstable/SSTableWriterTestBase.java       |   2 +-
 .../format/SSTableFlushObserverTest.java        |   2 +-
 .../metadata/MetadataSerializerTest.java        |  19 +-
 .../cassandra/repair/LocalSyncTaskTest.java     |   8 +-
 .../cassandra/repair/RepairSessionTest.java     |   2 +-
 .../apache/cassandra/repair/ValidatorTest.java  |   2 +-
 .../AbstractConsistentSessionTest.java          |  91 ++
 .../consistent/CoordinatorSessionTest.java      | 498 +++++++++++
 .../consistent/CoordinatorSessionsTest.java     | 208 +++++
 .../repair/consistent/LocalSessionAccessor.java |  62 ++
 .../repair/consistent/LocalSessionTest.java     | 885 +++++++++++++++++++
 .../consistent/PendingAntiCompactionTest.java   | 327 +++++++
 .../RepairMessageSerializationsTest.java        |   7 -
 .../messages/RepairMessageSerializerTest.java   | 115 +++
 .../repair/messages/RepairOptionTest.java       |  16 +-
 .../org/apache/cassandra/schema/MockSchema.java |   2 +-
 .../service/ActiveRepairServiceTest.java        | 112 +--
 .../streaming/StreamTransferTaskTest.java       |   6 +-
 114 files changed, 7235 insertions(+), 990 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b339e6e..71513b0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Fix consistency of incrementally repaired data (CASSANDRA-9143)
  * Increase commitlog version (CASSANDRA-13161)
  * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425)
  * Refactor ColumnCondition (CASSANDRA-12981)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 1c4bfdc..ee1fd6d 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -34,7 +34,12 @@ Upgrading
       add new nodes to a datacenter before they can set set ALTER or CREATE
       keyspace replication policies using that datacenter. Existing keyspaces
       will continue to operate, but CREATE and ALTER will validate that all
-      datacenters specified exist in the cluster. 
+      datacenters specified exist in the cluster.
+    - Cassandra 4.0 fixes a problem with incremental repair which caused repaired
+      data to be inconsistent between nodes. The fix changes the behavior of both
+      full and incremental repairs. For full repairs, data is no longer marked
+      repaired. For incremental repairs, anticompaction is run at the beginning
+      of the repair, instead of at the end.
 
 3.10
 ====

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 039dc33..dceb41d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -520,15 +520,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return directories;
     }
 
-    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, 
LifecycleTransaction txn)
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, UUID pendingRepair, int sstableLevel, 
SerializationHeader header, LifecycleTransaction txn)
     {
         MetadataCollector collector = new 
MetadataCollector(metadata().comparator).sstableLevel(sstableLevel);
-        return createSSTableMultiWriter(descriptor, keyCount, repairedAt, 
collector, header, txn);
+        return createSSTableMultiWriter(descriptor, keyCount, repairedAt, 
pendingRepair, collector, header, txn);
     }
 
-    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, MetadataCollector metadataCollector, 
SerializationHeader header, LifecycleTransaction txn)
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, UUID pendingRepair, MetadataCollector 
metadataCollector, SerializationHeader header, LifecycleTransaction txn)
     {
-        return 
getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, 
repairedAt, metadataCollector, header, indexManager.listIndexes(), txn);
+        return 
getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, 
repairedAt, pendingRepair, metadataCollector, header, 
indexManager.listIndexes(), txn);
     }
 
     public boolean supportsEarlyOpen()
@@ -1888,7 +1888,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         }
     }
 
-    public Refs<SSTableReader> getSnapshotSSTableReader(String tag) throws 
IOException
+    public Refs<SSTableReader> getSnapshotSSTableReaders(String tag) throws 
IOException
     {
         Map<Integer, SSTableReader> active = new HashMap<>();
         for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 01e741d..c0a1701 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -508,6 +508,7 @@ public class Memtable implements Comparable<Memtable>
             return cfs.createSSTableMultiWriter(descriptor,
                                                 toFlush.size(),
                                                 
ActiveRepairService.UNREPAIRED_SSTABLE,
+                                                
ActiveRepairService.NO_PENDING_REPAIR,
                                                 sstableMetadataCollector,
                                                 new SerializationHeader(true, 
cfs.metadata(), columns, stats), txn);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index e826dd8..058b378 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -101,6 +101,7 @@ public final class SystemKeyspace
     public static final String VIEWS_BUILDS_IN_PROGRESS = 
"views_builds_in_progress";
     public static final String BUILT_VIEWS = "built_views";
     public static final String PREPARED_STATEMENTS = "prepared_statements";
+    public static final String REPAIRS = "repairs";
 
     public static final TableMetadata Batches =
         parse(BATCHES,
@@ -291,6 +292,21 @@ public final class SystemKeyspace
               + "PRIMARY KEY ((prepared_id)))")
               .build();
 
+    private static final TableMetadata Repairs =
+        parse(REPAIRS,
+              "repairs",
+              "CREATE TABLE %s ("
+              + "parent_id timeuuid, "
+              + "started_at timestamp, "
+              + "last_update timestamp, "
+              + "repaired_at timestamp, "
+              + "state int, "
+              + "coordinator inet, "
+              + "participants set<inet>, "
+              + "ranges set<blob>, "
+              + "cfids set<uuid>, "
+              + "PRIMARY KEY (parent_id))").build();
+
     private static TableMetadata.Builder parse(String table, String 
description, String cql)
     {
         return CreateTableStatement.parse(format(cql, table), 
SchemaConstants.SYSTEM_KEYSPACE_NAME)
@@ -322,7 +338,8 @@ public final class SystemKeyspace
                          TransferredRanges,
                          ViewsBuildsInProgress,
                          BuiltViews,
-                         PreparedStatements);
+                         PreparedStatements,
+                         Repairs);
     }
 
     private static Functions functions()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index fccad19..74c6419 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.*;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.base.Predicate;
@@ -331,6 +332,12 @@ public abstract class AbstractCompactionStrategy
 
     public abstract void removeSSTable(SSTableReader sstable);
 
+    /**
+     * Returns the sstables managed by this strategy instance
+     */
+    @VisibleForTesting
+    protected abstract Set<SSTableReader> getSSTables();
+
     public static class ScannerList implements AutoCloseable
     {
         public final List<ISSTableScanner> scanners;
@@ -579,12 +586,13 @@ public abstract class AbstractCompactionStrategy
     public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
                                                        long keyCount,
                                                        long repairedAt,
+                                                       UUID pendingRepair,
                                                        MetadataCollector meta,
                                                        SerializationHeader 
header,
                                                        Collection<Index> 
indexes,
                                                        LifecycleTransaction 
txn)
     {
-        return SimpleSSTableMultiWriter.create(descriptor, keyCount, 
repairedAt, cfs.metadata, meta, header, indexes, txn);
+        return SimpleSSTableMultiWriter.create(descriptor, keyCount, 
repairedAt, pendingRepair, cfs.metadata, meta, header, indexes, txn);
     }
 
     public boolean supportsEarlyOpen()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 0feb236..5a1313b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -63,6 +62,7 @@ import org.apache.cassandra.io.sstable.SnapshotDeletingTask;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.repair.Validator;
@@ -70,6 +70,7 @@ import 
org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.Refs;
 
 import static java.util.Collections.singleton;
@@ -565,10 +566,11 @@ public class CompactionManager implements 
CompactionManagerMBean
     }
 
     public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore 
cfs,
-                                          final Collection<Range<Token>> 
ranges,
-                                          final Refs<SSTableReader> sstables,
-                                          final long repairedAt,
-                                          final UUID parentRepairSession)
+                                                    final 
Collection<Range<Token>> ranges,
+                                                    final Refs<SSTableReader> 
sstables,
+                                                    final long repairedAt,
+                                                    final UUID pendingRepair,
+                                                    final UUID 
parentRepairSession)
     {
         Runnable runnable = new WrappedRunnable()
         {
@@ -588,7 +590,7 @@ public class CompactionManager implements 
CompactionManagerMBean
                     sstables.release(compactedSSTables);
                     modifier = cfs.getTracker().tryModify(sstables, 
OperationType.ANTICOMPACTION);
                 }
-                performAnticompaction(cfs, ranges, sstables, modifier, 
repairedAt, parentRepairSession);
+                performAnticompaction(cfs, ranges, sstables, modifier, 
repairedAt, pendingRepair, parentRepairSession);
             }
         };
 
@@ -606,6 +608,32 @@ public class CompactionManager implements 
CompactionManagerMBean
     }
 
     /**
+     * Splits the given token ranges of the given sstables into a pending 
repair silo
+     */
+    public ListenableFuture<?> submitPendingAntiCompaction(ColumnFamilyStore 
cfs, Collection<Range<Token>> ranges, Refs<SSTableReader> sstables, 
LifecycleTransaction txn, UUID sessionId)
+    {
+        Runnable runnable = new WrappedRunnable()
+        {
+            protected void runMayThrow() throws Exception
+            {
+                performAnticompaction(cfs, ranges, sstables, txn, 
ActiveRepairService.UNREPAIRED_SSTABLE, sessionId, sessionId);
+            }
+        };
+
+        ListenableFutureTask<?> task = ListenableFutureTask.create(runnable, 
null);
+        try
+        {
+            executor.submitIfRunning(task, "pending anticompaction");
+            return task;
+        }
+        finally
+        {
+            if (task.isCancelled())
+                sstables.release();
+        }
+    }
+
+    /**
      * Make sure the {validatedForRepair} are marked for compaction before 
calling this.
      *
      * Caller must reference the validatedForRepair sstables (via 
ParentRepairSession.getActiveRepairedSSTableRefs(..)).
@@ -622,6 +650,7 @@ public class CompactionManager implements 
CompactionManagerMBean
                                       Refs<SSTableReader> validatedForRepair,
                                       LifecycleTransaction txn,
                                       long repairedAt,
+                                      UUID pendingRepair,
                                       UUID parentRepairSession) throws 
InterruptedException, IOException
     {
         logger.info("[repair #{}] Starting anticompaction for {}.{} on {}/{} 
sstables", parentRepairSession, cfs.keyspace.getName(), cfs.getTableName(), 
validatedForRepair.size(), cfs.getLiveSSTables());
@@ -654,7 +683,7 @@ public class CompactionManager implements 
CompactionManagerMBean
                     if (r.contains(sstableRange))
                     {
                         logger.info("[repair #{}] SSTable {} fully contained 
in range {}, mutating repairedAt instead of anticompacting", 
parentRepairSession, sstable, r);
-                        
sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, 
repairedAt);
+                        
sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 
repairedAt, pendingRepair);
                         sstable.reloadSSTableMetadata();
                         mutatedRepairStatuses.add(sstable);
                         if (!wasRepairedBefore.get(sstable))
@@ -682,7 +711,7 @@ public class CompactionManager implements 
CompactionManagerMBean
             validatedForRepair.release(Sets.union(nonAnticompacting, 
mutatedRepairStatuses));
             assert txn.originals().equals(sstables);
             if (!sstables.isEmpty())
-                doAntiCompaction(cfs, ranges, txn, repairedAt);
+                doAntiCompaction(cfs, ranges, txn, repairedAt, pendingRepair);
             txn.finish();
         }
         finally
@@ -1092,10 +1121,10 @@ public class CompactionManager implements 
CompactionManagerMBean
              CompactionController controller = new CompactionController(cfs, 
txn.originals(), getDefaultGcBefore(cfs, nowInSec));
              CompactionIterator ci = new 
CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), 
controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
         {
-            writer.switchWriter(createWriter(cfs, compactionFileLocation, 
expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable, 
txn));
+            StatsMetadata metadata = sstable.getSSTableMetadata();
+            writer.switchWriter(createWriter(cfs, compactionFileLocation, 
expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, sstable, 
txn));
             long lastBytesScanned = 0;
 
-
             while (ci.hasNext())
             {
                 if (ci.isStopRequested())
@@ -1236,6 +1265,7 @@ public class CompactionManager implements 
CompactionManagerMBean
                                              File compactionFileLocation,
                                              long expectedBloomFilterSize,
                                              long repairedAt,
+                                             UUID pendingRepair,
                                              SSTableReader sstable,
                                              LifecycleTransaction txn)
     {
@@ -1245,6 +1275,7 @@ public class CompactionManager implements 
CompactionManagerMBean
                                     
cfs.newSSTableDescriptor(compactionFileLocation),
                                     expectedBloomFilterSize,
                                     repairedAt,
+                                    pendingRepair,
                                     sstable.getSSTableLevel(),
                                     sstable.header,
                                     cfs.indexManager.listIndexes(),
@@ -1255,6 +1286,7 @@ public class CompactionManager implements 
CompactionManagerMBean
                                                               File 
compactionFileLocation,
                                                               int 
expectedBloomFilterSize,
                                                               long repairedAt,
+                                                              UUID 
pendingRepair,
                                                               
Collection<SSTableReader> sstables,
                                                               
LifecycleTransaction txn)
     {
@@ -1277,6 +1309,7 @@ public class CompactionManager implements 
CompactionManagerMBean
         return 
SSTableWriter.create(cfs.newSSTableDescriptor(compactionFileLocation),
                                     (long) expectedBloomFilterSize,
                                     repairedAt,
+                                    pendingRepair,
                                     cfs.metadata,
                                     new MetadataCollector(sstables, 
cfs.metadata().comparator, minLevel),
                                     SerializationHeader.make(cfs.metadata(), 
sstables),
@@ -1320,7 +1353,7 @@ public class CompactionManager implements 
CompactionManagerMBean
                 // If there is a snapshot created for the session then read 
from there.
                 // note that we populate the parent repair session when 
creating the snapshot, meaning the sstables in the snapshot are the ones we
                 // are supposed to validate.
-                sstables = cfs.getSnapshotSSTableReader(snapshotName);
+                sstables = cfs.getSnapshotSSTableReaders(snapshotName);
 
 
                 // Computing gcbefore based on the current time wouldn't be 
very good because we know each replica will execute
@@ -1331,8 +1364,11 @@ public class CompactionManager implements 
CompactionManagerMBean
             }
             else
             {
-                // flush first so everyone is validating data that is as 
similar as possible
-                
StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
+                if (!validator.isConsistent)
+                {
+                    // flush first so everyone is validating data that is as 
similar as possible
+                    
StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
+                }
                 sstables = getSSTablesToValidate(cfs, validator);
                 if (sstables == null)
                     return; // this means the parent repair session was 
removed - the repair session failed on another node and we removed it
@@ -1422,7 +1458,8 @@ public class CompactionManager implements 
CompactionManagerMBean
         return tree;
     }
 
-    private synchronized Refs<SSTableReader> 
getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
+    @VisibleForTesting
+    synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore 
cfs, Validator validator)
     {
         Refs<SSTableReader> sstables;
 
@@ -1430,11 +1467,20 @@ public class CompactionManager implements 
CompactionManagerMBean
         if (prs == null)
             return null;
         Set<SSTableReader> sstablesToValidate = new HashSet<>();
-        if (prs.isGlobal)
-            prs.markSSTablesRepairing(cfs.metadata.id, 
validator.desc.parentSessionId);
-        // note that we always grab all existing sstables for this - if we 
were to just grab the ones that
-        // were marked as repairing, we would miss any ranges that were 
compacted away and this would cause us to overstream
-        try (ColumnFamilyStore.RefViewFragment sstableCandidates = 
cfs.selectAndReference(View.select(SSTableSet.CANONICAL, (s) -> 
!prs.isIncremental || !s.isRepaired())))
+
+        com.google.common.base.Predicate<SSTableReader> predicate;
+        if (validator.isConsistent)
+        {
+            predicate = s -> 
validator.desc.parentSessionId.equals(s.getSSTableMetadata().pendingRepair);
+        }
+        else
+        {
+            // note that we always grab all existing sstables for this - if we 
were to just grab the ones that
+            // were marked as repairing, we would miss any ranges that were 
compacted away and this would cause us to overstream
+            predicate = (s) -> !prs.isIncremental || !s.isRepaired();
+        }
+
+        try (ColumnFamilyStore.RefViewFragment sstableCandidates = 
cfs.selectAndReference(View.select(SSTableSet.CANONICAL, predicate)))
         {
             for (SSTableReader sstable : sstableCandidates.sstables)
             {
@@ -1464,7 +1510,7 @@ public class CompactionManager implements 
CompactionManagerMBean
      * @param ranges Repaired ranges to be placed into one of the new 
sstables. The repaired table will be tracked via
      * the {@link 
org.apache.cassandra.io.sstable.metadata.StatsMetadata#repairedAt} field.
      */
-    private void doAntiCompaction(ColumnFamilyStore cfs, 
Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt)
+    private void doAntiCompaction(ColumnFamilyStore cfs, 
Collection<Range<Token>> ranges, LifecycleTransaction repaired, long 
repairedAt, UUID pendingRepair)
     {
         logger.info("Performing anticompaction on {} sstables", 
repaired.originals().size());
 
@@ -1476,7 +1522,7 @@ public class CompactionManager implements 
CompactionManagerMBean
         {
             try (LifecycleTransaction txn = repaired.split(sstableGroup))
             {
-                int antiCompacted = antiCompactGroup(cfs, ranges, txn, 
repairedAt);
+                int antiCompacted = antiCompactGroup(cfs, ranges, txn, 
repairedAt, pendingRepair);
                 antiCompactedSSTableCount += antiCompacted;
             }
         }
@@ -1486,7 +1532,7 @@ public class CompactionManager implements 
CompactionManagerMBean
     }
 
     private int antiCompactGroup(ColumnFamilyStore cfs, 
Collection<Range<Token>> ranges,
-                             LifecycleTransaction anticompactionGroup, long 
repairedAt)
+                             LifecycleTransaction anticompactionGroup, long 
repairedAt, UUID pendingRepair)
     {
         long groupMaxDataAge = -1;
 
@@ -1520,8 +1566,8 @@ public class CompactionManager implements 
CompactionManagerMBean
         {
             int expectedBloomFilterSize = 
Math.max(cfs.metadata().params.minIndexInterval, 
(int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
 
-            
repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
 destination, expectedBloomFilterSize, repairedAt, sstableAsSet, 
anticompactionGroup));
-            
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
 destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, 
sstableAsSet, anticompactionGroup));
+            
repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
 destination, expectedBloomFilterSize, repairedAt, pendingRepair, sstableAsSet, 
anticompactionGroup));
+            
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
 destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, 
null, sstableAsSet, anticompactionGroup));
             Range.OrderedRangeContainmentChecker containmentChecker = new 
Range.OrderedRangeContainmentChecker(ranges);
             while (ci.hasNext())
             {
@@ -1669,7 +1715,7 @@ public class CompactionManager implements 
CompactionManagerMBean
         }
 
         @Override
-        public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
+        public java.util.function.Predicate<Long> 
getPurgeEvaluator(DecoratedKey key)
         {
             /*
              * The main reason we always purge is that including gcable 
tombstone would mean that the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 71b160a..0ccdb49 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -21,14 +21,16 @@ package org.apache.cassandra.db.compaction;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.function.Supplier;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.index.Index;
-import com.google.common.primitives.Ints;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,12 +53,13 @@ import org.apache.cassandra.notifications.*;
 import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * Manages the compaction strategies.
  *
- * Currently has two instances of actual compaction strategies per data 
directory - one for repaired data and one for
- * unrepaired data. This is done to be able to totally separate the different 
sets of sstables.
+ * For each directory, a separate compaction strategy instance for both 
repaired and unrepaired data, and also one instance
+ * for each pending repair. This is done to keep the different sets of 
sstables completely separate.
  */
 
 public class CompactionStrategyManager implements INotificationConsumer
@@ -66,6 +69,7 @@ public class CompactionStrategyManager implements 
INotificationConsumer
     private final ColumnFamilyStore cfs;
     private final List<AbstractCompactionStrategy> repaired = new 
ArrayList<>();
     private final List<AbstractCompactionStrategy> unrepaired = new 
ArrayList<>();
+    private final List<PendingRepairManager> pendingRepairs = new 
ArrayList<>();
     private volatile boolean enabled = true;
     private volatile boolean isActive = true;
     private volatile CompactionParams params;
@@ -83,6 +87,7 @@ public class CompactionStrategyManager implements 
INotificationConsumer
     private volatile CompactionParams schemaCompactionParams;
     private Directories.DataDirectory[] locations;
 
+
     public CompactionStrategyManager(ColumnFamilyStore cfs)
     {
         cfs.getTracker().subscribe(this);
@@ -110,23 +115,61 @@ public class CompactionStrategyManager implements 
INotificationConsumer
                 return null;
 
             maybeReload(cfs.metadata());
-            List<AbstractCompactionStrategy> strategies = new ArrayList<>();
 
-            strategies.addAll(repaired);
-            strategies.addAll(unrepaired);
-            Collections.sort(strategies, (o1, o2) -> 
Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks()));
-            for (AbstractCompactionStrategy strategy : strategies)
+            // first try to promote/demote sstables from completed repairs
+            ArrayList<Pair<Integer, PendingRepairManager>> 
pendingRepairManagers = new ArrayList<>(pendingRepairs.size());
+            for (PendingRepairManager pendingRepair : pendingRepairs)
+            {
+                int numPending = 
pendingRepair.getNumPendingRepairFinishedTasks();
+                if (numPending > 0)
+                {
+                    pendingRepairManagers.add(Pair.create(numPending, 
pendingRepair));
+                }
+            }
+            if (!pendingRepairManagers.isEmpty())
+            {
+                pendingRepairManagers.sort((x, y) -> y.left - x.left);
+                for (Pair<Integer, PendingRepairManager> pair : 
pendingRepairManagers)
+                {
+                    AbstractCompactionTask task = 
pair.right.getNextRepairFinishedTask();
+                    if (task != null)
+                    {
+                        return task;
+                    }
+                }
+            }
+
+            // sort compaction task suppliers by remaining tasks descending
+            ArrayList<Pair<Integer, Supplier<AbstractCompactionTask>>> 
sortedSuppliers = new ArrayList<>(repaired.size() + unrepaired.size() + 1);
+
+            for (AbstractCompactionStrategy strategy : repaired)
+                
sortedSuppliers.add(Pair.create(strategy.getEstimatedRemainingTasks(), () -> 
strategy.getNextBackgroundTask(gcBefore)));
+
+            for (AbstractCompactionStrategy strategy : unrepaired)
+                
sortedSuppliers.add(Pair.create(strategy.getEstimatedRemainingTasks(), () -> 
strategy.getNextBackgroundTask(gcBefore)));
+
+            for (PendingRepairManager pending : pendingRepairs)
+                
sortedSuppliers.add(Pair.create(pending.getMaxEstimatedRemainingTasks(), () -> 
pending.getNextBackgroundTask(gcBefore)));
+
+            sortedSuppliers.sort((x, y) -> y.left - x.left);
+
+            // return the first non-null task
+            AbstractCompactionTask task;
+            Iterator<Supplier<AbstractCompactionTask>> suppliers = 
Iterables.transform(sortedSuppliers, p -> p.right).iterator();
+            assert suppliers.hasNext();
+
+            do
             {
-                AbstractCompactionTask task = 
strategy.getNextBackgroundTask(gcBefore);
-                if (task != null)
-                    return task;
+                task = suppliers.next().get();
             }
+            while (suppliers.hasNext() && task == null);
+
+            return task;
         }
         finally
         {
             readLock.unlock();
         }
-        return null;
     }
 
     public boolean isEnabled()
@@ -183,6 +226,7 @@ public class CompactionStrategyManager implements 
INotificationConsumer
             }
             repaired.forEach(AbstractCompactionStrategy::startup);
             unrepaired.forEach(AbstractCompactionStrategy::startup);
+            pendingRepairs.forEach(PendingRepairManager::startup);
         }
         finally
         {
@@ -190,6 +234,7 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         }
         repaired.forEach(AbstractCompactionStrategy::startup);
         unrepaired.forEach(AbstractCompactionStrategy::startup);
+        pendingRepairs.forEach(PendingRepairManager::startup);
         if (Stream.concat(repaired.stream(), unrepaired.stream()).anyMatch(cs 
-> cs.logAll))
             compactionLogger.enable();
     }
@@ -207,7 +252,9 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         readLock.lock();
         try
         {
-            if (sstable.isRepaired())
+            if (sstable.isPendingRepair())
+                return pendingRepairs.get(index).getOrCreate(sstable);
+            else if (sstable.isRepaired())
                 return repaired.get(index);
             else
                 return unrepaired.get(index);
@@ -239,22 +286,60 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         Directories.DataDirectory[] directories = 
locations.getWriteableLocations();
         List<PartitionPosition> boundaries = 
StorageService.getDiskBoundaries(cfs, directories);
         if (boundaries == null)
-        {
-            // try to figure out location based on sstable directory:
-            for (int i = 0; i < directories.length; i++)
-            {
-                Directories.DataDirectory directory = directories[i];
-                if 
(sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
-                    return i;
-            }
-            return 0;
-        }
+            return getCompactionStrategyIndex(locations, sstable.descriptor);
 
         int pos = Collections.binarySearch(boundaries, sstable.first);
         assert pos < 0; // boundaries are .minkeybound and .maxkeybound so 
they should never be equal
         return -pos - 1;
     }
 
+    /**
+     * get the index for the descriptor based on the existing directories
+     * @param locations
+     * @param descriptor
+     * @return
+     */
+    private static int getCompactionStrategyIndex(Directories locations, 
Descriptor descriptor)
+    {
+         Directories.DataDirectory[] directories = 
locations.getWriteableLocations();
+         // try to figure out location based on sstable directory:
+         for (int i = 0; i < directories.length; i++)
+         {
+             Directories.DataDirectory directory = directories[i];
+             if 
(descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
+                 return i;
+         }
+         return 0;
+    }
+
+    @VisibleForTesting
+    List<AbstractCompactionStrategy> getRepaired()
+    {
+        return repaired;
+    }
+
+    @VisibleForTesting
+    List<AbstractCompactionStrategy> getUnrepaired()
+    {
+        return unrepaired;
+    }
+
+    @VisibleForTesting
+    List<AbstractCompactionStrategy> getForPendingRepair(UUID sessionID)
+    {
+        List<AbstractCompactionStrategy> strategies = new 
ArrayList<>(pendingRepairs.size());
+        pendingRepairs.forEach(p -> strategies.add(p.get(sessionID)));
+        return strategies;
+    }
+
+    @VisibleForTesting
+    Set<UUID> pendingRepairs()
+    {
+        Set<UUID> ids = new HashSet<>();
+        pendingRepairs.forEach(p -> ids.addAll(p.getSessions()));
+        return ids;
+    }
+
     public void shutdown()
     {
         writeLock.lock();
@@ -263,6 +348,7 @@ public class CompactionStrategyManager implements 
INotificationConsumer
             isActive = false;
             repaired.forEach(AbstractCompactionStrategy::shutdown);
             unrepaired.forEach(AbstractCompactionStrategy::shutdown);
+            pendingRepairs.forEach(PendingRepairManager::shutdown);
             compactionLogger.disable();
         }
         finally
@@ -332,6 +418,9 @@ public class CompactionStrategyManager implements 
INotificationConsumer
                     count += ((LeveledCompactionStrategy) 
strategy).getLevelSize(0);
                 for (AbstractCompactionStrategy strategy : unrepaired)
                     count += ((LeveledCompactionStrategy) 
strategy).getLevelSize(0);
+                for (PendingRepairManager pendingManager : pendingRepairs)
+                    for (AbstractCompactionStrategy strategy : 
pendingManager.getStrategies())
+                        count += ((LeveledCompactionStrategy) 
strategy).getLevelSize(0);
                 return count;
             }
         }
@@ -377,6 +466,11 @@ public class CompactionStrategyManager implements 
INotificationConsumer
                     int[] unrepairedCountPerLevel = 
((LeveledCompactionStrategy) strategy).getAllLevelSize();
                     res = sumArrays(res, unrepairedCountPerLevel);
                 }
+                for (PendingRepairManager pending : pendingRepairs)
+                {
+                    int[] pendingRepairCountPerLevel = 
pending.getSSTableCountPerLevel();
+                    res = sumArrays(res, pendingRepairCountPerLevel);
+                }
                 return res;
             }
         }
@@ -387,7 +481,7 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         return null;
     }
 
-    private static int[] sumArrays(int[] a, int[] b)
+    static int[] sumArrays(int[] a, int[] b)
     {
         int[] res = new int[Math.max(a.length, b.length)];
         for (int i = 0; i < res.length; i++)
@@ -451,6 +545,8 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         Directories.DataDirectory [] locations = 
cfs.getDirectories().getWriteableLocations();
         int locationSize = cfs.getPartitioner().splitter().isPresent() ? 
locations.length : 1;
 
+        List<Set<SSTableReader>> pendingRemoved = new 
ArrayList<>(locationSize);
+        List<Set<SSTableReader>> pendingAdded = new ArrayList<>(locationSize);
         List<Set<SSTableReader>> repairedRemoved = new 
ArrayList<>(locationSize);
         List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
         List<Set<SSTableReader>> unrepairedRemoved = new 
ArrayList<>(locationSize);
@@ -458,6 +554,8 @@ public class CompactionStrategyManager implements 
INotificationConsumer
 
         for (int i = 0; i < locationSize; i++)
         {
+            pendingRemoved.add(new HashSet<>());
+            pendingAdded.add(new HashSet<>());
             repairedRemoved.add(new HashSet<>());
             repairedAdded.add(new HashSet<>());
             unrepairedRemoved.add(new HashSet<>());
@@ -467,7 +565,9 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         for (SSTableReader sstable : removed)
         {
             int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
-            if (sstable.isRepaired())
+            if (sstable.isPendingRepair())
+                pendingRemoved.get(i).add(sstable);
+            else if (sstable.isRepaired())
                 repairedRemoved.get(i).add(sstable);
             else
                 unrepairedRemoved.get(i).add(sstable);
@@ -475,7 +575,9 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         for (SSTableReader sstable : added)
         {
             int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
-            if (sstable.isRepaired())
+            if (sstable.isPendingRepair())
+                pendingAdded.get(i).add(sstable);
+            else if (sstable.isRepaired())
                 repairedAdded.get(i).add(sstable);
             else
                 unrepairedAdded.get(i).add(sstable);
@@ -486,6 +588,17 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         {
             for (int i = 0; i < locationSize; i++)
             {
+
+                if (!pendingRemoved.get(i).isEmpty())
+                {
+                    
pendingRepairs.get(i).replaceSSTables(pendingRemoved.get(i), 
pendingAdded.get(i));
+                }
+                else
+                {
+                    PendingRepairManager pendingManager = 
pendingRepairs.get(i);
+                    pendingAdded.get(i).forEach(s -> 
pendingManager.addSSTable(s));
+                }
+
                 if (!repairedRemoved.get(i).isEmpty())
                     repaired.get(i).replaceSSTables(repairedRemoved.get(i), 
repairedAdded.get(i));
                 else
@@ -512,13 +625,21 @@ public class CompactionStrategyManager implements 
INotificationConsumer
             for (SSTableReader sstable : sstables)
             {
                 int index = getCompactionStrategyIndex(cfs, getDirectories(), 
sstable);
-                if (sstable.isRepaired())
+                if (sstable.isPendingRepair())
+                {
+                    pendingRepairs.get(index).addSSTable(sstable);
+                    unrepaired.get(index).removeSSTable(sstable);
+                    repaired.get(index).removeSSTable(sstable);
+                }
+                else if (sstable.isRepaired())
                 {
+                    pendingRepairs.get(index).removeSSTable(sstable);
                     unrepaired.get(index).removeSSTable(sstable);
                     repaired.get(index).addSSTable(sstable);
                 }
                 else
                 {
+                    pendingRepairs.get(index).removeSSTable(sstable);
                     repaired.get(index).removeSSTable(sstable);
                     unrepaired.get(index).addSSTable(sstable);
                 }
@@ -574,6 +695,8 @@ public class CompactionStrategyManager implements 
INotificationConsumer
                 repaired.forEach(AbstractCompactionStrategy::enable);
             if (unrepaired != null)
                 unrepaired.forEach(AbstractCompactionStrategy::enable);
+            if (pendingRepairs != null)
+                pendingRepairs.forEach(PendingRepairManager::enable);
             // enable this last to make sure the strategies are ready to get 
calls.
             enabled = true;
         }
@@ -594,6 +717,8 @@ public class CompactionStrategyManager implements 
INotificationConsumer
                 repaired.forEach(AbstractCompactionStrategy::disable);
             if (unrepaired != null)
                 unrepaired.forEach(AbstractCompactionStrategy::disable);
+            if (pendingRepairs != null)
+                pendingRepairs.forEach(PendingRepairManager::disable);
         }
         finally
         {
@@ -613,21 +738,27 @@ public class CompactionStrategyManager implements 
INotificationConsumer
     public AbstractCompactionStrategy.ScannerList 
getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> 
ranges)
     {
         assert repaired.size() == unrepaired.size();
+        assert repaired.size() == pendingRepairs.size();
+        List<Set<SSTableReader>> pendingSSTables = new ArrayList<>();
         List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
         List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
 
         for (int i = 0; i < repaired.size(); i++)
         {
+            pendingSSTables.add(new HashSet<>());
             repairedSSTables.add(new HashSet<>());
             unrepairedSSTables.add(new HashSet<>());
         }
 
         for (SSTableReader sstable : sstables)
         {
-            if (sstable.isRepaired())
-                repairedSSTables.get(getCompactionStrategyIndex(cfs, 
getDirectories(), sstable)).add(sstable);
+            int idx = getCompactionStrategyIndex(cfs, getDirectories(), 
sstable);
+            if (sstable.isPendingRepair())
+                pendingSSTables.get(idx).add(sstable);
+            else if (sstable.isRepaired())
+                repairedSSTables.get(idx).add(sstable);
             else
-                unrepairedSSTables.get(getCompactionStrategyIndex(cfs, 
getDirectories(), sstable)).add(sstable);
+                unrepairedSSTables.get(idx).add(sstable);
         }
 
         List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
@@ -635,6 +766,11 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         readLock.lock();
         try
         {
+            for (int i = 0; i < pendingSSTables.size(); i++)
+            {
+                if (!pendingSSTables.get(i).isEmpty())
+                    
scanners.addAll(pendingRepairs.get(i).getScanners(pendingSSTables.get(i), 
ranges));
+            }
             for (int i = 0; i < repairedSSTables.size(); i++)
             {
                 if (!repairedSSTables.get(i).isEmpty())
@@ -703,12 +839,15 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         assert firstSSTable != null;
         boolean repaired = firstSSTable.isRepaired();
         int firstIndex = getCompactionStrategyIndex(cfs, directories, 
firstSSTable);
+        boolean isPending = firstSSTable.isPendingRepair();
         for (SSTableReader sstable : input)
         {
             if (sstable.isRepaired() != repaired)
                 throw new UnsupportedOperationException("You can't mix 
repaired and unrepaired data in a compaction");
             if (firstIndex != getCompactionStrategyIndex(cfs, directories, 
sstable))
                 throw new UnsupportedOperationException("You can't mix 
sstables from different directories in a compaction");
+            if (isPending != sstable.isPendingRepair())
+                throw new UnsupportedOperationException("You can't compact 
sstables pending for repair with non-pending ones");
         }
     }
 
@@ -739,6 +878,13 @@ public class CompactionStrategyManager implements 
INotificationConsumer
                         if (task != null)
                             tasks.addAll(task);
                     }
+
+                    for (PendingRepairManager pending : pendingRepairs)
+                    {
+                        Collection<AbstractCompactionTask> pendingRepairTasks 
= pending.getMaximalTasks(gcBefore, splitOutput);
+                        if (pendingRepairTasks != null)
+                            tasks.addAll(pendingRepairTasks);
+                    }
                 }
                 finally
                 {
@@ -768,13 +914,16 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         try
         {
             Map<Integer, List<SSTableReader>> repairedSSTables = 
sstables.stream()
-                                                                         
.filter(s -> !s.isMarkedSuspect() && s.isRepaired())
+                                                                         
.filter(s -> !s.isMarkedSuspect() && s.isRepaired() && !s.isPendingRepair())
                                                                          
.collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, 
getDirectories(), s)));
 
             Map<Integer, List<SSTableReader>> unrepairedSSTables = 
sstables.stream()
-                                                                           
.filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
+                                                                           
.filter(s -> !s.isMarkedSuspect() && !s.isRepaired() && !s.isPendingRepair())
                                                                            
.collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, 
getDirectories(), s)));
 
+            Map<Integer, List<SSTableReader>> pendingSSTables = 
sstables.stream()
+                                                                        
.filter(s -> !s.isMarkedSuspect() && s.isPendingRepair())
+                                                                        
.collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, 
getDirectories(), s)));
 
             for (Map.Entry<Integer, List<SSTableReader>> group : 
repairedSSTables.entrySet())
                 
ret.add(repaired.get(group.getKey()).getUserDefinedTask(group.getValue(), 
gcBefore));
@@ -782,6 +931,9 @@ public class CompactionStrategyManager implements 
INotificationConsumer
             for (Map.Entry<Integer, List<SSTableReader>> group : 
unrepairedSSTables.entrySet())
                 
ret.add(unrepaired.get(group.getKey()).getUserDefinedTask(group.getValue(), 
gcBefore));
 
+            for (Map.Entry<Integer, List<SSTableReader>> group : 
pendingSSTables.entrySet())
+                
ret.addAll(pendingRepairs.get(group.getKey()).createUserDefinedTasks(group.getValue(),
 gcBefore));
+
             return ret;
         }
         finally
@@ -808,11 +960,12 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         readLock.lock();
         try
         {
-
             for (AbstractCompactionStrategy strategy : repaired)
                 tasks += strategy.getEstimatedRemainingTasks();
             for (AbstractCompactionStrategy strategy : unrepaired)
                 tasks += strategy.getEstimatedRemainingTasks();
+            for (PendingRepairManager pending : pendingRepairs)
+                tasks += pending.getEstimatedRemainingTasks();
         }
         finally
         {
@@ -844,7 +997,9 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         readLock.lock();
         try
         {
-            return Arrays.asList(repaired, unrepaired);
+            List<AbstractCompactionStrategy> pending = new ArrayList<>();
+            pendingRepairs.forEach(p -> pending.addAll(p.getStrategies()));
+            return Arrays.asList(repaired, unrepaired, pending);
         }
         finally
         {
@@ -875,8 +1030,10 @@ public class CompactionStrategyManager implements 
INotificationConsumer
     {
         repaired.forEach(AbstractCompactionStrategy::shutdown);
         unrepaired.forEach(AbstractCompactionStrategy::shutdown);
+        pendingRepairs.forEach(PendingRepairManager::shutdown);
         repaired.clear();
         unrepaired.clear();
+        pendingRepairs.clear();
 
         if (cfs.getPartitioner().splitter().isPresent())
         {
@@ -885,12 +1042,14 @@ public class CompactionStrategyManager implements 
INotificationConsumer
             {
                 repaired.add(cfs.createCompactionStrategyInstance(params));
                 unrepaired.add(cfs.createCompactionStrategyInstance(params));
+                pendingRepairs.add(new PendingRepairManager(cfs, params));
             }
         }
         else
         {
             repaired.add(cfs.createCompactionStrategyInstance(params));
             unrepaired.add(cfs.createCompactionStrategyInstance(params));
+            pendingRepairs.add(new PendingRepairManager(cfs, params));
         }
         this.params = params;
     }
@@ -908,6 +1067,7 @@ public class CompactionStrategyManager implements 
INotificationConsumer
     public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
                                                        long keyCount,
                                                        long repairedAt,
+                                                       UUID pendingRepair,
                                                        MetadataCollector 
collector,
                                                        SerializationHeader 
header,
                                                        Collection<Index> 
indexes,
@@ -916,14 +1076,16 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         readLock.lock();
         try
         {
-            if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
-            {
-                return unrepaired.get(0).createSSTableMultiWriter(descriptor, 
keyCount, repairedAt, collector, header, indexes, txn);
-            }
+            // to avoid creating a compaction strategy for the wrong pending 
repair manager, we get the index based on where the sstable is to be written
+            int index = cfs.getPartitioner().splitter().isPresent()
+                        ? getCompactionStrategyIndex(getDirectories(), 
descriptor)
+                        : 0;
+            if (pendingRepair != ActiveRepairService.NO_PENDING_REPAIR)
+                return 
pendingRepairs.get(index).getOrCreate(pendingRepair).createSSTableMultiWriter(descriptor,
 keyCount, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, collector, 
header, indexes, txn);
+            else if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+                return 
unrepaired.get(index).createSSTableMultiWriter(descriptor, keyCount, 
repairedAt, ActiveRepairService.NO_PENDING_REPAIR, collector, header, indexes, 
txn);
             else
-            {
-                return repaired.get(0).createSSTableMultiWriter(descriptor, 
keyCount, repairedAt, collector, header, indexes, txn);
-            }
+                return 
repaired.get(index).createSSTableMultiWriter(descriptor, keyCount, repairedAt, 
ActiveRepairService.NO_PENDING_REPAIR, collector, header, indexes, txn);
         }
         finally
         {
@@ -951,6 +1113,14 @@ public class CompactionStrategyManager implements 
INotificationConsumer
             {
                 return 
Collections.singletonList(locations[repairedIndex].location.getAbsolutePath());
             }
+            for (int i = 0; i < pendingRepairs.size(); i++)
+            {
+                PendingRepairManager pending = pendingRepairs.get(i);
+                if (pending.hasStrategy(strategy))
+                {
+                    return 
Collections.singletonList(locations[i].location.getAbsolutePath());
+                }
+            }
         }
         List<String> folders = new ArrayList<>(locations.length);
         for (Directories.DataDirectory location : locations)
@@ -964,4 +1134,10 @@ public class CompactionStrategyManager implements 
INotificationConsumer
     {
         return repaired.get(0).supportsEarlyOpen();
     }
+
+    @VisibleForTesting
+    List<PendingRepairManager> getPendingRepairManagers()
+    {
+        return pendingRepairs;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 62efa3d..789de1e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -318,6 +319,22 @@ public class CompactionTask extends AbstractCompactionTask
         return minRepairedAt;
     }
 
+    public static UUID getPendingRepair(Set<SSTableReader> sstables)
+    {
+        if (sstables.isEmpty())
+        {
+            return ActiveRepairService.NO_PENDING_REPAIR;
+        }
+        Set<UUID> ids = new HashSet<>();
+        for (SSTableReader sstable: sstables)
+            ids.add(sstable.getSSTableMetadata().pendingRepair);
+
+        if (ids.size() != 1)
+            throw new RuntimeException(String.format("Attempting to compact 
pending repair sstables with sstables from other repair, or sstables not 
pending repair: %s", ids));
+
+        return ids.iterator().next();
+    }
+
     /*
     Checks if we have enough disk space to execute the compaction.  Drops the 
largest sstable out of the Task until
     there's enough space (in theory) to handle the compaction.  Does not take 
into account space that will be taken by

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 729ddc0..ae35dcd 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -221,6 +221,12 @@ public class DateTieredCompactionStrategy extends 
AbstractCompactionStrategy
         sstables.remove(sstable);
     }
 
+    /**
+     * @return an immutable snapshot of the sstables currently tracked by this strategy
+     */
+    @Override
+    protected Set<SSTableReader> getSSTables()
+    {
+        ImmutableSet.Builder<SSTableReader> snapshot = ImmutableSet.builder();
+        snapshot.addAll(sstables);
+        return snapshot.build();
+    }
+
     /**
      * A target time span used for bucketing SSTables based on timestamps.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 9495582..4f11a03 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -339,6 +339,12 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy
         manifest.remove(sstable);
     }
 
+    /**
+     * @return the sstables of every level, as collected by the leveled manifest
+     */
+    @Override
+    protected Set<SSTableReader> getSSTables()
+    {
+        // the manifest owns the per-level lists, so it builds the snapshot
+        Set<SSTableReader> allLevels = manifest.getSSTables();
+        return allLevels;
+    }
+
     // Lazily creates SSTableBoundedScanner for sstable that are assumed to be 
from the
     // same level (e.g. non overlapping) - see #4142
     private static class LeveledScanner extends 
AbstractIterator<UnfilteredRowIterator> implements ISSTableScanner

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java 
b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 3d118de..0c53812 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -23,6 +23,7 @@ import java.util.*;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -526,6 +527,16 @@ public class LeveledManifest
         return level;
     }
 
+    /**
+     * @return an immutable set holding the sstables of every generation in this manifest, taken while
+     *         holding the manifest's monitor
+     */
+    public synchronized Set<SSTableReader> getSSTables()
+    {
+        ImmutableSet.Builder<SSTableReader> all = ImmutableSet.builder();
+        for (List<SSTableReader> generation : generations)
+            all.addAll(generation);
+        return all.build();
+    }
+
     private static Set<SSTableReader> overlapping(Collection<SSTableReader> 
candidates, Iterable<SSTableReader> others)
     {
         assert !candidates.isEmpty();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java 
b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
new file mode 100644
index 0000000..a270ead
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Companion to CompactionStrategyManager which manages the sstables marked 
pending repair.
+ *
+ * SSTables are classified as pending repair by the anti-compaction performed 
at the beginning
+ * of an incremental repair, or when they're streamed in with a pending repair 
id. This prevents
+ * unrepaired / pending repaired sstables from being compacted together. Once 
the repair session
+ * has completed, or failed, sstables will be re-classified as part of the 
compaction process.
+ */
+class PendingRepairManager
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(PendingRepairManager.class);
+
+    private final ColumnFamilyStore cfs;
+    private final CompactionParams params;
+    private volatile ImmutableMap<UUID, AbstractCompactionStrategy> strategies 
= ImmutableMap.of();
+
+    PendingRepairManager(ColumnFamilyStore cfs, CompactionParams params)
+    {
+        this.cfs = cfs;
+        this.params = params;
+    }
+
+    private ImmutableMap.Builder<UUID, AbstractCompactionStrategy> mapBuilder()
+    {
+        return ImmutableMap.builder();
+    }
+
+    AbstractCompactionStrategy get(UUID id)
+    {
+        return strategies.get(id);
+    }
+
+    AbstractCompactionStrategy get(SSTableReader sstable)
+    {
+        assert sstable.isPendingRepair();
+        return get(sstable.getSSTableMetadata().pendingRepair);
+    }
+
+    AbstractCompactionStrategy getOrCreate(UUID id)
+    {
+        assert id != null;
+        AbstractCompactionStrategy strategy = get(id);
+        if (strategy == null)
+        {
+            synchronized (this)
+            {
+                strategy = get(id);
+
+                if (strategy == null)
+                {
+                    logger.debug("Creating {}.{} compaction strategy for 
pending repair: {}", cfs.metadata.keyspace, cfs.metadata.name, id);
+                    strategy = cfs.createCompactionStrategyInstance(params);
+                    strategies = mapBuilder().putAll(strategies).put(id, 
strategy).build();
+                }
+            }
+        }
+        return strategy;
+    }
+
+    AbstractCompactionStrategy getOrCreate(SSTableReader sstable)
+    {
+        assert sstable.isPendingRepair();
+        return getOrCreate(sstable.getSSTableMetadata().pendingRepair);
+    }
+
+    private synchronized void removeSession(UUID sessionID)
+    {
+        if (!strategies.containsKey(sessionID))
+            return;
+
+        logger.debug("Removing compaction strategy for pending repair {} on  
{}.{}", sessionID, cfs.metadata.keyspace, cfs.metadata.name);
+        strategies = ImmutableMap.copyOf(Maps.filterKeys(strategies, k -> 
!k.equals(sessionID)));
+    }
+
+    synchronized void removeSSTable(SSTableReader sstable)
+    {
+        for (AbstractCompactionStrategy strategy : strategies.values())
+            strategy.removeSSTable(sstable);
+    }
+
+    synchronized void addSSTable(SSTableReader sstable)
+    {
+        getOrCreate(sstable).addSSTable(sstable);
+    }
+
+    synchronized void replaceSSTables(Set<SSTableReader> removed, 
Set<SSTableReader> added)
+    {
+        if (removed.isEmpty() && added.isEmpty())
+            return;
+
+        // left=removed, right=added
+        Map<UUID, Pair<Set<SSTableReader>, Set<SSTableReader>>> groups = new 
HashMap<>();
+        for (SSTableReader sstable : removed)
+        {
+            UUID sessionID = sstable.getSSTableMetadata().pendingRepair;
+            if (!groups.containsKey(sessionID))
+            {
+                groups.put(sessionID, Pair.create(new HashSet<>(), new 
HashSet<>()));
+            }
+            groups.get(sessionID).left.add(sstable);
+        }
+
+        for (SSTableReader sstable : added)
+        {
+            UUID sessionID = sstable.getSSTableMetadata().pendingRepair;
+            if (!groups.containsKey(sessionID))
+            {
+                groups.put(sessionID, Pair.create(new HashSet<>(), new 
HashSet<>()));
+            }
+            groups.get(sessionID).right.add(sstable);
+        }
+
+        for (Map.Entry<UUID, Pair<Set<SSTableReader>, Set<SSTableReader>>> 
entry : groups.entrySet())
+        {
+            AbstractCompactionStrategy strategy = getOrCreate(entry.getKey());
+            Set<SSTableReader> groupRemoved = entry.getValue().left;
+            Set<SSTableReader> groupAdded = entry.getValue().right;
+
+            if (!groupRemoved.isEmpty())
+                strategy.replaceSSTables(groupRemoved, groupAdded);
+            else
+                strategy.addSSTables(groupAdded);
+        }
+    }
+
+    synchronized void startup()
+    {
+        strategies.values().forEach(AbstractCompactionStrategy::startup);
+    }
+
+    synchronized void shutdown()
+    {
+        strategies.values().forEach(AbstractCompactionStrategy::shutdown);
+    }
+
+    synchronized void enable()
+    {
+        strategies.values().forEach(AbstractCompactionStrategy::enable);
+    }
+
+    synchronized void disable()
+    {
+        strategies.values().forEach(AbstractCompactionStrategy::disable);
+    }
+
+    private int getEstimatedRemainingTasks(UUID sessionID, 
AbstractCompactionStrategy strategy)
+    {
+        if (canCleanup(sessionID))
+        {
+            return 0;
+        }
+        else
+        {
+            return strategy.getEstimatedRemainingTasks();
+        }
+    }
+
+    int getEstimatedRemainingTasks()
+    {
+        int tasks = 0;
+        for (Map.Entry<UUID, AbstractCompactionStrategy> entry : 
strategies.entrySet())
+        {
+            tasks += getEstimatedRemainingTasks(entry.getKey(), 
entry.getValue());
+        }
+        return tasks;
+    }
+
+    /**
+     * @return the highest max remaining tasks of all contained compaction 
strategies
+     */
+    int getMaxEstimatedRemainingTasks()
+    {
+        int tasks = 0;
+        for (Map.Entry<UUID, AbstractCompactionStrategy> entry : 
strategies.entrySet())
+        {
+            tasks = Math.max(tasks, getEstimatedRemainingTasks(entry.getKey(), 
entry.getValue()));
+        }
+        return tasks;
+    }
+
+    private RepairFinishedCompactionTask getRepairFinishedCompactionTask(UUID 
sessionID)
+    {
+        Set<SSTableReader> sstables = get(sessionID).getSSTables();
+        long repairedAt = 
ActiveRepairService.instance.consistent.local.getFinalSessionRepairedAt(sessionID);
+        LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.COMPACTION);
+        return txn == null ? null : new RepairFinishedCompactionTask(cfs, txn, 
sessionID, repairedAt);
+    }
+
+    synchronized int getNumPendingRepairFinishedTasks()
+    {
+        int count = 0;
+        for (UUID sessionID : strategies.keySet())
+        {
+            if (canCleanup(sessionID))
+            {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    synchronized AbstractCompactionTask getNextRepairFinishedTask()
+    {
+        for (UUID sessionID : strategies.keySet())
+        {
+            if (canCleanup(sessionID))
+            {
+                return getRepairFinishedCompactionTask(sessionID);
+            }
+        }
+        return null;
+    }
+
+    synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+    {
+        if (strategies.isEmpty())
+            return null;
+
+        Map<UUID, Integer> numTasks = new HashMap<>(strategies.size());
+        ArrayList<UUID> sessions = new ArrayList<>(strategies.size());
+        for (Map.Entry<UUID, AbstractCompactionStrategy> entry : 
strategies.entrySet())
+        {
+            if (canCleanup(entry.getKey()))
+            {
+                continue;
+            }
+            numTasks.put(entry.getKey(), 
getEstimatedRemainingTasks(entry.getKey(), entry.getValue()));
+            sessions.add(entry.getKey());
+        }
+
+        // we want the session with the most compactions at the head of the 
list
+        sessions.sort((o1, o2) -> numTasks.get(o2) - numTasks.get(o1));
+
+        UUID sessionID = sessions.get(0);
+        return get(sessionID).getNextBackgroundTask(gcBefore);
+    }
+
+    synchronized Collection<AbstractCompactionTask> getMaximalTasks(int 
gcBefore, boolean splitOutput)
+    {
+        if (strategies.isEmpty())
+            return null;
+
+        List<AbstractCompactionTask> maximalTasks = new 
ArrayList<>(strategies.size());
+        for (Map.Entry<UUID, AbstractCompactionStrategy> entry : 
strategies.entrySet())
+        {
+            if (canCleanup(entry.getKey()))
+            {
+                
maximalTasks.add(getRepairFinishedCompactionTask(entry.getKey()));
+            }
+            else
+            {
+                Collection<AbstractCompactionTask> tasks = 
entry.getValue().getMaximalTask(gcBefore, splitOutput);
+                if (tasks != null)
+                    maximalTasks.addAll(tasks);
+            }
+        }
+        return !maximalTasks.isEmpty() ? maximalTasks : null;
+    }
+
+    Collection<AbstractCompactionStrategy> getStrategies()
+    {
+        return strategies.values();
+    }
+
+    Set<UUID> getSessions()
+    {
+        return strategies.keySet();
+    }
+
+    boolean canCleanup(UUID sessionID)
+    {
+        return 
!ActiveRepairService.instance.consistent.local.isSessionInProgress(sessionID);
+    }
+
+    /**
+     * calling this when underlying strategy is not LeveledCompactionStrategy 
is an error
+     */
+    synchronized int[] getSSTableCountPerLevel()
+    {
+        int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
+        for (AbstractCompactionStrategy strategy : strategies.values())
+        {
+            assert strategy instanceof LeveledCompactionStrategy;
+            int[] counts = ((LeveledCompactionStrategy) 
strategy).getAllLevelSize();
+            res = CompactionStrategyManager.sumArrays(res, counts);
+        }
+        return res;
+    }
+
+    @SuppressWarnings("resource")
+    synchronized Set<ISSTableScanner> getScanners(Collection<SSTableReader> 
sstables, Collection<Range<Token>> ranges)
+    {
+        if (sstables.isEmpty())
+        {
+            return Collections.emptySet();
+        }
+
+        Map<UUID, Set<SSTableReader>> sessionSSTables = new HashMap<>();
+        for (SSTableReader sstable : sstables)
+        {
+            UUID sessionID = sstable.getSSTableMetadata().pendingRepair;
+            assert sessionID != null;
+            sessionSSTables.computeIfAbsent(sessionID, k -> new 
HashSet<>()).add(sstable);
+        }
+
+        Set<ISSTableScanner> scanners = new HashSet<>(sessionSSTables.size());
+        for (Map.Entry<UUID, Set<SSTableReader>> entry : 
sessionSSTables.entrySet())
+        {
+            scanners.addAll(get(entry.getKey()).getScanners(entry.getValue(), 
ranges).scanners);
+        }
+        return scanners;
+    }
+
+    public boolean hasStrategy(AbstractCompactionStrategy strategy)
+    {
+        return strategies.values().contains(strategy);
+    }
+
+    public Collection<AbstractCompactionTask> 
createUserDefinedTasks(List<SSTableReader> sstables, int gcBefore)
+    {
+        Map<UUID, List<SSTableReader>> group = 
sstables.stream().collect(Collectors.groupingBy(s -> 
s.getSSTableMetadata().pendingRepair));
+        return group.entrySet().stream().map(g -> 
strategies.get(g.getKey()).getUserDefinedTask(g.getValue(), 
gcBefore)).collect(Collectors.toList());
+    }
+
+    /**
+     * promotes/demotes sstables involved in a consistent repair that has been 
finalized, or failed
+     */
+    class RepairFinishedCompactionTask extends AbstractCompactionTask
+    {
+        private final UUID sessionID;
+        private final long repairedAt;
+
+        RepairFinishedCompactionTask(ColumnFamilyStore cfs, 
LifecycleTransaction transaction, UUID sessionID, long repairedAt)
+        {
+            super(cfs, transaction);
+            this.sessionID = sessionID;
+            this.repairedAt = repairedAt;
+        }
+
+        @VisibleForTesting
+        UUID getSessionID()
+        {
+            return sessionID;
+        }
+
+        protected void runMayThrow() throws Exception
+        {
+            for (SSTableReader sstable : transaction.originals())
+            {
+                logger.debug("Setting repairedAt to {} on {} for {}", 
repairedAt, sstable, sessionID);
+                
sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 
repairedAt, ActiveRepairService.NO_PENDING_REPAIR);
+                sstable.reloadSSTableMetadata();
+            }
+            
cfs.getTracker().notifySSTableRepairedStatusChanged(transaction.originals());
+            transaction.abort();
+        }
+
+        public CompactionAwareWriter 
getCompactionAwareWriter(ColumnFamilyStore cfs, Directories directories, 
LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        protected int 
executeInternal(CompactionManager.CompactionExecutorStatsCollector collector)
+        {
+            run();
+            return transaction.originals().size();
+        }
+
+        public int execute(CompactionManager.CompactionExecutorStatsCollector 
collector)
+        {
+            try
+            {
+                return super.execute(collector);
+            }
+            finally
+            {
+                removeSession(sessionID);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java 
b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index e8eee9a..cd5238f 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -152,7 +153,8 @@ public class Scrubber implements Closeable
                 assert firstRowPositionFromIndex == 0 : 
firstRowPositionFromIndex;
             }
 
-            writer.switchWriter(CompactionManager.createWriter(cfs, 
destination, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, 
sstable, transaction));
+            StatsMetadata metadata = sstable.getSSTableMetadata();
+            writer.switchWriter(CompactionManager.createWriter(cfs, 
destination, expectedBloomFilterSize, metadata.repairedAt, 
metadata.pendingRepair, sstable, transaction));
 
             DecoratedKey prevKey = null;
 
@@ -257,9 +259,9 @@ public class Scrubber implements Closeable
             if (!outOfOrder.isEmpty())
             {
                 // out of order rows, but no bad rows found - we can keep our 
repairedAt time
-                long repairedAt = badRows > 0 ? 
ActiveRepairService.UNREPAIRED_SSTABLE : 
sstable.getSSTableMetadata().repairedAt;
+                long repairedAt = badRows > 0 ? 
ActiveRepairService.UNREPAIRED_SSTABLE : metadata.repairedAt;
                 SSTableReader newInOrderSstable;
-                try (SSTableWriter inOrderWriter = 
CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, 
repairedAt, sstable, transaction))
+                try (SSTableWriter inOrderWriter = 
CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, 
repairedAt, metadata.pendingRepair, sstable, transaction))
                 {
                     for (Partition partition : outOfOrder)
                         inOrderWriter.append(partition.unfilteredIterator());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 8302a9b..b8c72bb 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -21,6 +21,7 @@ import java.util.*;
 import java.util.Map.Entry;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -327,6 +328,12 @@ public class SizeTieredCompactionStrategy extends 
AbstractCompactionStrategy
         sstables.remove(sstable);
     }
 
+    /**
+     * @return an immutable copy of the sstables this strategy currently tracks
+     */
+    @Override
+    protected Set<SSTableReader> getSSTables()
+    {
+        // copy so callers never observe later additions/removals
+        Set<SSTableReader> copy = ImmutableSet.copyOf(sstables);
+        return copy;
+    }
+
     public String toString()
     {
         return String.format("SizeTieredCompactionStrategy[%s/%s]",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index 595c46d..0f3c171 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -177,6 +177,12 @@ public class TimeWindowCompactionStrategy extends 
AbstractCompactionStrategy
         sstables.remove(sstable);
     }
 
+    /**
+     * @return an immutable snapshot of the sstables tracked by this strategy
+     */
+    @Override
+    protected Set<SSTableReader> getSSTables()
+    {
+        return ImmutableSet.<SSTableReader>builder().addAll(sstables).build();
+    }
+
     /**
      * Find the lowest and highest timestamps in a given timestamp/unit pair
      * Returns milliseconds, caller should adjust accordingly

Reply via email to