Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/225232a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/225232a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/225232a9

Branch: refs/heads/cassandra-3.0
Commit: 225232a9ea8945c85ae4f9cac3b97e003c9e9035
Parents: 774e59d 3c8421a
Author: Marcus Eriksson <marc...@apache.org>
Authored: Thu Jun 23 11:22:57 2016 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Thu Jun 23 11:22:57 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  32 ++--
 .../repair/RepairMessageVerbHandler.java        |  32 ++--
 .../cassandra/service/ActiveRepairService.java  | 173 +++++++++++++++++--
 .../service/ActiveRepairServiceTest.java        | 110 +++++++++++-
 5 files changed, 296 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/225232a9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 0be1043,03246ae..b366d21
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,36 -1,6 +1,37 @@@
 -2.1.15
 +2.2.7
 + * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
 + * Validate bloom_filter_fp_chance against lowest supported
 +   value when the table is created (CASSANDRA-11920)
 + * RandomAccessReader: call isEOF() only when rebuffering, not for every read 
operation (CASSANDRA-12013)
 + * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038)
 + * StorageService shutdown hook should use a volatile variable 
(CASSANDRA-11984)
 + * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
 + * Run CommitLog tests with different compression settings (CASSANDRA-9039)
 + * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
 + * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
 + * Fix possible race condition in CommitLog.recover (CASSANDRA-11743)
 + * Enable client encryption in sstableloader with cli options 
(CASSANDRA-11708)
 + * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
 + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
 + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
 + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
 + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
 + * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
 + * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
 + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
 + * Restore ability to filter on clustering columns when using a 2i 
(CASSANDRA-11510)
 + * JSON datetime formatting needs timezone (CASSANDRA-11137)
 + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
 + * Remove unnecessary file existence check during anticompaction 
(CASSANDRA-11660)
 + * Add missing files to debian packages (CASSANDRA-11642)
 + * Avoid calling Iterables::concat in loops during 
ModificationStatement::getFunctions (CASSANDRA-11621)
 + * cqlsh: COPY FROM should use regular inserts for single statement batches 
and
 +   report errors correctly if workers processes crash on initialization 
(CASSANDRA-11474)
 + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
 + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
 +Merged from 2.1:
   * Prevent select statements with clustering key > 64k (CASSANDRA-11882)
+  * Avoid marking too many sstables as repaired (CASSANDRA-11696)
   * Fix clock skew corrupting other nodes with paxos (CASSANDRA-11991)
   * Remove distinction between non-existing static columns and existing but 
