Abstract repair for pluggable storage

Patch by Blake Eggleston; Reviewed by Jason Brown for CASSANDRA-14116


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c5a7fcaa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c5a7fcaa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c5a7fcaa

Branch: refs/heads/trunk
Commit: c5a7fcaa8e00083d6f74967c40474aef07b0d21a
Parents: 478c1a9
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Sat Mar 10 09:16:46 2018 -0800
Committer: Blake Eggleston <bdeggles...@gmail.com>
Committed: Fri Apr 6 16:11:14 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  10 +
 src/java/org/apache/cassandra/db/Keyspace.java  |  10 +
 .../db/compaction/CompactionManager.java        | 257 +------------
 .../repair/CassandraKeyspaceRepairManager.java  |  48 +++
 .../db/repair/CassandraTableRepairManager.java  |  83 +++++
 .../db/repair/CassandraValidationIterator.java  | 321 ++++++++++++++++
 .../db/repair/PendingAntiCompaction.java        | 205 +++++++++++
 .../cassandra/repair/KeyspaceRepairManager.java |  42 +++
 .../repair/RepairMessageVerbHandler.java        |  19 +-
 .../cassandra/repair/TableRepairManager.java    |  64 ++++
 .../cassandra/repair/ValidationManager.java     | 163 +++++++++
 .../repair/ValidationPartitionIterator.java     |  33 ++
 .../repair/consistent/ConsistentSession.java    |   8 +-
 .../repair/consistent/LocalSessions.java        |  42 +--
 .../consistent/PendingAntiCompaction.java       | 202 ----------
 .../cassandra/service/ActiveRepairService.java  |  48 +--
 ...tionManagerGetSSTablesForValidationTest.java | 179 ---------
 .../LeveledCompactionStrategyTest.java          |   4 +-
 ...tionManagerGetSSTablesForValidationTest.java | 182 +++++++++
 .../db/repair/PendingAntiCompactionTest.java    | 366 +++++++++++++++++++
 .../apache/cassandra/repair/ValidatorTest.java  |   2 +-
 .../repair/consistent/LocalSessionTest.java     |  46 ++-
 .../consistent/PendingAntiCompactionTest.java   | 365 ------------------
 .../service/ActiveRepairServiceTest.java        |  12 +-
 25 files changed, 1603 insertions(+), 1109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b3617c3..3db7f27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Abstract repair for pluggable storage (CASSANDRA-14116)
  * Add meaningful toString() impls (CASSANDRA-13653)
  * Add sstableloader option to accept target keyspace name (CASSANDRA-13884)
  * Move processing of EchoMessage response to gossip stage (CASSANDRA-13713)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 4f74667..e4b84fe 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.streaming.CassandraStreamManager;
+import org.apache.cassandra.db.repair.CassandraTableRepairManager;
 import org.apache.cassandra.db.view.TableViews;
 import org.apache.cassandra.db.lifecycle.*;
 import org.apache.cassandra.db.partitions.CachedPartition;
@@ -75,6 +76,7 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.metrics.TableMetrics.Sampler;
+import org.apache.cassandra.repair.TableRepairManager;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.service.CacheService;
@@ -216,6 +218,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private final CassandraStreamManager streamManager;
 
+    private final TableRepairManager repairManager;
+
     private volatile boolean compactionSpaceCheck = true;
 
     @VisibleForTesting
@@ -447,6 +451,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             oldMBeanName= null;
         }
         streamManager = new CassandraStreamManager(this);
+        repairManager = new CassandraTableRepairManager(this);
     }
 
     public void updateSpeculationThreshold()
@@ -466,6 +471,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return streamManager;
     }
 
+    public TableRepairManager getRepairManager()
+    {
+        return repairManager;
+    }
+
     public TableMetadata metadata()
     {
         return metadata.get();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index ae778f1..42d43b2 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.repair.CassandraKeyspaceRepairManager;
 import org.apache.cassandra.db.view.ViewManager;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.index.Index;
@@ -45,6 +46,7 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.metrics.KeyspaceMetrics;
+import org.apache.cassandra.repair.KeyspaceRepairManager;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.ReplicationParams;
 import org.apache.cassandra.schema.Schema;
@@ -90,6 +92,7 @@ public class Keyspace
     private volatile AbstractReplicationStrategy replicationStrategy;
     public final ViewManager viewManager;
     private volatile ReplicationParams replicationParams;
+    private final KeyspaceRepairManager repairManager;
 
     public static final Function<String,Keyspace> keyspaceTransformer = new Function<String, Keyspace>()
     {
@@ -335,6 +338,7 @@ public class Keyspace
             initCf(Schema.instance.getTableMetadataRef(cfm.id), loadSSTables);
         }
         this.viewManager.reload(false);
+        this.repairManager = new CassandraKeyspaceRepairManager(this);
     }
 
     private Keyspace(KeyspaceMetadata metadata)
@@ -343,6 +347,12 @@ public class Keyspace
         createReplicationStrategy(metadata);
         this.metric = new KeyspaceMetrics(this);
         this.viewManager = new ViewManager(this);
+        this.repairManager = new CassandraKeyspaceRepairManager(this);
+    }
+
+    public KeyspaceRepairManager getRepairManager()
+    {
+        return repairManager;
     }
 
     public static Keyspace mockKS(KeyspaceMetadata metadata)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 23b881a..5672dfe 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.repair.ValidationPartitionIterator;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.Schema;
@@ -132,8 +133,7 @@ public class CompactionManager implements CompactionManagerMBean
 
     private final RateLimiter compactionRateLimiter = RateLimiter.create(Double.MAX_VALUE);
 
-    @VisibleForTesting
-    CompactionMetrics getMetrics()
+    public CompactionMetrics getMetrics()
     {
         return metrics;
     }
@@ -941,30 +941,9 @@ public class CompactionManager implements CompactionManagerMBean
         return null;
     }
 
-    /**
-     * Does not mutate data, so is not scheduled.
-     */
-    public Future<?> submitValidation(final ColumnFamilyStore cfStore, final Validator validator)
+    public Future<?> submitValidation(Callable<Object> validation)
     {
-        Callable<Object> callable = new Callable<Object>()
-        {
-            public Object call() throws IOException
-            {
-                try (TableMetrics.TableTimer.Context c = cfStore.metric.validationTime.time())
-                {
-                    doValidationCompaction(cfStore, validator);
-                }
-                catch (Throwable e)
-                {
-                    // we need to inform the remote end of our failure, otherwise it will hang on repair forever
-                    validator.fail();
-                    throw e;
-                }
-                return this;
-            }
-        };
-
-        return validationExecutor.submitIfRunning(callable, "validation");
+        return validationExecutor.submitIfRunning(validation, "validation");
     }
 
     /* Used in tests. */
@@ -1315,192 +1294,6 @@ public class CompactionManager implements CompactionManagerMBean
                                     txn);
     }
 
-
-    /**
-     * Performs a readonly "compaction" of all sstables in order to validate 
complete rows,
-     * but without writing the merge result
-     */
-    @SuppressWarnings("resource")
-    private void doValidationCompaction(ColumnFamilyStore cfs, Validator 
validator) throws IOException
-    {
-        // this isn't meant to be race-proof, because it's not -- it won't 
cause bugs for a CFS to be dropped
-        // mid-validation, or to attempt to validate a droped CFS.  this is 
just a best effort to avoid useless work,
-        // particularly in the scenario where a validation is submitted before 
the drop, and there are compactions
-        // started prior to the drop keeping some sstables alive.  Since 
validationCompaction can run
-        // concurrently with other compactions, it would otherwise go ahead 
and scan those again.
-        if (!cfs.isValid())
-            return;
-
-        Refs<SSTableReader> sstables = null;
-        try
-        {
-            UUID parentRepairSessionId = validator.desc.parentSessionId;
-            String snapshotName;
-            boolean isGlobalSnapshotValidation = 
cfs.snapshotExists(parentRepairSessionId.toString());
-            if (isGlobalSnapshotValidation)
-                snapshotName = parentRepairSessionId.toString();
-            else
-                snapshotName = validator.desc.sessionId.toString();
-            boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
-
-            if (isSnapshotValidation)
-            {
-                // If there is a snapshot created for the session then read 
from there.
-                // note that we populate the parent repair session when 
creating the snapshot, meaning the sstables in the snapshot are the ones we
-                // are supposed to validate.
-                sstables = cfs.getSnapshotSSTableReaders(snapshotName);
-            }
-            else
-            {
-                if (!validator.isIncremental)
-                {
-                    // flush first so everyone is validating data that is as 
similar as possible
-                    
StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
-                }
-                sstables = getSSTablesToValidate(cfs, validator);
-                if (sstables == null)
-                    return; // this means the parent repair session was 
removed - the repair session failed on another node and we removed it
-            }
-
-            // Create Merkle trees suitable to hold estimated partitions for 
the given ranges.
-            // We blindly assume that a partition is evenly distributed on all 
sstables for now.
-            MerkleTrees tree = createMerkleTrees(sstables, 
validator.desc.ranges, cfs);
-            long start = System.nanoTime();
-            long partitionCount = 0;
-            try (AbstractCompactionStrategy.ScannerList scanners = 
cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
-                 ValidationCompactionController controller = new 
ValidationCompactionController(cfs, getDefaultGcBefore(cfs, 
validator.nowInSec));
-                 CompactionIterator ci = new 
ValidationCompactionIterator(scanners.scanners, controller, validator.nowInSec, 
metrics))
-            {
-                // validate the CF as we iterate over it
-                validator.prepare(cfs, tree);
-                while (ci.hasNext())
-                {
-                    if (ci.isStopRequested())
-                        throw new 
CompactionInterruptedException(ci.getCompactionInfo());
-                    try (UnfilteredRowIterator partition = ci.next())
-                    {
-                        validator.add(partition);
-                        partitionCount++;
-                    }
-                }
-                validator.complete();
-            }
-            finally
-            {
-                if (isSnapshotValidation && !isGlobalSnapshotValidation)
-                {
-                    // we can only clear the snapshot if we are not doing a 
global snapshot validation (we then clear it once anticompaction
-                    // is done).
-                    cfs.clearSnapshot(snapshotName);
-                }
-                cfs.metric.partitionsValidated.update(partitionCount);
-            }
-            long estimatedTotalBytes = 0;
-            for (SSTableReader sstable : sstables)
-            {
-                for (Pair<Long, Long> positionsForRanges : 
sstable.getPositionsForRanges(validator.desc.ranges))
-                    estimatedTotalBytes += positionsForRanges.right - 
positionsForRanges.left;
-            }
-            cfs.metric.bytesValidated.update(estimatedTotalBytes);
-            if (logger.isDebugEnabled())
-            {
-                long duration = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                logger.debug("Validation of {} partitions (~{}) finished in {} 
msec, for {}",
-                             partitionCount,
-                             
FBUtilities.prettyPrintMemory(estimatedTotalBytes),
-                             duration,
-                             validator.desc);
-            }
-        }
-        finally
-        {
-            if (sstables != null)
-                sstables.release();
-        }
-    }
-
-    private static MerkleTrees createMerkleTrees(Iterable<SSTableReader> 
sstables, Collection<Range<Token>> ranges, ColumnFamilyStore cfs)
-    {
-        MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
-        long allPartitions = 0;
-        Map<Range<Token>, Long> rangePartitionCounts = 
Maps.newHashMapWithExpectedSize(ranges.size());
-        for (Range<Token> range : ranges)
-        {
-            long numPartitions = 0;
-            for (SSTableReader sstable : sstables)
-                numPartitions += 
sstable.estimatedKeysForRanges(Collections.singleton(range));
-            rangePartitionCounts.put(range, numPartitions);
-            allPartitions += numPartitions;
-        }
-
-        for (Range<Token> range : ranges)
-        {
-            long numPartitions = rangePartitionCounts.get(range);
-            double rangeOwningRatio = allPartitions > 0 ? 
(double)numPartitions / allPartitions : 0;
-            // determine max tree depth proportional to range size to avoid 
blowing up memory with multiple tress,
-            // capping at 20 to prevent large tree (CASSANDRA-11390)
-            int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - 
Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0;
-            // determine tree depth from number of partitions, capping at max 
tree depth (CASSANDRA-5263)
-            int depth = numPartitions > 0 ? (int) 
Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0;
-            tree.addMerkleTree((int) Math.pow(2, depth), range);
-        }
-        if (logger.isDebugEnabled())
-        {
-            // MT serialize may take time
-            logger.debug("Created {} merkle trees with merkle trees size {}, 
{} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, 
MerkleTrees.serializer.serializedSize(tree, 0));
-        }
-
-        return tree;
-    }
-
-    @VisibleForTesting
-    synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore 
cfs, Validator validator)
-    {
-        Refs<SSTableReader> sstables;
-
-        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
-        if (prs == null)
-            return null;
-        Set<SSTableReader> sstablesToValidate = new HashSet<>();
-
-        com.google.common.base.Predicate<SSTableReader> predicate;
-        if (prs.isPreview())
-        {
-            predicate = prs.getPreviewPredicate();
-
-        }
-        else if (validator.isIncremental)
-        {
-            predicate = s -> 
validator.desc.parentSessionId.equals(s.getSSTableMetadata().pendingRepair);
-        }
-        else
-        {
-            // note that we always grab all existing sstables for this - if we 
were to just grab the ones that
-            // were marked as repairing, we would miss any ranges that were 
compacted away and this would cause us to overstream
-            predicate = (s) -> !prs.isIncremental || !s.isRepaired();
-        }
-
-        try (ColumnFamilyStore.RefViewFragment sstableCandidates = 
cfs.selectAndReference(View.select(SSTableSet.CANONICAL, predicate)))
-        {
-            for (SSTableReader sstable : sstableCandidates.sstables)
-            {
-                if (new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(validator.desc.ranges))
-                {
-                    sstablesToValidate.add(sstable);
-                }
-            }
-
-            sstables = Refs.tryRef(sstablesToValidate);
-            if (sstables == null)
-            {
-                logger.error("Could not reference sstables");
-                throw new RuntimeException("Could not reference sstables");
-            }
-        }
-
-        return sstables;
-    }
-
     /**
      * Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second
      * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted
@@ -1702,45 +1495,6 @@ public class CompactionManager implements CompactionManagerMBean
         return cfs.isIndex() ? nowInSec : cfs.gcBefore(nowInSec);
     }
 
-    private static class ValidationCompactionIterator extends 
CompactionIterator
-    {
-        public ValidationCompactionIterator(List<ISSTableScanner> scanners, 
ValidationCompactionController controller, int nowInSec, CompactionMetrics 
metrics)
-        {
-            super(OperationType.VALIDATION, scanners, controller, nowInSec, 
UUIDGen.getTimeUUID(), metrics);
-        }
-    }
-
-    /*
-     * Controller for validation compaction that always purges.
-     * Note that we should not call cfs.getOverlappingSSTables on the provided
-     * sstables because those sstables are not guaranteed to be active sstables
-     * (since we can run repair on a snapshot).
-     */
-    private static class ValidationCompactionController extends 
CompactionController
-    {
-        public ValidationCompactionController(ColumnFamilyStore cfs, int 
gcBefore)
-        {
-            super(cfs, gcBefore);
-        }
-
-        @Override
-        public LongPredicate getPurgeEvaluator(DecoratedKey key)
-        {
-            /*
-             * The main reason we always purge is that including gcable 
tombstone would mean that the
-             * repair digest will depends on the scheduling of compaction on 
the different nodes. This
-             * is still not perfect because gcbefore is currently dependend on 
the current time at which
-             * the validation compaction start, which while not too bad for 
normal repair is broken for
-             * repair on snapshots. A better solution would be to agree on a 
gcbefore that all node would
-             * use, and we'll do that with CASSANDRA-4932.
-             * Note validation compaction includes all sstables, so we don't 
have the problem of purging
-             * a tombstone that could shadow a column in another sstable, but 
this is doubly not a concern
-             * since validation compaction is read-only.
-             */
-            return time -> true;
-        }
-    }
-
     public ListenableFuture<Long> submitViewBuilder(final ViewBuilderTask task)
     {
         return viewBuildExecutor.submitIfRunning(() -> {
@@ -1856,7 +1610,8 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
-    private static class ValidationExecutor extends CompactionExecutor
+    // TODO: pull out relevant parts of CompactionExecutor and move to ValidationManager
+    public static class ValidationExecutor extends CompactionExecutor
     {
         public ValidationExecutor()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java b/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java
new file mode 100644
index 0000000..5f2e5a0
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.repair;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.repair.KeyspaceRepairManager;
+
+public class CassandraKeyspaceRepairManager implements KeyspaceRepairManager
+{
+    private final Keyspace keyspace;
+
+    public CassandraKeyspaceRepairManager(Keyspace keyspace)
+    {
+        this.keyspace = keyspace;
+    }
+
+    @Override
+    public ListenableFuture prepareIncrementalRepair(UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor)
+    {
+        PendingAntiCompaction pac = new PendingAntiCompaction(sessionID, tables, ranges, executor);
+        return pac.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java b/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java
new file mode 100644
index 0000000..983e30f
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.repair;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+import com.google.common.base.Predicate;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.repair.TableRepairManager;
+import org.apache.cassandra.repair.ValidationPartitionIterator;
+import org.apache.cassandra.repair.Validator;
+
+public class CassandraTableRepairManager implements TableRepairManager
+{
+    private final ColumnFamilyStore cfs;
+
+    public CassandraTableRepairManager(ColumnFamilyStore cfs)
+    {
+        this.cfs = cfs;
+    }
+
+    @Override
+    public ValidationPartitionIterator getValidationIterator(Collection<Range<Token>> ranges, UUID parentId, UUID sessionID, boolean isIncremental, int nowInSec) throws IOException
+    {
+        return new CassandraValidationIterator(cfs, ranges, parentId, sessionID, isIncremental, nowInSec);
+    }
+
+    @Override
+    public Future<?> submitValidation(Callable<Object> validation)
+    {
+        return CompactionManager.instance.submitValidation(validation);
+    }
+
+    @Override
+    public void incrementalSessionCompleted(UUID sessionID)
+    {
+        CompactionManager.instance.submitBackground(cfs);
+    }
+
+    @Override
+    public synchronized void snapshot(String name, Collection<Range<Token>> ranges, boolean force)
+    {
+        if (force || !cfs.snapshotExists(name))
+        {
+            cfs.snapshot(name, new Predicate<SSTableReader>()
+            {
+                public boolean apply(SSTableReader sstable)
+                {
+                    return sstable != null &&
+                           !sstable.metadata().isIndex() && // exclude SSTables from 2i
+                           new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges);
+                }
+            }, true, false); //ephemeral snapshot, if repair fails, it will be cleaned next startup
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
new file mode 100644
index 0000000..992a384
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.repair;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.LongPredicate;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.compaction.CompactionIterator;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.metrics.CompactionMetrics;
+import org.apache.cassandra.repair.ValidationPartitionIterator;
+import org.apache.cassandra.repair.Validator;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+public class CassandraValidationIterator extends ValidationPartitionIterator
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CassandraValidationIterator.class);
+
+    /*
+     * Controller for validation compaction that always purges.
+     * Note that we should not call cfs.getOverlappingSSTables on the provided
+     * sstables because those sstables are not guaranteed to be active sstables
+     * (since we can run repair on a snapshot).
+     */
+    private static class ValidationCompactionController extends 
CompactionController
+    {
+        public ValidationCompactionController(ColumnFamilyStore cfs, int 
gcBefore)
+        {
+            super(cfs, gcBefore);
+        }
+
+        @Override
+        public LongPredicate getPurgeEvaluator(DecoratedKey key)
+        {
+            /*
+             * The main reason we always purge is that including gcable 
tombstones would mean that the
+             * repair digest will depend on the scheduling of compaction on 
the different nodes. This
+             * is still not perfect because gcbefore is currently dependent on 
the current time at which
+             * the validation compaction starts, which while not too bad for 
normal repair is broken for
+             * repair on snapshots. A better solution would be to agree on a 
gcbefore that all nodes would
+             * use, and we'll do that with CASSANDRA-4932.
+             * Note validation compaction includes all sstables, so we don't 
have the problem of purging
+             * a tombstone that could shadow a column in another sstable, but 
this is doubly not a concern
+             * since validation compaction is read-only.
+             */
+            return time -> true;
+        }
+    }
+
+    public static int getDefaultGcBefore(ColumnFamilyStore cfs, int nowInSec)
+    {
+        // 2ndary indexes have ExpiringColumns too, so we need to purge 
tombstones deleted before now. We do not need to
+        // add any GcGrace however since 2ndary indexes are local to a node.
+        return cfs.isIndex() ? nowInSec : cfs.gcBefore(nowInSec);
+    }
+
+    private static class ValidationCompactionIterator extends 
CompactionIterator
+    {
+        public ValidationCompactionIterator(List<ISSTableScanner> scanners, 
ValidationCompactionController controller, int nowInSec, CompactionMetrics 
metrics)
+        {
+            super(OperationType.VALIDATION, scanners, controller, nowInSec, 
UUIDGen.getTimeUUID(), metrics);
+        }
+    }
+
+    private static Predicate<SSTableReader> getPreviewPredicate(PreviewKind 
previewKind)
+    {
+        switch (previewKind)
+        {
+            case ALL:
+                return (s) -> true;
+            case REPAIRED:
+                return (s) -> s.isRepaired();
+            case UNREPAIRED:
+                return (s) -> !s.isRepaired();
+            default:
+                throw new RuntimeException("Can't get preview predicate for 
preview kind " + previewKind);
+        }
+    }
+
+    @VisibleForTesting
+    static synchronized Refs<SSTableReader> 
getSSTablesToValidate(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, 
UUID parentId, boolean isIncremental)
+    {
+        Refs<SSTableReader> sstables;
+
+        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(parentId);
+        if (prs == null)
+        {
+            // this means the parent repair session was removed - the repair 
session failed on another node and we removed it
+            return new Refs<>();
+        }
+
+        Set<SSTableReader> sstablesToValidate = new HashSet<>();
+
+        com.google.common.base.Predicate<SSTableReader> predicate;
+        if (prs.isPreview())
+        {
+            predicate = getPreviewPredicate(prs.previewKind);
+
+        }
+        else if (isIncremental)
+        {
+            predicate = s -> 
parentId.equals(s.getSSTableMetadata().pendingRepair);
+        }
+        else
+        {
+            // note that we always grab all existing sstables for this - if we 
were to just grab the ones that
+            // were marked as repairing, we would miss any ranges that were 
compacted away and this would cause us to overstream
+            predicate = (s) -> !prs.isIncremental || !s.isRepaired();
+        }
+
+        try (ColumnFamilyStore.RefViewFragment sstableCandidates = 
cfs.selectAndReference(View.select(SSTableSet.CANONICAL, predicate)))
+        {
+            for (SSTableReader sstable : sstableCandidates.sstables)
+            {
+                if (new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(ranges))
+                {
+                    sstablesToValidate.add(sstable);
+                }
+            }
+
+            sstables = Refs.tryRef(sstablesToValidate);
+            if (sstables == null)
+            {
+                logger.error("Could not reference sstables");
+                throw new RuntimeException("Could not reference sstables");
+            }
+        }
+
+        return sstables;
+    }
+
+    private final ColumnFamilyStore cfs;
+    private final Refs<SSTableReader> sstables;
+    private final String snapshotName;
+    private final boolean isGlobalSnapshotValidation;
+
+    private final boolean isSnapshotValidation;
+    private final AbstractCompactionStrategy.ScannerList scanners;
+    private final ValidationCompactionController controller;
+
+    private final CompactionIterator ci;
+
+    private final long estimatedBytes;
+    private final long estimatedPartitions;
+    private final Map<Range<Token>, Long> rangePartitionCounts;
+
+    public CassandraValidationIterator(ColumnFamilyStore cfs, 
Collection<Range<Token>> ranges, UUID parentId, UUID sessionID, boolean 
isIncremental, int nowInSec) throws IOException
+    {
+        this.cfs = cfs;
+
+        isGlobalSnapshotValidation = cfs.snapshotExists(parentId.toString());
+        if (isGlobalSnapshotValidation)
+            snapshotName = parentId.toString();
+        else
+            snapshotName = sessionID.toString();
+        isSnapshotValidation = cfs.snapshotExists(snapshotName);
+
+        if (isSnapshotValidation)
+        {
+            // If there is a snapshot created for the session then read from 
there.
+            // note that we populate the parent repair session when creating 
the snapshot, meaning the sstables in the snapshot are the ones we
+            // are supposed to validate.
+            sstables = cfs.getSnapshotSSTableReaders(snapshotName);
+        }
+        else
+        {
+            if (!isIncremental)
+            {
+                // flush first so everyone is validating data that is as 
similar as possible
+                
StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
+            }
+            sstables = getSSTablesToValidate(cfs, ranges, parentId, 
isIncremental);
+        }
+
+        Preconditions.checkArgument(sstables != null);
+        controller = new ValidationCompactionController(cfs, 
getDefaultGcBefore(cfs, nowInSec));
+        scanners = cfs.getCompactionStrategyManager().getScanners(sstables, 
ranges);
+        ci = new ValidationCompactionIterator(scanners.scanners, controller, 
nowInSec, CompactionManager.instance.getMetrics());
+
+        long allPartitions = 0;
+        rangePartitionCounts = Maps.newHashMapWithExpectedSize(ranges.size());
+        for (Range<Token> range : ranges)
+        {
+            long numPartitions = 0;
+            for (SSTableReader sstable : sstables)
+                numPartitions += 
sstable.estimatedKeysForRanges(Collections.singleton(range));
+            rangePartitionCounts.put(range, numPartitions);
+            allPartitions += numPartitions;
+        }
+        estimatedPartitions = allPartitions;
+
+        long estimatedTotalBytes = 0;
+        for (SSTableReader sstable : sstables)
+        {
+            for (Pair<Long, Long> positionsForRanges : 
sstable.getPositionsForRanges(ranges))
+                estimatedTotalBytes += positionsForRanges.right - 
positionsForRanges.left;
+        }
+        estimatedBytes = estimatedTotalBytes;
+    }
+
+    @Override
+    public void close()
+    {
+        // TODO: can any of this fail and leave stuff unreleased?
+        super.close();
+
+        if (ci != null)
+            ci.close();
+
+        if (scanners != null)
+            scanners.close();
+
+        if (controller != null)
+            controller.close();
+
+        if (isSnapshotValidation && !isGlobalSnapshotValidation)
+        {
+            // we can only clear the snapshot if we are not doing a global 
snapshot validation (we then clear it once anticompaction
+            // is done).
+            cfs.clearSnapshot(snapshotName);
+        }
+
+        if (sstables != null)
+            sstables.release();
+    }
+
+    @Override
+    public TableMetadata metadata()
+    {
+        return cfs.metadata.get();
+    }
+
+    private void throwIfStopRequested()
+    {
+        if (ci.isStopRequested())
+            throw new CompactionInterruptedException(ci.getCompactionInfo());
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        throwIfStopRequested();
+        return ci.hasNext();
+    }
+
+    @Override
+    public UnfilteredRowIterator next()
+    {
+        throwIfStopRequested();
+        return ci.next();
+    }
+
+    @Override
+    public long getEstimatedBytes()
+    {
+        return estimatedBytes;
+    }
+
+    @Override
+    public long estimatedPartitions()
+    {
+        return estimatedPartitions;
+    }
+
+    @Override
+    public Map<Range<Token>, Long> getRangePartitionCounts()
+    {
+        return rangePartitionCounts;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java 
b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
new file mode 100644
index 0000000..4e0f13d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.repair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+/**
+ * Performs an anti compaction on a set of tables and token ranges, isolating 
the unrepaired sstables
+ * for a given token range into a pending repair group so they can't be 
compacted with other sstables
+ * while they are being repaired.
+ */
+public class PendingAntiCompaction
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(PendingAntiCompaction.class);
+
+    static class AcquireResult
+    {
+        final ColumnFamilyStore cfs;
+        final Refs<SSTableReader> refs;
+        final LifecycleTransaction txn;
+
+        AcquireResult(ColumnFamilyStore cfs, Refs<SSTableReader> refs, 
LifecycleTransaction txn)
+        {
+            this.cfs = cfs;
+            this.refs = refs;
+            this.txn = txn;
+        }
+
+        void abort()
+        {
+            if (txn != null)
+                txn.abort();
+            if (refs != null)
+                refs.release();
+        }
+    }
+
+    static class SSTableAcquisitionException extends RuntimeException {}
+
+    static class AcquisitionCallable implements Callable<AcquireResult>
+    {
+        private final ColumnFamilyStore cfs;
+        private final Collection<Range<Token>> ranges;
+        private final UUID sessionID;
+
+        public AcquisitionCallable(ColumnFamilyStore cfs, 
Collection<Range<Token>> ranges, UUID sessionID)
+        {
+            this.cfs = cfs;
+            this.ranges = ranges;
+            this.sessionID = sessionID;
+        }
+
+        private Iterable<SSTableReader> getSSTables()
+        {
+            return Iterables.filter(cfs.getLiveSSTables(), s -> 
!s.isRepaired() && !s.isPendingRepair() && s.intersects(ranges));
+        }
+
+        @SuppressWarnings("resource")
+        private AcquireResult acquireTuple()
+        {
+            List<SSTableReader> sstables = Lists.newArrayList(getSSTables());
+            if (sstables.isEmpty())
+                return new AcquireResult(cfs, null, null);
+
+            LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.ANTICOMPACTION);
+            if (txn != null)
+                return new AcquireResult(cfs, Refs.ref(sstables), txn);
+            else
+                return null;
+        }
+
+        public AcquireResult call() throws Exception
+        {
+            logger.debug("acquiring sstables for pending anti compaction on 
session {}", sessionID);
+            AcquireResult refTxn = acquireTuple();
+            if (refTxn != null)
+                return refTxn;
+
+            // try to modify after cancelling running compactions. This will 
attempt to cancel in flight compactions for
+            // up to a minute, after which point, null will be returned
+            return cfs.runWithCompactionsDisabled(this::acquireTuple, false, 
false);
+        }
+    }
+
+    static class AcquisitionCallback implements 
AsyncFunction<List<AcquireResult>, Object>
+    {
+        private final UUID parentRepairSession;
+        private final Collection<Range<Token>> ranges;
+
+        public AcquisitionCallback(UUID parentRepairSession, 
Collection<Range<Token>> ranges)
+        {
+            this.parentRepairSession = parentRepairSession;
+            this.ranges = ranges;
+        }
+
+        ListenableFuture<?> submitPendingAntiCompaction(AcquireResult result)
+        {
+            return 
CompactionManager.instance.submitPendingAntiCompaction(result.cfs, ranges, 
result.refs, result.txn, parentRepairSession);
+        }
+
+        public ListenableFuture apply(List<AcquireResult> results) throws 
Exception
+        {
+            if (Iterables.any(results, t -> t == null))
+            {
+                // Release all sstables, and report failure back to coordinator
+                for (AcquireResult result : results)
+                {
+                    if (result != null)
+                    {
+                        logger.info("Releasing acquired sstables for {}.{}", 
result.cfs.metadata.keyspace, result.cfs.metadata.name);
+                        result.abort();
+                    }
+                }
+                logger.warn("Prepare phase for incremental repair session {} 
was unable to " +
+                            "acquire exclusive access to the neccesary 
sstables. " +
+                            "This is usually caused by running multiple 
incremental repairs on nodes that share token ranges",
+                            parentRepairSession);
+                return Futures.immediateFailedFuture(new 
SSTableAcquisitionException());
+            }
+            else
+            {
+                List<ListenableFuture<?>> pendingAntiCompactions = new 
ArrayList<>(results.size());
+                for (AcquireResult result : results)
+                {
+                    if (result.txn != null)
+                    {
+                        ListenableFuture<?> future = 
submitPendingAntiCompaction(result);
+                        pendingAntiCompactions.add(future);
+                    }
+                }
+
+                return Futures.allAsList(pendingAntiCompactions);
+            }
+        }
+    }
+
+    private final UUID prsId;
+    private final Collection<ColumnFamilyStore> tables;
+    private final Collection<Range<Token>> ranges;
+    private final ExecutorService executor;
+
+    public PendingAntiCompaction(UUID prsId, Collection<ColumnFamilyStore> 
tables, Collection<Range<Token>> ranges, ExecutorService executor)
+    {
+        this.prsId = prsId;
+        this.tables = tables;
+        this.ranges = ranges;
+        this.executor = executor;
+    }
+
+    public ListenableFuture run()
+    {
+        List<ListenableFutureTask<AcquireResult>> tasks = new 
ArrayList<>(tables.size());
+        for (ColumnFamilyStore cfs : tables)
+        {
+            cfs.forceBlockingFlush();
+            ListenableFutureTask<AcquireResult> task = 
ListenableFutureTask.create(new AcquisitionCallable(cfs, ranges, prsId));
+            executor.submit(task);
+            tasks.add(task);
+        }
+        ListenableFuture<List<AcquireResult>> acquisitionResults = 
Futures.successfulAsList(tasks);
+        ListenableFuture compactionResult = 
Futures.transformAsync(acquisitionResults, new AcquisitionCallback(prsId, 
ranges), MoreExecutors.directExecutor());
+        return compactionResult;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java 
b/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java
new file mode 100644
index 0000000..8aa4381
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Keyspace level hook for repair.
+ */
+public interface KeyspaceRepairManager
+{
+    /**
+     * Isolate the unrepaired ranges of the given tables, and make 
them referenceable by session id. Until each table has
+     * been notified that the repair session has been completed, the data 
associated with the given session id must
+     * not be combined with repaired or unrepaired data, or data from other 
repair sessions.
+     */
+    ListenableFuture prepareIncrementalRepair(UUID sessionID, 
Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, 
ExecutorService executor);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java 
b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 4c0a564..1e92a81 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -19,14 +19,10 @@ package org.apache.cassandra.repair;
 
 import java.util.*;
 
-import com.google.common.base.Predicate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.dht.Bounds;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
@@ -103,21 +99,14 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
                     }
 
                     ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
+                    TableRepairManager repairManager = cfs.getRepairManager();
                     if (prs.isGlobal)
                     {
-                        prs.maybeSnapshot(cfs.metadata.id, 
desc.parentSessionId);
+                        
repairManager.snapshot(desc.parentSessionId.toString(), prs.getRanges(), false);
                     }
                     else
                     {
-                        cfs.snapshot(desc.sessionId.toString(), new 
Predicate<SSTableReader>()
-                        {
-                            public boolean apply(SSTableReader sstable)
-                            {
-                                return sstable != null &&
-                                       !sstable.metadata().isIndex() && // 
exclude SSTables from 2i
-                                       new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(desc.ranges);
-                            }
-                        }, true, false); //ephemeral snapshot, if repair 
fails, it will be cleaned next startup
+                        
repairManager.snapshot(desc.parentSessionId.toString(), desc.ranges, true);
                     }
                     logger.debug("Enqueuing response to snapshot request {} to 
{}", desc.sessionId, message.from);
                     MessagingService.instance().sendReply(new 
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
@@ -138,7 +127,7 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
                     
ActiveRepairService.instance.consistent.local.maybeSetRepairing(desc.parentSessionId);
                     Validator validator = new Validator(desc, message.from, 
validationRequest.nowInSec,
                                                         
isIncremental(desc.parentSessionId), previewKind(desc.parentSessionId));
-                    CompactionManager.instance.submitValidation(store, 
validator);
+                    ValidationManager.instance.submitValidation(store, 
validator);
                     break;
 
                 case SYNC_REQUEST:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/repair/TableRepairManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/TableRepairManager.java 
b/src/java/org/apache/cassandra/repair/TableRepairManager.java
new file mode 100644
index 0000000..b72af1d
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/TableRepairManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Table level hook for repair
+ */
+public interface TableRepairManager
+{
+    /**
+     * Return a validation iterator for the given parameters. If isIncremental 
is true, the iterator must only include
+     * data previously isolated for repair with the given parentId. nowInSec 
should determine whether tombstones should
+     * be purged or not.
+     */
+    ValidationPartitionIterator getValidationIterator(Collection<Range<Token>> 
ranges, UUID parentId, UUID sessionID, boolean isIncremental, int nowInSec) 
throws IOException;
+
+    /**
+     * Begin execution of the given validation callable. Which thread pool a 
validation should run in is an implementation detail.
+     */
+    Future<?> submitValidation(Callable<Object> validation);
+
+    /**
+     * Called when the given incremental session has completed. Because of 
race and failure conditions, implementors
+     * should not rely only on receiving calls from this method to determine 
when a session has ended. Implementors
+     * can determine if a session has finished by calling 
ActiveRepairService.instance.consistent.local.isSessionInProgress.
+     *
+     * Just because a session has completed doesn't mean it's completed 
successfully. So implementors need to consult the
+     * repair service at 
ActiveRepairService.instance.consistent.local.getFinalSessionRepairedAt to get 
the repairedAt
+     * time. If the repairedAt time is zero, the data for the given session 
should be demoted back to unrepaired. Otherwise,
+     * it should be promoted to repaired with the given repaired time.
+     */
+    void incrementalSessionCompleted(UUID sessionID);
+
+    /**
+     * For snapshot repairs. A snapshot of the current data for the given 
ranges should be taken with the given name.
+     * If force is true, a snapshot should be taken even if one already exists 
with that name.
+     */
+    void snapshot(String name, Collection<Range<Token>> ranges, boolean force);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/repair/ValidationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/ValidationManager.java 
b/src/java/org/apache/cassandra/repair/ValidationManager.java
new file mode 100644
index 0000000..d664c8a
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/ValidationManager.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTrees;
+
+public class ValidationManager
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(ValidationManager.class);
+
+    public static final ValidationManager instance = new ValidationManager();
+
+    private ValidationManager() {}
+
+    private static MerkleTrees createMerkleTrees(ValidationPartitionIterator 
validationIterator, Collection<Range<Token>> ranges, ColumnFamilyStore cfs)
+    {
+        MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
+        long allPartitions = validationIterator.estimatedPartitions();
+        Map<Range<Token>, Long> rangePartitionCounts = 
validationIterator.getRangePartitionCounts();
+
+        for (Range<Token> range : ranges)
+        {
+            long numPartitions = rangePartitionCounts.get(range);
+            double rangeOwningRatio = allPartitions > 0 ? 
(double)numPartitions / allPartitions : 0;
+            // determine max tree depth proportional to range size to avoid 
blowing up memory with multiple trees,
+            // capping at 20 to prevent large tree (CASSANDRA-11390)
+            int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - 
Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0;
+            // determine tree depth from number of partitions, capping at max 
tree depth (CASSANDRA-5263)
+            int depth = numPartitions > 0 ? (int) 
Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0;
+            tree.addMerkleTree((int) Math.pow(2, depth), range);
+        }
+        if (logger.isDebugEnabled())
+        {
+            // MT serialize may take time
+            logger.debug("Created {} merkle trees with merkle trees size {}, 
{} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, 
MerkleTrees.serializer.serializedSize(tree, 0));
+        }
+
+        return tree;
+    }
+
+    private static ValidationPartitionIterator 
getValidationIterator(TableRepairManager repairManager, Validator validator) 
throws IOException
+    {
+        RepairJobDesc desc = validator.desc;
+        return repairManager.getValidationIterator(desc.ranges, 
desc.parentSessionId, desc.sessionId, validator.isIncremental, 
validator.nowInSec);
+    }
+
+    /**
+     * Performs a readonly "compaction" of all sstables in order to validate 
complete rows,
+     * but without writing the merge result
+     */
+    @SuppressWarnings("resource")
+    private void doValidation(ColumnFamilyStore cfs, Validator validator) 
throws IOException
+    {
+        // this isn't meant to be race-proof, because it's not -- it won't 
cause bugs for a CFS to be dropped
+        // mid-validation, or to attempt to validate a dropped CFS.  this is 
just a best effort to avoid useless work,
+        // particularly in the scenario where a validation is submitted before 
the drop, and there are compactions
+        // started prior to the drop keeping some sstables alive.  Since 
validationCompaction can run
+        // concurrently with other compactions, it would otherwise go ahead 
and scan those again.
+        if (!cfs.isValid())
+            return;
+
+        // Create Merkle trees suitable to hold estimated partitions for the 
given ranges.
+        // We blindly assume that a partition is evenly distributed on all 
sstables for now.
+        long start = System.nanoTime();
+        long partitionCount = 0;
+        long estimatedTotalBytes = 0;
+        try (ValidationPartitionIterator vi = 
getValidationIterator(cfs.getRepairManager(), validator))
+        {
+            MerkleTrees tree = createMerkleTrees(vi, validator.desc.ranges, 
cfs);
+            try
+            {
+                // validate the CF as we iterate over it
+                validator.prepare(cfs, tree);
+                while (vi.hasNext())
+                {
+                    try (UnfilteredRowIterator partition = vi.next())
+                    {
+                        validator.add(partition);
+                        partitionCount++;
+                    }
+                }
+                validator.complete();
+            }
+            finally
+            {
+                estimatedTotalBytes = vi.getEstimatedBytes();
+                partitionCount = vi.estimatedPartitions();
+            }
+        }
+        finally
+        {
+            cfs.metric.bytesValidated.update(estimatedTotalBytes);
+            cfs.metric.partitionsValidated.update(partitionCount);
+        }
+        if (logger.isDebugEnabled())
+        {
+            long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
start);
+            logger.debug("Validation of {} partitions (~{}) finished in {} 
msec, for {}",
+                         partitionCount,
+                         FBUtilities.prettyPrintMemory(estimatedTotalBytes),
+                         duration,
+                         validator.desc);
+        }
+    }
+
+    /**
+     * Does not mutate data, so is not scheduled.
+     */
+    public Future<?> submitValidation(ColumnFamilyStore cfs, Validator 
validator)
+    {
+        Callable<Object> validation = new Callable<Object>()
+        {
+            public Object call() throws IOException
+            {
+                try (TableMetrics.TableTimer.Context c = 
cfs.metric.validationTime.time())
+                {
+                    doValidation(cfs, validator);
+                }
+                catch (Throwable e)
+                {
+                    // we need to inform the remote end of our failure, 
otherwise it will hang on repair forever
+                    validator.fail();
+                    throw e;
+                }
+                return this;
+            }
+        };
+
+        return cfs.getRepairManager().submitValidation(validation);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/repair/ValidationPartitionIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/ValidationPartitionIterator.java 
b/src/java/org/apache/cassandra/repair/ValidationPartitionIterator.java
new file mode 100644
index 0000000..ccfae41
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/ValidationPartitionIterator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+public abstract class ValidationPartitionIterator extends 
AbstractUnfilteredPartitionIterator
+{
+    public abstract long getEstimatedBytes();
+    public abstract long estimatedPartitions();
+    public abstract Map<Range<Token>, Long> getRangePartitionCounts();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java 
b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
index 78057e2..03de157 100644
--- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
@@ -64,10 +64,10 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin;
  *  The consistent prepare step promotes the parent repair session to a 
consistent session, and isolates the sstables
  *  being repaired other sstables. First, the coordinator sends a {@link 
PrepareConsistentRequest} message to each repair
  *  participant (including itself). When received, the node creates a {@link 
LocalSession} instance, sets it's state to
- *  {@code PREPARING}, persists it, and begins a {@link PendingAntiCompaction} 
task. When the pending anti compaction
- *  completes, the session state is set to {@code PREPARED}, and a {@link 
PrepareConsistentResponse} is sent to the
- *  coordinator indicating success or failure. If the pending anti-compaction 
fails, the local session state is set
- *  to {@code FAILED}.
+ *  {@code PREPARING}, persists it, and begins preparing the tables for 
incremental repair, which segregates the data
+ *  being repaired from the rest of the table data. When the preparation 
completes, the session state is set to
+ *  {@code PREPARED}, and a {@link PrepareConsistentResponse} is sent to the 
coordinator indicating success or failure.
+ *  If the pending anti-compaction fails, the local session state is set to 
{@code FAILED}.
  *  <p/>
  *  (see {@link LocalSessions#handlePrepareMessage(InetAddressAndPort, 
PrepareConsistentRequest)}
  *  <p/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java 
b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index e62f6fd..ed25166 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -19,10 +19,10 @@
 package org.apache.cassandra.repair.consistent;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.time.Instant;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -52,16 +52,14 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.repair.KeyspaceRepairManager;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.IPartitioner;
@@ -81,8 +79,6 @@ import 
org.apache.cassandra.repair.messages.PrepareConsistentResponse;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.StatusRequest;
 import org.apache.cassandra.repair.messages.StatusResponse;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
@@ -547,19 +543,17 @@ public class LocalSessions
     }
 
     @VisibleForTesting
-    ListenableFuture submitPendingAntiCompaction(LocalSession session, 
ExecutorService executor)
+    ListenableFuture prepareSession(KeyspaceRepairManager repairManager, UUID 
sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> 
ranges, ExecutorService executor)
     {
-        PendingAntiCompaction pac = new 
PendingAntiCompaction(session.sessionID, session.ranges, executor);
-        return pac.run();
+        return repairManager.prepareIncrementalRepair(sessionID, tables, 
ranges, executor);
     }
 
     /**
-     * The PrepareConsistentRequest effectively promotes the parent repair 
session to a consistent
-     * incremental session, and begins the 'pending anti compaction' which 
moves all sstable data
-     * that is to be repaired into it's own silo, preventing it from mixing 
with other data.
+     * The PrepareConsistentRequest promotes the parent repair session to a 
consistent incremental
+     * session, and isolates the data to be repaired from the rest of the 
table's data
      *
-     * No response is sent to the repair coordinator until the pending anti 
compaction has completed
-     * successfully. If the pending anti compaction fails, a failure message 
is sent to the coordinator,
+     * No response is sent to the repair coordinator until the data 
preparation / isolation has completed
+     * successfully. If the data preparation fails, a failure message is sent 
to the coordinator,
      * cancelling the session.
      */
     public void handlePrepareMessage(InetAddressAndPort from, 
PrepareConsistentRequest request)
@@ -587,8 +581,10 @@ public class LocalSessions
 
         ExecutorService executor = 
Executors.newFixedThreadPool(parentSession.getColumnFamilyStores().size());
 
-        ListenableFuture pendingAntiCompaction = 
submitPendingAntiCompaction(session, executor);
-        Futures.addCallback(pendingAntiCompaction, new FutureCallback()
+        KeyspaceRepairManager repairManager = 
parentSession.getKeyspace().getRepairManager();
+        ListenableFuture repairPreparation = prepareSession(repairManager, 
sessionID, parentSession.getColumnFamilyStores(), parentSession.getRanges(), 
executor);
+
+        Futures.addCallback(repairPreparation, new FutureCallback<Object>()
         {
             public void onSuccess(@Nullable Object result)
             {
@@ -601,18 +597,6 @@ public class LocalSessions
             public void onFailure(Throwable t)
             {
                 logger.error("Prepare phase for incremental repair session {} 
failed", sessionID, t);
-                if (t instanceof 
PendingAntiCompaction.SSTableAcquisitionException)
-                {
-                    logger.warn("Prepare phase for incremental repair session 
{} was unable to " +
-                                "acquire exclusive access to the neccesary 
sstables. " +
-                                "This is usually caused by running multiple 
incremental repairs on nodes that share token ranges",
-                                sessionID);
-
-                }
-                else
-                {
-                    logger.error("Prepare phase for incremental repair session 
{} failed", sessionID, t);
-                }
                 sendMessage(coordinator, new 
PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false));
                 failSession(sessionID, false);
                 executor.shutdown();
@@ -673,7 +657,7 @@ public class LocalSessions
             ColumnFamilyStore cfs = 
Schema.instance.getColumnFamilyStoreInstance(tid);
             if (cfs != null)
             {
-                CompactionManager.instance.submitBackground(cfs);
+                
cfs.getRepairManager().incrementalSessionCompleted(session.sessionID);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5a7fcaa/src/java/org/apache/cassandra/repair/consistent/PendingAntiCompaction.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/consistent/PendingAntiCompaction.java 
b/src/java/org/apache/cassandra/repair/consistent/PendingAntiCompaction.java
deleted file mode 100644
index dc3b45d..0000000
--- a/src/java/org/apache/cassandra/repair/consistent/PendingAntiCompaction.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.repair.consistent;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.concurrent.Refs;
-
-/**
- * Performs an anti compaction on a set of tables and token ranges, isolating 
the unrepaired sstables
- * for a give token range into a pending repair group so they can't be 
compacted with other sstables
- * while they are being repaired.
- */
-public class PendingAntiCompaction
-{
-    private static final Logger logger = 
LoggerFactory.getLogger(PendingAntiCompaction.class);
-
-    static class AcquireResult
-    {
-        final ColumnFamilyStore cfs;
-        final Refs<SSTableReader> refs;
-        final LifecycleTransaction txn;
-
-        AcquireResult(ColumnFamilyStore cfs, Refs<SSTableReader> refs, 
LifecycleTransaction txn)
-        {
-            this.cfs = cfs;
-            this.refs = refs;
-            this.txn = txn;
-        }
-
-        void abort()
-        {
-            if (txn != null)
-                txn.abort();
-            if (refs != null)
-                refs.release();
-        }
-    }
-
-    static class SSTableAcquisitionException extends RuntimeException {}
-
-    static class AcquisitionCallable implements Callable<AcquireResult>
-    {
-        private final ColumnFamilyStore cfs;
-        private final Collection<Range<Token>> ranges;
-        private final UUID sessionID;
-
-        public AcquisitionCallable(ColumnFamilyStore cfs, 
Collection<Range<Token>> ranges, UUID sessionID)
-        {
-            this.cfs = cfs;
-            this.ranges = ranges;
-            this.sessionID = sessionID;
-        }
-
-        private Iterable<SSTableReader> getSSTables()
-        {
-            return Iterables.filter(cfs.getLiveSSTables(), s -> 
!s.isRepaired() && !s.isPendingRepair() && s.intersects(ranges));
-        }
-
-        @SuppressWarnings("resource")
-        private AcquireResult acquireTuple()
-        {
-            List<SSTableReader> sstables = Lists.newArrayList(getSSTables());
-            if (sstables.isEmpty())
-                return new AcquireResult(cfs, null, null);
-
-            LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.ANTICOMPACTION);
-            if (txn != null)
-                return new AcquireResult(cfs, Refs.ref(sstables), txn);
-            else
-                return null;
-        }
-
-        public AcquireResult call() throws Exception
-        {
-            logger.debug("acquiring sstables for pending anti compaction on 
session {}", sessionID);
-            AcquireResult refTxn = acquireTuple();
-            if (refTxn != null)
-                return refTxn;
-
-            // try to modify after cancelling running compactions. This will 
attempt to cancel in flight compactions for
-            // up to a minute, after which point, null will be returned
-            return cfs.runWithCompactionsDisabled(this::acquireTuple, false, 
false);
-        }
-    }
-
-    static class AcquisitionCallback implements 
AsyncFunction<List<AcquireResult>, Object>
-    {
-        private final UUID parentRepairSession;
-        private final Collection<Range<Token>> ranges;
-
-        public AcquisitionCallback(UUID parentRepairSession, 
Collection<Range<Token>> ranges)
-        {
-            this.parentRepairSession = parentRepairSession;
-            this.ranges = ranges;
-        }
-
-        ListenableFuture<?> submitPendingAntiCompaction(AcquireResult result)
-        {
-            return 
CompactionManager.instance.submitPendingAntiCompaction(result.cfs, ranges, 
result.refs, result.txn, parentRepairSession);
-        }
-
-        public ListenableFuture apply(List<AcquireResult> results) throws 
Exception
-        {
-            if (Iterables.any(results, t -> t == null))
-            {
-                // Release all sstables, and report failure back to coordinator
-                for (AcquireResult result : results)
-                {
-                    if (result != null)
-                    {
-                        logger.info("Releasing acquired sstables for {}.{}", 
result.cfs.metadata.keyspace, result.cfs.metadata.name);
-                        result.abort();
-                    }
-                }
-                return Futures.immediateFailedFuture(new 
SSTableAcquisitionException());
-            }
-            else
-            {
-                List<ListenableFuture<?>> pendingAntiCompactions = new 
ArrayList<>(results.size());
-                for (AcquireResult result : results)
-                {
-                    if (result.txn != null)
-                    {
-                        ListenableFuture<?> future = 
submitPendingAntiCompaction(result);
-                        pendingAntiCompactions.add(future);
-                    }
-                }
-
-                return Futures.allAsList(pendingAntiCompactions);
-            }
-        }
-    }
-
-    private final UUID prsId;
-    private final Collection<Range<Token>> ranges;
-    private final ExecutorService executor;
-
-    public PendingAntiCompaction(UUID prsId, Collection<Range<Token>> ranges, 
ExecutorService executor)
-    {
-        this.prsId = prsId;
-        this.ranges = ranges;
-        this.executor = executor;
-    }
-
-    public ListenableFuture run()
-    {
-        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(prsId);
-        Collection<ColumnFamilyStore> cfss = prs.getColumnFamilyStores();
-        List<ListenableFutureTask<AcquireResult>> tasks = new 
ArrayList<>(cfss.size());
-        for (ColumnFamilyStore cfs : cfss)
-        {
-            cfs.forceBlockingFlush();
-            ListenableFutureTask<AcquireResult> task = 
ListenableFutureTask.create(new AcquisitionCallable(cfs, ranges, prsId));
-            executor.submit(task);
-            tasks.add(task);
-        }
-        ListenableFuture<List<AcquireResult>> acquisitionResults = 
Futures.successfulAsList(tasks);
-        ListenableFuture compactionResult = 
Futures.transformAsync(acquisitionResults, new AcquisitionCallback(prsId, 
ranges), MoreExecutors.directExecutor());
-        return compactionResult;
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to