null in LWTs (CASSANDRA-9842)
   * Support mlockall on IBM POWER arch (CASSANDRA-11576)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/225232a9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index e76abad,87819ba..cf82498
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1068,9 -1006,9 +1068,16 @@@ public class CompactionManager implemen
          try
          {
  
--            String snapshotName = validator.desc.sessionId.toString();
              int gcBefore;
++            UUID parentRepairSessionId = validator.desc.parentSessionId;
++            String snapshotName;
++            boolean isGlobalSnapshotValidation = 
cfs.snapshotExists(parentRepairSessionId.toString());
++            if (isGlobalSnapshotValidation)
++                snapshotName = parentRepairSessionId.toString();
++            else
++                snapshotName = validator.desc.sessionId.toString();
              boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
++
              if (isSnapshotValidation)
              {
                  // If there is a snapshot created for the session then read 
from there.
@@@ -1130,7 -1073,7 +1137,9 @@@
                  }
                  finally
                  {
--                    if (isSnapshotValidation)
++                    // we can only clear the snapshot if we are not doing a 
global snapshot validation (we then clear it once anticompaction
++                    // is done).
++                    if (isSnapshotValidation && !isGlobalSnapshotValidation)
                      {
                          cfs.clearSnapshot(snapshotName);
                      }
@@@ -1158,48 -1101,6 +1167,41 @@@
          }
      }
  
 +    private synchronized Refs<SSTableReader> 
getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
 +    {
 +        Refs<SSTableReader> sstables;
 +
 +        ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
 +        if (prs == null)
 +            return null;
 +        Set<SSTableReader> sstablesToValidate = new HashSet<>();
++
++        if (prs.isGlobal)
++            prs.markSSTablesRepairing(cfs.metadata.cfId, 
validator.desc.parentSessionId);
++
++        // note that we always grab all existing sstables for this - if we 
were to just grab the ones that
++        // were marked as repairing, we would miss any ranges that were 
compacted away and this would cause us to overstream
 +        try (ColumnFamilyStore.RefViewFragment sstableCandidates = 
cfs.selectAndReference(prs.isIncremental ? 
ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES))
 +        {
 +            for (SSTableReader sstable : sstableCandidates.sstables)
 +            {
 +                if (new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
 +                {
 +                    sstablesToValidate.add(sstable);
 +                }
 +            }
 +
-             if (prs.isGlobal)
-             {
-                 Set<SSTableReader> currentlyRepairing = 
ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, 
validator.desc.parentSessionId);
- 
-                 if (!Sets.intersection(currentlyRepairing, 
sstablesToValidate).isEmpty())
-                 {
-                     logger.error("Cannot start multiple repair sessions over 
the same sstables");
-                     throw new RuntimeException("Cannot start multiple repair 
sessions over the same sstables");
-                 }
-             }
- 
 +            sstables = Refs.tryRef(sstablesToValidate);
 +            if (sstables == null)
 +            {
 +                logger.error("Could not reference sstables");
 +                throw new RuntimeException("Could not reference sstables");
 +            }
 +        }
-         if (prs.isGlobal)
-             prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
 +
 +        return sstables;
 +    }
 +
      /**
       * Splits up an sstable into two new sstables. The first of the new 
tables will store repaired ranges, the second
      * will store the non-repaired ranges. Once anticompaction is completed, 
the original sstable is marked as compacted

http://git-wip-us.apache.org/repos/asf/cassandra/blob/225232a9/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 6e7922f,7debc93..1701e9a
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@@ -91,36 -78,18 +91,32 @@@ public class RepairMessageVerbHandler i
                      break;
  
                  case SNAPSHOT:
 -                    ColumnFamilyStore cfs = 
Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
 -                    final Range<Token> repairingRange = desc.range;
 -                    cfs.snapshot(desc.sessionId.toString(), new 
Predicate<SSTableReader>()
 +                    logger.debug("Snapshotting {}", desc);
 +                    final ColumnFamilyStore cfs = 
ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily);
 +                    if (cfs == null)
 +                    {
 +                        logErrorAndSendFailureResponse(String.format("Table 
%s.%s was dropped during snapshot phase of repair",
 +                                                                     
desc.keyspace, desc.columnFamily), message.from, id);
 +                        return;
 +                    }
-                     final Range<Token> repairingRange = desc.range;
-                     Set<SSTableReader> snapshottedSSSTables = 
cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>()
++                    ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
++                    if (prs.isGlobal)
                      {
--                        public boolean apply(SSTableReader sstable)
-                         {
-                             return sstable != null &&
-                                     !(sstable.partitioner instanceof 
LocalPartitioner) && // exclude SSTables from 2i
-                                     new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
-                         }
-                     }, true); //ephemeral snapshot, if repair fails, it will 
be cleaned next startup
-                     if 
(ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).isGlobal)
++                        prs.maybeSnapshot(cfs.metadata.cfId, 
desc.parentSessionId);
++                    }
++                    else
 +                    {
-                         Set<SSTableReader> currentlyRepairing = 
ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, 
desc.parentSessionId);
-                         if (!Sets.intersection(currentlyRepairing, 
snapshottedSSSTables).isEmpty())
++                        final Range<Token> repairingRange = desc.range;
++                        cfs.snapshot(desc.sessionId.toString(), new 
Predicate<SSTableReader>()
                          {
-                             // clear snapshot that we just created
-                             cfs.clearSnapshot(desc.sessionId.toString());
-                             logErrorAndSendFailureResponse("Cannot start 
multiple repair sessions over the same sstables", message.from, id);
-                             return;
 -                            return sstable != null &&
 -                                    !(sstable.partitioner instanceof 
LocalPartitioner) && // exclude SSTables from 2i
 -                                    new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
--                        }
-                         
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).addSSTables(cfs.metadata.cfId,
 snapshottedSSSTables);
 -                    }, true); //ephemeral snapshot, if repair fails, it will 
be cleaned next startup
 -
++                            public boolean apply(SSTableReader sstable)
++                            {
++                                return sstable != null &&
++                                       !(sstable.partitioner instanceof 
LocalPartitioner) && // exclude SSTables from 2i
++                                       new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
++                            }
++                        }, true); //ephemeral snapshot, if repair fails, it 
will be cleaned next startup
 +                    }
                      logger.debug("Enqueuing response to snapshot request {} 
to {}", desc.sessionId, message.from);
                      MessagingService.instance().sendReply(new 
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
                      break;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/225232a9/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 0bb7172,bab244d..e111155
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -25,6 -23,6 +25,7 @@@ import java.util.*
  import java.util.concurrent.*;
  import java.util.concurrent.atomic.AtomicBoolean;
  
++import com.google.common.base.Predicate;
  import com.google.common.collect.ImmutableMap;
  import com.google.common.collect.Multimap;
  import com.google.common.collect.Sets;
@@@ -35,10 -33,11 +36,13 @@@ import com.google.common.util.concurren
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
 -import org.apache.cassandra.concurrent.NamedThreadFactory;
  import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.db.ColumnFamily;
  import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.SystemKeyspace;
  import org.apache.cassandra.db.compaction.CompactionManager;
++import org.apache.cassandra.dht.Bounds;
++import org.apache.cassandra.dht.LocalPartitioner;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.gms.ApplicationState;
@@@ -308,13 -306,13 +312,13 @@@ public class ActiveRepairService implem
          }
          catch (InterruptedException e)
          {
--            parentRepairSessions.remove(parentRepairSession);
++            removeParentRepairSession(parentRepairSession);
              throw new RuntimeException("Did not get replies from all 
endpoints. List of failed endpoint(s): " + failedNodes.toString(), e);
          }
  
          if (!status.get())
          {
--            parentRepairSessions.remove(parentRepairSession);
++            removeParentRepairSession(parentRepairSession);
              throw new RuntimeException("Did not get positive replies from all 
endpoints. List of failed endpoint(s): " + failedNodes.toString());
          }
  
@@@ -376,8 -405,8 +380,21 @@@
          return session;
      }
  
++    /**
++     * called when the repair session is done - either failed or 
anticompaction has completed
++     *
++     * clears out any snapshots created by this repair
++     *
++     * @param parentSessionId
++     * @return
++     */
      public synchronized ParentRepairSession removeParentRepairSession(UUID 
parentSessionId)
      {
++        for (ColumnFamilyStore cfs : 
getParentRepairSession(parentSessionId).columnFamilyStores.values())
++        {
++            if (cfs.snapshotExists(parentSessionId.toString()))
++                cfs.clearSnapshot(parentSessionId.toString());
++        }
          return parentRepairSessions.remove(parentSessionId);
      }
  
@@@ -393,26 -421,13 +410,26 @@@
      {
          assert parentRepairSession != null;
          ParentRepairSession prs = getParentRepairSession(parentRepairSession);
 +        //A repair will be marked as not global if it is a subrange repair to 
avoid many small anti-compactions
 +        //in addition to other scenarios such as repairs not involving all 
DCs or hosts
 +        if (!prs.isGlobal)
 +        {
 +            logger.info("Not a global repair, will not do anticompaction");
 +            removeParentRepairSession(parentRepairSession);
 +            return Futures.immediateFuture(Collections.emptyList());
 +        }
 +        assert prs.ranges.containsAll(successfulRanges) : "Trying to perform 
anticompaction on unknown ranges";
  
          List<ListenableFuture<?>> futures = new ArrayList<>();
 -        for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : 
prs.columnFamilyStores.entrySet())
 +        // if we don't have successful repair ranges, then just skip 
anticompaction
 +        if (!successfulRanges.isEmpty())
          {
 -            Refs<SSTableReader> sstables = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(columnFamilyStoreEntry.getKey());
 -            ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
 -            futures.add(CompactionManager.instance.submitAntiCompaction(cfs, 
prs.ranges, sstables, prs.repairedAt));
 +            for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : 
prs.columnFamilyStores.entrySet())
 +            {
-                 Refs<SSTableReader> sstables = 
prs.getActiveRepairedSSTableRefs(columnFamilyStoreEntry.getKey());
++                Refs<SSTableReader> sstables = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(columnFamilyStoreEntry.getKey(),
 parentRepairSession);
 +                ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
 +                
futures.add(CompactionManager.instance.submitAntiCompaction(cfs, 
successfulRanges, sstables, prs.repairedAt));
 +            }
          }
  
          ListenableFuture<List<Object>> allAntiCompactionResults = 
Futures.successfulAsList(futures);
@@@ -450,17 -465,39 +467,33 @@@
          }
      }
  
+     /**
+      * We keep a ParentRepairSession around for the duration of the entire 
repair, for example, on a 256 token vnode rf=3 cluster
+      * we would have 768 RepairSession but only one ParentRepairSession. We 
use the PRS to avoid anticompacting the sstables
+      * 768 times, instead we take all repaired ranges at the end of the 
repair and anticompact once.
+      *
+      * We do an optimistic marking of sstables - when we start an incremental 
repair we mark all unrepaired sstables as
+      * repairing (@see markSSTablesRepairing), then while the repair is 
ongoing compactions might remove those sstables,
+      * and when it is time for anticompaction we will only anticompact the 
sstables that are still on disk.
+      *
+      * Note that validation and streaming do not care about which sstables we 
have marked as repairing - they operate on
+      * all unrepaired sstables (if it is incremental), otherwise we would not 
get a correct repair.
+      */
      public static class ParentRepairSession
      {
 -        public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new 
HashMap<>();
 -        public final Collection<Range<Token>> ranges;
 +        private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new 
HashMap<>();
 +        private final Collection<Range<Token>> ranges;
          public final Map<UUID, Set<String>> sstableMap = new HashMap<>();
 -        /**
 -         * used as fail time if failed is true
 -         */
 +        public final boolean isIncremental;
 +        public final boolean isGlobal;
          public final long repairedAt;
          public final InetAddress coordinator;
+         /**
 -         * Used to mark a repair as failed - if the coordinator thinks that 
the repair is still ongoing and sends a
 -         * request, we need to fail the coordinator as well.
 -         */
 -        public final boolean failed;
 -        /**
+          * Indicates whether we have marked sstables as repairing. Can only 
be done once per table per ParentRepairSession
+          */
+         private final Set<UUID> marked = new HashSet<>();
  
 -        public ParentRepairSession(InetAddress coordinator, 
List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, 
long repairedAt, boolean failed)
 +        public ParentRepairSession(InetAddress coordinator, 
List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, 
boolean isIncremental, boolean isGlobal, long repairedAt)
          {
              this.coordinator = coordinator;
              for (ColumnFamilyStore cfs : columnFamilyStores)
@@@ -470,15 -507,28 +503,51 @@@
              }
              this.ranges = ranges;
              this.repairedAt = repairedAt;
 -            this.failed = failed;
 +            this.isGlobal = isGlobal;
 +            this.isIncremental = isIncremental;
          }
  
 -        public ParentRepairSession(InetAddress coordinator, 
List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, 
long repairedAt)
++        /**
++         * Mark sstables repairing - either all sstables or only the 
unrepaired ones depending on
++         *
++         * whether this is an incremental or full repair
++         *
++         * @param cfId the column family
++         * @param parentSessionId the parent repair session id, used to make 
sure we don't start multiple repairs over the same sstables
++         */
++        public synchronized void markSSTablesRepairing(UUID cfId, UUID 
parentSessionId)
+         {
 -            this(coordinator, columnFamilyStores, ranges, repairedAt, false);
++            if (!marked.contains(cfId))
++            {
++                List<SSTableReader> sstables = 
columnFamilyStores.get(cfId).select(isIncremental ? 
ColumnFamilyStore.UNREPAIRED_SSTABLES : 
ColumnFamilyStore.CANONICAL_SSTABLES).sstables;
++                Set<SSTableReader> currentlyRepairing = 
ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId);
++                if (!Sets.intersection(currentlyRepairing, 
Sets.newHashSet(sstables)).isEmpty())
++                {
++                    logger.error("Cannot start multiple repair sessions over 
the same sstables");
++                    throw new RuntimeException("Cannot start multiple repair 
sessions over the same sstables");
++                }
++                addSSTables(cfId, sstables);
++                marked.add(cfId);
++            }
+         }
+ 
+         /**
 -         * Gets the repairing sstables for anticompaction.
++         * Get the still active sstables we should run anticompaction on
+          *
 -         * Note that validation and streaming uses the real unrepaired 
sstables.
++         * note that validation and streaming do not call this method - they 
have to work on the actual active sstables on the node, we only call this
++         * to know which sstables are still there that were there when we 
started the repair
+          *
+          * @param cfId
++         * @param parentSessionId for checking if there exists a snapshot for 
this repair
+          * @return
+          */
          @SuppressWarnings("resource")
-         public synchronized Refs<SSTableReader> 
getActiveRepairedSSTableRefs(UUID cfId)
 -        public synchronized Refs<SSTableReader> 
getActiveRepairedSSTableRefsForAntiCompaction(UUID cfId)
++        public synchronized Refs<SSTableReader> 
getActiveRepairedSSTableRefsForAntiCompaction(UUID cfId, UUID parentSessionId)
          {
+             assert marked.contains(cfId);
++            boolean isSnapshotRepair = 
columnFamilyStores.get(cfId).snapshotExists(parentSessionId.toString());
              ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> 
references = ImmutableMap.builder();
--            for (SSTableReader sstable : getActiveSSTables(cfId))
++            for (SSTableReader sstable : isSnapshotRepair ? 
getSSTablesForSnapshotRepair(cfId, parentSessionId) : getActiveSSTables(cfId))
              {
                  Ref<SSTableReader> ref = sstable.tryRef();
                  if (ref == null)
@@@ -489,12 -539,38 +558,97 @@@
              return new Refs<>(references.build());
          }
  
+         /**
 -         * Marks all the unrepaired sstables as repairing unless we have 
already done so.
++         * If we are running a snapshot repair we need to find the 'real' 
sstables when we start anticompaction
+          *
 -         * Any of these sstables that are still on disk are then 
anticompacted once the streaming and validation phases are done.
++         * We use the generation of the sstables as identifiers instead of 
the file name to avoid having to parse out the
++         * actual filename.
+          *
+          * @param cfId
 -         * @param parentSessionId used to check that we don't start multiple 
inc repair sessions over the same sstables
++         * @param parentSessionId
++         * @return
+          */
 -        public synchronized void markSSTablesRepairing(UUID cfId, UUID 
parentSessionId)
++        private Set<SSTableReader> getSSTablesForSnapshotRepair(UUID cfId, 
UUID parentSessionId)
+         {
 -            if (!marked.contains(cfId))
++            Set<SSTableReader> activeSSTables = new HashSet<>();
++            ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
++
++            Set<Integer> snapshotGenerations = new HashSet<>();
++            try (Refs<SSTableReader> snapshottedSSTables = 
cfs.getSnapshotSSTableReader(parentSessionId.toString()))
+             {
 -                List<SSTableReader> sstables = 
columnFamilyStores.get(cfId).select(ColumnFamilyStore.UNREPAIRED_SSTABLES).sstables;
 -                Set<SSTableReader> currentlyRepairing = 
ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId);
 -                if (!Sets.intersection(currentlyRepairing, 
Sets.newHashSet(sstables)).isEmpty())
++                for (SSTableReader sstable : snapshottedSSTables)
++                {
++                    snapshotGenerations.add(sstable.descriptor.generation);
++                }
++            }
++            catch (IOException e)
++            {
++                throw new RuntimeException(e);
++            }
++            for (SSTableReader sstable : 
cfs.select(ColumnFamilyStore.CANONICAL_SSTABLES).sstables)
++                if 
(snapshotGenerations.contains(sstable.descriptor.generation))
++                    activeSSTables.add(sstable);
++            return activeSSTables;
++        }
++
++        public synchronized void maybeSnapshot(UUID cfId, UUID 
parentSessionId)
++        {
++            String snapshotName = parentSessionId.toString();
++            if (!columnFamilyStores.get(cfId).snapshotExists(snapshotName))
++            {
++                Set<SSTableReader> snapshottedSSTables = 
columnFamilyStores.get(cfId).snapshot(snapshotName, new 
Predicate<SSTableReader>()
++                {
++                    public boolean apply(SSTableReader sstable)
++                    {
++                        return sstable != null &&
++                               (!isIncremental || !sstable.isRepaired()) &&
++                               !(sstable.partitioner instanceof 
LocalPartitioner) && // exclude SSTables from 2i
++                               new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(ranges);
++                    }
++                }, true);
++
++                if (isAlreadyRepairing(cfId, parentSessionId, 
snapshottedSSTables))
+                 {
++                    
columnFamilyStores.get(cfId).clearSnapshot(parentSessionId.toString());
+                     logger.error("Cannot start multiple repair sessions over 
the same sstables");
+                     throw new RuntimeException("Cannot start multiple repair 
sessions over the same sstables");
+                 }
 -                addSSTables(cfId, sstables);
++                addSSTables(cfId, snapshottedSSTables);
+                 marked.add(cfId);
+             }
+         }
+ 
++
++        /**
++         * Compares other repairing sstables *generation* to the ones we just 
snapshotted
++         *
++         * we compare generations since the sstables have different paths due 
to snapshot names
++         *
++         * @param cfId id of the column family store
++         * @param parentSessionId parent repair session
++         * @param sstables the newly snapshotted sstables
++         * @return
++         */
++        private boolean isAlreadyRepairing(UUID cfId, UUID parentSessionId, 
Collection<SSTableReader> sstables)
++        {
++            Set<SSTableReader> currentlyRepairing = 
ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId);
++            Set<Integer> currentlyRepairingGenerations = new HashSet<>();
++            Set<Integer> newRepairingGenerations = new HashSet<>();
++            for (SSTableReader sstable : currentlyRepairing)
++                
currentlyRepairingGenerations.add(sstable.descriptor.generation);
++            for (SSTableReader sstable : sstables)
++                newRepairingGenerations.add(sstable.descriptor.generation);
++
++            return !Sets.intersection(currentlyRepairingGenerations, 
newRepairingGenerations).isEmpty();
++        }
++
          private Set<SSTableReader> getActiveSSTables(UUID cfId)
          {
 -            if (failed)
 -                return Collections.emptySet();
              Set<String> repairedSSTables = sstableMap.get(cfId);
              Set<SSTableReader> activeSSTables = new HashSet<>();
              Set<String> activeSSTableNames = new HashSet<>();
--            for (SSTableReader sstable : 
columnFamilyStores.get(cfId).getSSTables())
++            ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
++            for (SSTableReader sstable : 
cfs.select(ColumnFamilyStore.CANONICAL_SSTABLES).sstables)
              {
                  if (repairedSSTables.contains(sstable.getFilename()))
                  {
@@@ -506,21 -582,16 +660,20 @@@
              return activeSSTables;
          }
  
-         public void addSSTables(UUID cfId, Collection<SSTableReader> sstables)
+         private void addSSTables(UUID cfId, Collection<SSTableReader> 
sstables)
          {
              for (SSTableReader sstable : sstables)
-             {
                  sstableMap.get(cfId).add(sstable.getFilename());
-             }
          }
  
 -        public ParentRepairSession asFailed()
++
 +        public long getRepairedAt()
          {
 -            return new ParentRepairSession(coordinator, 
Collections.<ColumnFamilyStore>emptyList(), 
Collections.<Range<Token>>emptyList(), System.currentTimeMillis(), true);
 +            if (isGlobal)
 +                return repairedAt;
 +            return ActiveRepairService.UNREPAIRED_SSTABLE;
          }
 +
          @Override
          public String toString()
          {
@@@ -580,9 -652,12 +733,9 @@@
  
          if (!toRemove.isEmpty())
          {
 -            logger.debug("Failing {} in parent repair sessions", toRemove);
 +            logger.debug("Removing {} in parent repair sessions", toRemove);
              for (UUID id : toRemove)
-                 parentRepairSessions.remove(id);
 -            {
 -                ParentRepairSession failed = parentRepairSessions.get(id);
 -                parentRepairSessions.replace(id, failed, failed.asFailed());
 -            }
++                removeParentRepairSession(id);
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/225232a9/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index f34d0e2,cf64322..03a25c6
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@@ -1,30 -1,32 +1,31 @@@
  /*
 - * Licensed to the Apache Software Foundation (ASF) under one
 - * or more contributor license agreements.  See the NOTICE file
 - * distributed with this work for additional information
 - * regarding copyright ownership.  The ASF licenses this file
 - * to you under the Apache License, Version 2.0 (the
 - * "License"); you may not use this file except in compliance
 - * with the License.  You may obtain a copy of the License at
 - *
 - *     http://www.apache.org/licenses/LICENSE-2.0
 - *
 - * Unless required by applicable law or agreed to in writing, software
 - * distributed under the License is distributed on an "AS IS" BASIS,
 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 - * See the License for the specific language governing permissions and
 - * limitations under the License.
 - */
 -
 +* Licensed to the Apache Software Foundation (ASF) under one
 +* or more contributor license agreements.  See the NOTICE file
 +* distributed with this work for additional information
 +* regarding copyright ownership.  The ASF licenses this file
 +* to you under the Apache License, Version 2.0 (the
 +* "License"); you may not use this file except in compliance
 +* with the License.  You may obtain a copy of the License at
 +*
 +*    http://www.apache.org/licenses/LICENSE-2.0
 +*
 +* Unless required by applicable law or agreed to in writing,
 +* software distributed under the License is distributed on an
 +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +* KIND, either express or implied.  See the License for the
 +* specific language governing permissions and limitations
 +* under the License.
 +*/
  package org.apache.cassandra.service;
  
 +import java.net.InetAddress;
 +import java.util.*;
++import java.util.concurrent.ExecutionException;
  
 -import java.util.Collections;
 -import java.util.HashSet;
 -import java.util.Iterator;
 -import java.util.Set;
 -import java.util.UUID;
 -
 +import com.google.common.base.Predicate;
  import com.google.common.collect.Sets;
 -
 +import org.junit.Before;
 +import org.junit.BeforeClass;
  import org.junit.Test;
  
  import org.apache.cassandra.SchemaLoader;
@@@ -49,183 -42,13 +50,184 @@@ import org.apache.cassandra.utils.concu
  
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
  
 -public class ActiveRepairServiceTest extends SchemaLoader
 +public class ActiveRepairServiceTest
  {
 +    public static final String KEYSPACE5 = "Keyspace5";
 +    public static final String CF_STANDARD1 = "Standard1";
 +    public static final String CF_COUNTER = "Counter1";
 +
 +    public String cfname;
 +    public ColumnFamilyStore store;
 +    public InetAddress LOCAL, REMOTE;
 +
 +    private boolean initialized;
 +
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE5,
 +                                    SimpleStrategy.class,
 +                                    KSMetaData.optsWithRF(2),
 +                                    SchemaLoader.standardCFMD(KEYSPACE5, 
CF_COUNTER),
 +                                    SchemaLoader.standardCFMD(KEYSPACE5, 
CF_STANDARD1));
 +    }
 +
 +    @Before
 +    public void prepare() throws Exception
 +    {
 +        if (!initialized)
 +        {
 +            SchemaLoader.startGossiper();
 +            initialized = true;
 +
 +            LOCAL = FBUtilities.getBroadcastAddress();
 +            // generate a fake endpoint for which we can spoof 
receiving/sending trees
 +            REMOTE = InetAddress.getByName("127.0.0.2");
 +        }
 +
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 +        tmd.clearUnsafe();
 +        
StorageService.instance.setTokens(Collections.singleton(StorageService.getPartitioner().getRandomToken()));
 +        
tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(), 
REMOTE);
 +        assert tmd.isMember(REMOTE);
 +    }
 +
 +    @Test
 +    public void testGetNeighborsPlusOne() throws Throwable
 +    {
 +        // generate rf+1 nodes, and ensure that all nodes are returned
 +        Set<InetAddress> expected = addTokens(1 + 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        expected.remove(FBUtilities.getBroadcastAddress());
 +        Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(KEYSPACE5);
 +        Set<InetAddress> neighbors = new HashSet<>();
 +        for (Range<Token> range : ranges)
 +        {
 +            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, 
ranges, range, null, null));
 +        }
 +        assertEquals(expected, neighbors);
 +    }
 +
 +    @Test
 +    public void testGetNeighborsTimesTwo() throws Throwable
 +    {
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 +
 +        // generate rf*2 nodes, and ensure that only neighbors specified by 
the ARS are returned
 +        addTokens(2 * 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        AbstractReplicationStrategy ars = 
Keyspace.open(KEYSPACE5).getReplicationStrategy();
 +        Set<InetAddress> expected = new HashSet<>();
 +        for (Range<Token> replicaRange : 
ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
 +        {
 +            
expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
 +        }
 +        expected.remove(FBUtilities.getBroadcastAddress());
 +        Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(KEYSPACE5);
 +        Set<InetAddress> neighbors = new HashSet<>();
 +        for (Range<Token> range : ranges)
 +        {
 +            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, 
ranges, range, null, null));
 +        }
 +        assertEquals(expected, neighbors);
 +    }
 +
 +    @Test
 +    public void testGetNeighborsPlusOneInLocalDC() throws Throwable
 +    {
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 +
 +        // generate rf+1 nodes, and ensure that all nodes are returned
 +        Set<InetAddress> expected = addTokens(1 + 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        expected.remove(FBUtilities.getBroadcastAddress());
 +        // remove remote endpoints
 +        TokenMetadata.Topology topology = 
tmd.cloneOnlyTokenMap().getTopology();
 +        HashSet<InetAddress> localEndpoints = 
Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
 +        expected = Sets.intersection(expected, localEndpoints);
 +
 +        Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(KEYSPACE5);
 +        Set<InetAddress> neighbors = new HashSet<>();
 +        for (Range<Token> range : ranges)
 +        {
 +            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, 
ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
 +        }
 +        assertEquals(expected, neighbors);
 +    }
 +
 +    @Test
 +    public void testGetNeighborsTimesTwoInLocalDC() throws Throwable
 +    {
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 +
 +        // generate rf*2 nodes, and ensure that only neighbors specified by 
the ARS are returned
 +        addTokens(2 * 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        AbstractReplicationStrategy ars = 
Keyspace.open(KEYSPACE5).getReplicationStrategy();
 +        Set<InetAddress> expected = new HashSet<>();
 +        for (Range<Token> replicaRange : 
ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
 +        {
 +            
expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
 +        }
 +        expected.remove(FBUtilities.getBroadcastAddress());
 +        // remove remote endpoints
 +        TokenMetadata.Topology topology = 
tmd.cloneOnlyTokenMap().getTopology();
 +        HashSet<InetAddress> localEndpoints = 
Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
 +        expected = Sets.intersection(expected, localEndpoints);
 +
 +        Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(KEYSPACE5);
 +        Set<InetAddress> neighbors = new HashSet<>();
 +        for (Range<Token> range : ranges)
 +        {
 +            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, 
ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
 +        }
 +        assertEquals(expected, neighbors);
 +    }
 +
 +    @Test
 +    public void testGetNeighborsTimesTwoInSpecifiedHosts() throws Throwable
 +    {
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
  
 -    private static final String KEYSPACE1 = "Keyspace1";
 -    private static final String CF = "Standard1";
 +        // generate rf*2 nodes, and ensure that only neighbors specified by 
the hosts are returned
 +        addTokens(2 * 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        AbstractReplicationStrategy ars = 
Keyspace.open(KEYSPACE5).getReplicationStrategy();
 +        List<InetAddress> expected = new ArrayList<>();
 +        for (Range<Token> replicaRange : 
ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
 +        {
 +            
expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
 +        }
 +
 +        expected.remove(FBUtilities.getBroadcastAddress());
 +        Collection<String> hosts = 
Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName());
 +        Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(KEYSPACE5);
 +
 +        assertEquals(expected.get(0), 
ActiveRepairService.getNeighbors(KEYSPACE5, ranges,
 +                                                                       
ranges.iterator().next(),
 +                                                                       null, 
hosts).iterator().next());
 +    }
 +
 +    @Test(expected = IllegalArgumentException.class)
 +    public void testGetNeighborsSpecifiedHostsWithNoLocalHost() throws 
Throwable
 +    {
 +        addTokens(2 * 
Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        //Dont give local endpoint
 +        Collection<String> hosts = Arrays.asList("127.0.0.3");
 +        Collection<Range<Token>> ranges = 
StorageService.instance.getLocalRanges(KEYSPACE5);
 +        ActiveRepairService.getNeighbors(KEYSPACE5, ranges, 
ranges.iterator().next(), null, hosts);
 +    }
 +
 +    Set<InetAddress> addTokens(int max) throws Throwable
 +    {
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 +        Set<InetAddress> endpoints = new HashSet<>();
 +        for (int i = 1; i <= max; i++)
 +        {
 +            InetAddress endpoint = InetAddress.getByName("127.0.0." + i);
 +            
tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), 
endpoint);
 +            endpoints.add(endpoint);
 +        }
 +        return endpoints;
 +    }
  
      @Test
      public void testGetActiveRepairedSSTableRefs()
@@@ -234,14 -57,11 +236,12 @@@
          Set<SSTableReader> original = store.getUnrepairedSSTables();
  
          UUID prsId = UUID.randomUUID();
 -        ActiveRepairService.instance.registerParentRepairSession(prsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null);
 +        ActiveRepairService.instance.registerParentRepairSession(prsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, 
true, false);
          ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(prsId);
- 
-         //add all sstables to parent repair session
-         prs.addSSTables(store.metadata.cfId, original);
+         prs.markSSTablesRepairing(store.metadata.cfId, prsId);
 +
          //retrieve all sstable references from parent repair sessions
-         Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefs(store.metadata.cfId);
 -        Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId);
++        Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId);
          Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
          assertEquals(original, retrieved);
          refs.release();
@@@ -249,35 -69,70 +249,128 @@@
          //remove 1 sstable from data data tracker
          Set<SSTableReader> newLiveSet = new HashSet<>(original);
          Iterator<SSTableReader> it = newLiveSet.iterator();
 -        SSTableReader removed = it.next();
 +        final SSTableReader removed = it.next();
          it.remove();
 -        
store.getDataTracker().replaceWithNewInstances(Collections.singleton(removed), 
Collections.EMPTY_SET);
 +        store.getTracker().dropSSTables(new Predicate<SSTableReader>()
 +        {
 +            public boolean apply(SSTableReader reader)
 +            {
 +                return removed.equals(reader);
 +            }
 +        }, OperationType.COMPACTION, null);
  
          //retrieve sstable references from parent repair session again - 
removed sstable must not be present
-         refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId);
 -        refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId);
++        refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId);
          retrieved = Sets.newHashSet(refs.iterator());
          assertEquals(newLiveSet, retrieved);
          assertFalse(retrieved.contains(removed));
          refs.release();
      }
  
+     @Test
+     public void testAddingMoreSSTables()
+     {
+         ColumnFamilyStore store = prepareColumnFamilyStore();
+         Set<SSTableReader> original = store.getUnrepairedSSTables();
+         UUID prsId = UUID.randomUUID();
 -        ActiveRepairService.instance.registerParentRepairSession(prsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null);
++        ActiveRepairService.instance.registerParentRepairSession(prsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, 
true, true);
+         ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(prsId);
+         prs.markSSTablesRepairing(store.metadata.cfId, prsId);
 -        try (Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId))
++        try (Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId))
+         {
+             Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
+             assertEquals(original, retrieved);
+         }
+         createSSTables(store, 2);
+         boolean exception = false;
+         try
+         {
+             UUID newPrsId = UUID.randomUUID();
 -            
ActiveRepairService.instance.registerParentRepairSession(newPrsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null);
++            
ActiveRepairService.instance.registerParentRepairSession(newPrsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, 
true, true);
+             
ActiveRepairService.instance.getParentRepairSession(newPrsId).markSSTablesRepairing(store.metadata.cfId,
 newPrsId);
+         }
+         catch (Throwable t)
+         {
+             exception = true;
+         }
+         assertTrue(exception);
+ 
 -        try (Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId))
++        try (Refs<SSTableReader> refs = 
prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId))
+         {
+             Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
+             assertEquals(original, retrieved);
+         }
+     }
+ 
++    @Test
++    public void testSnapshotAddSSTables() throws ExecutionException, 
InterruptedException
++    {
++        ColumnFamilyStore store = prepareColumnFamilyStore();
++        UUID prsId = UUID.randomUUID();
++        Set<SSTableReader> original = store.getUnrepairedSSTables();
++        ActiveRepairService.instance.registerParentRepairSession(prsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), 
Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), 
store.partitioner.getMinimumToken())), true, true);
++        
ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId,
 prsId);
++
++        UUID prsId2 = UUID.randomUUID();
++        ActiveRepairService.instance.registerParentRepairSession(prsId2, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), 
Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), 
store.partitioner.getMinimumToken())), true, true);
++        createSSTables(store, 2);
++        
ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId,
 prsId);
++        try (Refs<SSTableReader> refs = 
ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId,
 prsId))
++        {
++            assertEquals(original, Sets.newHashSet(refs.iterator()));
++        }
++        store.forceMajorCompaction();
++        // after a major compaction the original sstables will be gone and we 
will have no sstables to anticompact:
++        try (Refs<SSTableReader> refs = 
ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId,
 prsId))
++        {
++            assertEquals(0, refs.size());
++        }
++    }
++
++    @Test
++    public void testSnapshotMultipleRepairs()
++    {
++        ColumnFamilyStore store = prepareColumnFamilyStore();
++        Set<SSTableReader> original = store.getUnrepairedSSTables();
++        UUID prsId = UUID.randomUUID();
++        ActiveRepairService.instance.registerParentRepairSession(prsId, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), 
Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), 
store.partitioner.getMinimumToken())), true, true);
++        
ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId,
 prsId);
++
++        UUID prsId2 = UUID.randomUUID();
++        ActiveRepairService.instance.registerParentRepairSession(prsId2, 
FBUtilities.getBroadcastAddress(), Collections.singletonList(store), 
Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), 
store.partitioner.getMinimumToken())), true, true);
++        boolean exception = false;
++        try
++        {
++            
ActiveRepairService.instance.getParentRepairSession(prsId2).maybeSnapshot(store.metadata.cfId,
 prsId2);
++        }
++        catch (Throwable t)
++        {
++            exception = true;
++        }
++        assertTrue(exception);
++        try (Refs<SSTableReader> refs = 
ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId,
 prsId))
++        {
++            assertEquals(original, Sets.newHashSet(refs.iterator()));
++        }
++    }
++
      private ColumnFamilyStore prepareColumnFamilyStore()
      {
 -        Keyspace keyspace = Keyspace.open(KEYSPACE1);
 -        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
 +        Keyspace keyspace = Keyspace.open(KEYSPACE5);
 +        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF_STANDARD1);
+         store.truncateBlocking();
          store.disableAutoCompaction();
+         createSSTables(store, 10);
+         return store;
+     }
+ 
+     private void createSSTables(ColumnFamilyStore cfs, int count)
+     {
          long timestamp = System.currentTimeMillis();
-         //create 10 sstables
-         for (int i = 0; i < 10; i++)
+         for (int i = 0; i < count; i++)
          {
              DecoratedKey key = Util.dk(Integer.toString(i));
 -            Mutation rm = new Mutation(KEYSPACE1, key.getKey());
 +            Mutation rm = new Mutation(KEYSPACE5, key.getKey());
              for (int j = 0; j < 10; j++)
                  rm.add("Standard1", Util.cellname(Integer.toString(j)),
                         ByteBufferUtil.EMPTY_BYTE_BUFFER,

Reply via email to