Repository: cassandra
Updated Branches:
  refs/heads/trunk c6cd82462 -> 3cec208c4


Add incremental repair support for --hosts, --force, and subrange repair

Patch by Blake Eggleston; reviewed by Marcus Eriksson for CASSANDRA-13818


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3cec208c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3cec208c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3cec208c

Branch: refs/heads/trunk
Commit: 3cec208c40b85e1be0ff8c68fc9d9017945a1ed8
Parents: c6cd824
Author: Blake Eggleston <[email protected]>
Authored: Mon Aug 28 10:33:34 2017 -0700
Committer: Blake Eggleston <[email protected]>
Committed: Tue Sep 12 15:51:34 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |   4 +-
 .../org/apache/cassandra/repair/RepairJob.java  |  10 +-
 .../repair/RepairMessageVerbHandler.java        |   6 +-
 .../apache/cassandra/repair/RepairRunnable.java | 161 ++++++++++++++-----
 .../apache/cassandra/repair/RepairSession.java  |  12 +-
 .../org/apache/cassandra/repair/Validator.java  |  10 +-
 .../repair/consistent/ConsistentSession.java    |   3 +-
 .../cassandra/repair/messages/RepairOption.java |  16 +-
 .../cassandra/service/ActiveRepairService.java  |  33 ++--
 ...pactionStrategyManagerPendingRepairTest.java |   2 +-
 .../cassandra/repair/AbstractRepairTest.java    |   2 +
 .../cassandra/repair/RepairRunnableTest.java    |  65 ++++++++
 .../repair/consistent/LocalSessionTest.java     |   1 -
 .../repair/messages/RepairOptionTest.java       |  13 --
 .../service/ActiveRepairServiceTest.java        |  55 +++++++
 16 files changed, 289 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1f03ec5..55bbfa8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add incremental repair support for --hosts, --force, and subrange repair 
(CASSANDRA-13818)
  * Rework CompactionStrategyManager.getScanners synchronization 
(CASSANDRA-13786)
  * Add additional unit tests for batch behavior, TTLs, Timestamps 
(CASSANDRA-13846)
  * Add keyspace and table name in schema validation exception (CASSANDRA-13845)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 5619da7..06fbef2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1330,7 +1330,7 @@ public class CompactionManager implements 
CompactionManagerMBean
             }
             else
             {
-                if (!validator.isConsistent)
+                if (!validator.isIncremental)
                 {
                     // flush first so everyone is validating data that is as 
similar as possible
                     
StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
@@ -1447,7 +1447,7 @@ public class CompactionManager implements 
CompactionManagerMBean
             predicate = prs.getPreviewPredicate();
 
         }
-        else if (validator.isConsistent)
+        else if (validator.isIncremental)
         {
             predicate = s -> 
validator.desc.parentSessionId.equals(s.getSSTableMetadata().pendingRepair);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java 
b/src/java/org/apache/cassandra/repair/RepairJob.java
index 0615681..4bc3496 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -43,7 +43,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
     private final RepairJobDesc desc;
     private final RepairParallelism parallelismDegree;
     private final ListeningExecutorService taskExecutor;
-    private final boolean isConsistent;
+    private final boolean isIncremental;
     private final PreviewKind previewKind;
 
     /**
@@ -52,13 +52,13 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
      * @param session RepairSession that this RepairJob belongs
      * @param columnFamily name of the ColumnFamily to repair
      */
-    public RepairJob(RepairSession session, String columnFamily, boolean 
isConsistent, PreviewKind previewKind)
+    public RepairJob(RepairSession session, String columnFamily, boolean 
isIncremental, PreviewKind previewKind)
     {
         this.session = session;
         this.desc = new RepairJobDesc(session.parentRepairSession, 
session.getId(), session.keyspace, columnFamily, session.getRanges());
         this.taskExecutor = session.taskExecutor;
         this.parallelismDegree = session.parallelismDegree;
-        this.isConsistent = isConsistent;
+        this.isIncremental = isIncremental;
         this.previewKind = previewKind;
     }
 
@@ -81,7 +81,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
         if (parallelismDegree != RepairParallelism.PARALLEL)
         {
             ListenableFuture<List<InetAddress>> allSnapshotTasks;
-            if (isConsistent)
+            if (isIncremental)
             {
                 // consistent repair does it's own "snapshotting"
                 allSnapshotTasks = Futures.immediateFuture(allEndpoints);
@@ -135,7 +135,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
                         SyncTask task;
                         if (r1.endpoint.equals(local) || 
r2.endpoint.equals(local))
                         {
-                            task = new LocalSyncTask(desc, r1, r2, 
isConsistent ? desc.parentSessionId : null, session.pullRepair, 
session.previewKind);
+                            task = new LocalSyncTask(desc, r1, r2, 
isIncremental ? desc.parentSessionId : null, session.pullRepair, 
session.previewKind);
                         }
                         else
                         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java 
b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index c38d098..3c7f890 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -46,7 +46,7 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
 {
     private static final Logger logger = 
LoggerFactory.getLogger(RepairMessageVerbHandler.class);
 
-    private boolean isConsistent(UUID sessionID)
+    private boolean isIncremental(UUID sessionID)
     {
         return 
ActiveRepairService.instance.consistent.local.isSessionInProgress(sessionID);
     }
@@ -136,7 +136,7 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
 
                     
ActiveRepairService.instance.consistent.local.maybeSetRepairing(desc.parentSessionId);
                     Validator validator = new Validator(desc, message.from, 
validationRequest.nowInSec,
-                                                        
isConsistent(desc.parentSessionId), previewKind(desc.parentSessionId));
+                                                        
isIncremental(desc.parentSessionId), previewKind(desc.parentSessionId));
                     CompactionManager.instance.submitValidation(store, 
validator);
                     break;
 
@@ -144,7 +144,7 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
                     // forwarded sync request
                     SyncRequest request = (SyncRequest) message.payload;
                     logger.debug("Syncing {}", request);
-                    StreamingRepairTask task = new StreamingRepairTask(desc, 
request, isConsistent(desc.parentSessionId) ? desc.parentSessionId : null, 
request.previewKind);
+                    StreamingRepairTask task = new StreamingRepairTask(desc, 
request, isIncremental(desc.parentSessionId) ? desc.parentSessionId : null, 
request.previewKind);
                     task.run();
                     break;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java 
b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index b581ebd..9e37ada 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -27,18 +27,26 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.*;
 import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.junit.internal.runners.statements.Fail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.Timer;
 import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.repair.consistent.SyncStatSummary;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.db.Keyspace;
@@ -130,6 +138,47 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
         recordFailure(message, completionMessage);
     }
 
+    @VisibleForTesting
+    static class CommonRange
+    {
+        public final Set<InetAddress> endpoints;
+        public final Collection<Range<Token>> ranges;
+
+        public CommonRange(Set<InetAddress> endpoints, 
Collection<Range<Token>> ranges)
+        {
+            Preconditions.checkArgument(endpoints != null && 
!endpoints.isEmpty());
+            Preconditions.checkArgument(ranges != null && !ranges.isEmpty());
+            this.endpoints = endpoints;
+            this.ranges = ranges;
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            CommonRange that = (CommonRange) o;
+
+            if (!endpoints.equals(that.endpoints)) return false;
+            return ranges.equals(that.ranges);
+        }
+
+        public int hashCode()
+        {
+            int result = endpoints.hashCode();
+            result = 31 * result + ranges.hashCode();
+            return result;
+        }
+
+        public String toString()
+        {
+            return "CommonRange{" +
+                   "endpoints=" + endpoints +
+                   ", ranges=" + ranges +
+                   '}';
+        }
+    }
+
     protected void runMayThrow() throws Exception
     {
         ActiveRepairService.instance.recordRepairStatus(cmd, 
ActiveRepairService.ParentRepairStatus.IN_PROGRESS, ImmutableList.of());
@@ -184,7 +233,7 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
         }
 
         final Set<InetAddress> allNeighbors = new HashSet<>();
-        List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> 
commonRanges = new ArrayList<>();
+        List<CommonRange> commonRanges = new ArrayList<>();
 
         //pre-calculate output of getLocalRanges and pass it to getNeighbors 
to increase performance and prevent
         //calculation multiple times
@@ -235,11 +284,9 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
             SystemDistributedKeyspace.startParentRepair(parentSession, 
keyspace, cfnames, options);
         }
 
-        long repairedAt;
         try (Timer.Context ctx = 
Keyspace.open(keyspace).metric.repairPrepareTime.time())
         {
             ActiveRepairService.instance.prepareForRepair(parentSession, 
FBUtilities.getBroadcastAddress(), allNeighbors, options, columnFamilyStores);
-            repairedAt = 
ActiveRepairService.instance.getParentRepairSession(parentSession).getRepairedAt();
             progress.incrementAndGet();
         }
         catch (Throwable t)
@@ -254,23 +301,22 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
 
         if (options.isPreview())
         {
-            previewRepair(parentSession, repairedAt, startTime, traceState, 
allNeighbors, commonRanges, cfnames);
+            previewRepair(parentSession, startTime, commonRanges, cfnames);
         }
         else if (options.isIncremental())
         {
-            consistentRepair(parentSession, repairedAt, startTime, traceState, 
allNeighbors, commonRanges, cfnames);
+            incrementalRepair(parentSession, startTime, 
options.isForcedRepair(), traceState, allNeighbors, commonRanges, cfnames);
         }
         else
         {
-            normalRepair(parentSession, startTime, traceState, allNeighbors, 
commonRanges, cfnames);
+            normalRepair(parentSession, startTime, traceState, commonRanges, 
cfnames);
         }
     }
 
     private void normalRepair(UUID parentSession,
                               long startTime,
                               TraceState traceState,
-                              Set<InetAddress> allNeighbors,
-                              List<Pair<Set<InetAddress>, ? extends 
Collection<Range<Token>>>> commonRanges,
+                              List<CommonRange> commonRanges,
                               String... cfnames)
     {
 
@@ -295,15 +341,11 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
                     logger.debug("Repair result: {}", results);
                     if (sessionResult != null)
                     {
-                        // don't promote sstables for sessions we skipped 
replicas for
+                        // don't record successful repair if we had to skip 
ranges
                         if (!sessionResult.skippedReplicas)
                         {
                             successfulRanges.addAll(sessionResult.ranges);
                         }
-                        else
-                        {
-                            logger.debug("Skipping anticompaction for {}", 
results);
-                        }
                     }
                     else
                     {
@@ -316,26 +358,59 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
         Futures.addCallback(repairResult, new 
RepairCompleteCallback(parentSession, successfulRanges, startTime, traceState, 
hasFailure, executor));
     }
 
-    private void consistentRepair(UUID parentSession,
-                                  long repairedAt,
-                                  long startTime,
-                                  TraceState traceState,
-                                  Set<InetAddress> allNeighbors,
-                                  List<Pair<Set<InetAddress>, ? extends 
Collection<Range<Token>>>> commonRanges,
-                                  String... cfnames)
+    /**
+     * removes dead nodes from common ranges, and exludes ranges left without 
any participants
+     */
+    @VisibleForTesting
+    static List<CommonRange> filterCommonRanges(List<CommonRange> 
commonRanges, Set<InetAddress> liveEndpoints, boolean force)
     {
-        // the local node also needs to be included in the set of
-        // participants, since coordinator sessions aren't persisted
-        Set<InetAddress> allParticipants = new HashSet<>(allNeighbors);
-        allParticipants.add(FBUtilities.getBroadcastAddress());
+        if (!force)
+        {
+            return commonRanges;
+        }
+        else
+        {
+            List<CommonRange> filtered = new ArrayList<>(commonRanges.size());
+
+            for (CommonRange commonRange: commonRanges)
+            {
+                Set<InetAddress> endpoints = 
ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, 
liveEndpoints::contains));
+
+                // this node is implicitly a participant in this repair, so a 
single endpoint is ok here
+                if (!endpoints.isEmpty())
+                {
+                    filtered.add(new CommonRange(endpoints, 
commonRange.ranges));
+                }
+            }
+            Preconditions.checkState(!filtered.isEmpty(), "Not enough live 
endpoints for a repair");
+            return filtered;
+        }
+    }
+
+    private void incrementalRepair(UUID parentSession,
+                                   long startTime,
+                                   boolean forceRepair,
+                                   TraceState traceState,
+                                   Set<InetAddress> allNeighbors,
+                                   List<CommonRange> commonRanges,
+                                   String... cfnames)
+    {
+        // the local node also needs to be included in the set of 
participants, since coordinator sessions aren't persisted
+        Predicate<InetAddress> isAlive = FailureDetector.instance::isAlive;
+        Set<InetAddress> allParticipants = ImmutableSet.<InetAddress>builder()
+                                           .addAll(forceRepair ? 
Iterables.filter(allNeighbors, isAlive) : allNeighbors)
+                                           
.add(FBUtilities.getBroadcastAddress())
+                                           .build();
+
+        List<CommonRange> allRanges = filterCommonRanges(commonRanges, 
allParticipants, forceRepair);
 
         CoordinatorSession coordinatorSession = 
ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession,
 allParticipants);
         ListeningExecutorService executor = createExecutor();
         AtomicBoolean hasFailure = new AtomicBoolean(false);
-        ListenableFuture repairResult = coordinatorSession.execute(() -> 
submitRepairSessions(parentSession, true, executor, commonRanges, cfnames),
+        ListenableFuture repairResult = coordinatorSession.execute(() -> 
submitRepairSessions(parentSession, true, executor, allRanges, cfnames),
                                                                    hasFailure);
         Collection<Range<Token>> ranges = new HashSet<>();
-        for (Collection<Range<Token>> range : 
Iterables.transform(commonRanges, cr -> cr.right))
+        for (Collection<Range<Token>> range : Iterables.transform(allRanges, 
cr -> cr.ranges))
         {
             ranges.addAll(range);
         }
@@ -343,11 +418,8 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
     }
 
     private void previewRepair(UUID parentSession,
-                               long repairedAt,
                                long startTime,
-                               TraceState traceState,
-                               Set<InetAddress> allNeighbors,
-                               List<Pair<Set<InetAddress>, ? extends 
Collection<Range<Token>>>> commonRanges,
+                               List<CommonRange> commonRanges,
                                String... cfnames)
     {
 
@@ -421,22 +493,27 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
     }
 
     private ListenableFuture<List<RepairSessionResult>> 
submitRepairSessions(UUID parentSession,
-                                                                             
boolean isConsistent,
+                                                                             
boolean isIncremental,
                                                                              
ListeningExecutorService executor,
-                                                                             
List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges,
+                                                                             
List<CommonRange> commonRanges,
                                                                              
String... cfnames)
     {
         List<ListenableFuture<RepairSessionResult>> futures = new 
ArrayList<>(options.getRanges().size());
-        for (Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p : 
commonRanges)
+
+        // we do endpoint filtering at the start of an incremental repair,
+        // so repair sessions shouldn't also be checking liveness
+        boolean force = options.isForcedRepair() && !isIncremental;
+        for (CommonRange cr : commonRanges)
         {
+            logger.info("Starting RepairSession for {}", cr);
             RepairSession session = 
ActiveRepairService.instance.submitRepairSession(parentSession,
-                                                                               
      p.right,
+                                                                               
      cr.ranges,
                                                                                
      keyspace,
                                                                                
      options.getParallelism(),
-                                                                               
      p.left,
-                                                                               
      isConsistent,
+                                                                               
      cr.endpoints,
+                                                                               
      isIncremental,
                                                                                
      options.isPullRepair(),
-                                                                               
      options.isForcedRepair(),
+                                                                               
      force,
                                                                                
      options.getPreviewKind(),
                                                                                
      executor,
                                                                                
      cfnames);
@@ -595,22 +672,22 @@ public class RepairRunnable extends WrappedRunnable 
implements ProgressEventNoti
                                                
ImmutableList.of(failureMessage, completionMessage));
     }
 
-    private void addRangeToNeighbors(List<Pair<Set<InetAddress>, ? extends 
Collection<Range<Token>>>> neighborRangeList, Range<Token> range, 
Set<InetAddress> neighbors)
+    private void addRangeToNeighbors(List<CommonRange> neighborRangeList, 
Range<Token> range, Set<InetAddress> neighbors)
     {
         for (int i = 0; i < neighborRangeList.size(); i++)
         {
-            Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p = 
neighborRangeList.get(i);
+            CommonRange cr = neighborRangeList.get(i);
 
-            if (p.left.containsAll(neighbors))
+            if (cr.endpoints.containsAll(neighbors))
             {
-                p.right.add(range);
+                cr.ranges.add(range);
                 return;
             }
         }
 
         List<Range<Token>> ranges = new ArrayList<>();
         ranges.add(range);
-        neighborRangeList.add(Pair.create(neighbors, ranges));
+        neighborRangeList.add(new CommonRange(neighbors, ranges));
     }
 
     private Thread createQueryThread(final int cmd, final UUID sessionId)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java 
b/src/java/org/apache/cassandra/repair/RepairSession.java
index d00e1b2..5dbd050 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.SessionSummary;
-import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTrees;
@@ -97,7 +96,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
     /** Range to repair */
     public final Collection<Range<Token>> ranges;
     public final Set<InetAddress> endpoints;
-    public final boolean isConsistent;
+    public final boolean isIncremental;
     public final PreviewKind previewKind;
 
     private final AtomicBoolean isFailed = new AtomicBoolean(false);
@@ -131,7 +130,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
                          String keyspace,
                          RepairParallelism parallelismDegree,
                          Set<InetAddress> endpoints,
-                         boolean isConsistent,
+                         boolean isIncremental,
                          boolean pullRepair,
                          boolean force,
                          PreviewKind previewKind,
@@ -162,7 +161,8 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
             }
             if (!removeCandidates.isEmpty())
             {
-                // we shouldn't be promoting sstables to repaired if any 
replicas are excluded from the repair
+                // we shouldn't be recording a successful repair if
+                // any replicas are excluded from the repair
                 forceSkippedReplicas = true;
                 endpoints = new HashSet<>(endpoints);
                 endpoints.removeAll(removeCandidates);
@@ -170,7 +170,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
         }
 
         this.endpoints = endpoints;
-        this.isConsistent = isConsistent;
+        this.isIncremental = isIncremental;
         this.previewKind = previewKind;
         this.pullRepair = pullRepair;
         this.skippedReplicas = forceSkippedReplicas;
@@ -301,7 +301,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
         List<ListenableFuture<RepairResult>> jobs = new 
ArrayList<>(cfnames.length);
         for (String cfname : cfnames)
         {
-            RepairJob job = new RepairJob(this, cfname, isConsistent, 
previewKind);
+            RepairJob job = new RepairJob(this, cfname, isIncremental, 
previewKind);
             executor.execute(job);
             jobs.add(job);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java 
b/src/java/org/apache/cassandra/repair/Validator.java
index bdf8cca..f9556d6 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -60,7 +60,7 @@ public class Validator implements Runnable
     public final InetAddress initiator;
     public final int nowInSec;
     private final boolean evenTreeDistribution;
-    public final boolean isConsistent;
+    public final boolean isIncremental;
 
     // null when all rows with the min token have been consumed
     private long validated;
@@ -79,17 +79,17 @@ public class Validator implements Runnable
         this(desc, initiator, nowInSec, false, false, previewKind);
     }
 
-    public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, 
boolean isConsistent, PreviewKind previewKind)
+    public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, 
boolean isIncremental, PreviewKind previewKind)
     {
-        this(desc, initiator, nowInSec, false, isConsistent, previewKind);
+        this(desc, initiator, nowInSec, false, isIncremental, previewKind);
     }
 
-    public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, 
boolean evenTreeDistribution, boolean isConsistent, PreviewKind previewKind)
+    public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, 
boolean evenTreeDistribution, boolean isIncremental, PreviewKind previewKind)
     {
         this.desc = desc;
         this.initiator = initiator;
         this.nowInSec = nowInSec;
-        this.isConsistent = isConsistent;
+        this.isIncremental = isIncremental;
         this.previewKind = previewKind;
         validated = 0;
         range = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java 
b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
index 803a1f8..c137346 100644
--- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
@@ -313,7 +313,8 @@ public abstract class ConsistentSession
             Preconditions.checkArgument(coordinator != null);
             Preconditions.checkArgument(ids != null);
             Preconditions.checkArgument(!ids.isEmpty());
-            Preconditions.checkArgument(repairedAt > 0);
+            Preconditions.checkArgument(repairedAt > 0
+                                        || repairedAt == 
ActiveRepairService.UNREPAIRED_SSTABLE);
             Preconditions.checkArgument(ranges != null);
             Preconditions.checkArgument(!ranges.isEmpty());
             Preconditions.checkArgument(participants != null);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java 
b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index a95ee19..971bf5d 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -163,10 +163,6 @@ public class RepairOption
         Set<Range<Token>> ranges = new HashSet<>();
         if (rangesStr != null)
         {
-            if (incremental)
-                logger.warn("Incremental repair can't be requested with 
subrange repair " +
-                            "because each subrange repair would generate an 
anti-compacted table. " +
-                            "The repair will occur but without 
anti-compaction.");
             StringTokenizer tokenizer = new StringTokenizer(rangesStr, ",");
             while (tokenizer.hasMoreTokens())
             {
@@ -251,16 +247,6 @@ public class RepairOption
             }
         }
 
-        if (option.isIncremental() && !option.isPreview() && 
!option.isGlobal())
-        {
-            throw new IllegalArgumentException("Incremental repairs cannot be 
run against a subset of tokens or ranges");
-        }
-
-        if (option.isIncremental() && option.isForcedRepair())
-        {
-            throw new IllegalArgumentException("Cannot force incremental 
repair");
-        }
-
         return option;
     }
 
@@ -359,7 +345,7 @@ public class RepairOption
 
     public boolean isGlobal()
     {
-        return dataCenters.isEmpty() && hosts.isEmpty() && !isSubrangeRepair();
+        return dataCenters.isEmpty() && hosts.isEmpty();
     }
 
     public boolean isSubrangeRepair()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 2e02f0c..ab92822 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -209,7 +209,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
                                              String keyspace,
                                              RepairParallelism 
parallelismDegree,
                                              Set<InetAddress> endpoints,
-                                             boolean isConsistent,
+                                             boolean isIncremental,
                                              boolean pullRepair,
                                              boolean force,
                                              PreviewKind previewKind,
@@ -222,7 +222,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
         if (cfnames.length == 0)
             return null;
 
-        final RepairSession session = new RepairSession(parentRepairSession, 
UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, 
isConsistent, pullRepair, force, previewKind, cfnames);
+        final RepairSession session = new RepairSession(parentRepairSession, 
UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, 
isIncremental, pullRepair, force, previewKind, cfnames);
 
         sessions.put(session.getId(), session);
         // register listeners
@@ -372,10 +372,28 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
         return neighbors;
     }
 
+    /**
+     * we only want to set repairedAt for incremental repairs including all 
replicas for a token range. For non-global
+     * incremental repairs, forced incremental repairs, and full repairs, the 
UNREPAIRED_SSTABLE value will prevent
+     * sstables from being promoted to repaired or preserve the 
repairedAt/pendingRepair values, respectively.
+     */
+    static long getRepairedAt(RepairOption options)
+    {
+        // we only want to set repairedAt for incremental repairs including 
all replicas for a token range. For non-global incremental repairs, forced 
incremental repairs, and
+        // full repairs, the UNREPAIRED_SSTABLE value will prevent sstables 
from being promoted to repaired or preserve the repairedAt/pendingRepair 
values, respectively.
+        if (options.isIncremental() && options.isGlobal() && 
!options.isForcedRepair())
+        {
+            return Clock.instance.currentTimeMillis();
+        }
+        else
+        {
+            return  ActiveRepairService.UNREPAIRED_SSTABLE;
+        }
+    }
+
     public UUID prepareForRepair(UUID parentRepairSession, InetAddress 
coordinator, Set<InetAddress> endpoints, RepairOption options, 
List<ColumnFamilyStore> columnFamilyStores)
     {
-        // we only want repairedAt for incremental repairs, for non 
incremental repairs, UNREPAIRED_SSTABLE will preserve repairedAt on streamed 
sstables
-        long repairedAt = options.isIncremental() ? 
Clock.instance.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE;
+        long repairedAt = getRepairedAt(options);
         registerParentRepairSession(parentRepairSession, coordinator, 
columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, 
options.isGlobal(), options.getPreviewKind());
         final CountDownLatch prepareLatch = new 
CountDownLatch(endpoints.size());
         final AtomicBoolean status = new AtomicBoolean(true);
@@ -583,13 +601,6 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
             }
         }
 
-        public long getRepairedAt()
-        {
-            if (isGlobal)
-                return repairedAt;
-            return ActiveRepairService.UNREPAIRED_SSTABLE;
-        }
-
         public Collection<ColumnFamilyStore> getColumnFamilyStores()
         {
             return 
ImmutableSet.<ColumnFamilyStore>builder().addAll(columnFamilyStores.values()).build();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
index af629e5..c7f1ae8 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
@@ -248,7 +248,7 @@ public class CompactionStrategyManagerPendingRepairTest 
extends AbstractPendingR
         csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
 
         // sstable should have pendingRepair cleared, and repairedAt set 
correctly
-        long expectedRepairedAt = 
ActiveRepairService.instance.getParentRepairSession(repairID).getRepairedAt();
+        long expectedRepairedAt = 
ActiveRepairService.instance.getParentRepairSession(repairID).repairedAt;
         Assert.assertFalse(sstable.isPendingRepair());
         Assert.assertTrue(sstable.isRepaired());
         Assert.assertEquals(expectedRepairedAt, 
sstable.getSSTableMetadata().repairedAt);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java 
b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
index d61d859..21c51c6 100644
--- a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
+++ b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
@@ -75,6 +75,8 @@ public abstract class AbstractRepairTest
     protected static final Range<Token> RANGE2 = new Range<>(t(2), t(3));
     protected static final Range<Token> RANGE3 = new Range<>(t(4), t(5));
 
+    protected static final Set<Range<Token>> ALL_RANGES = 
ImmutableSet.of(RANGE1, RANGE2, RANGE3);
+
     protected static UUID registerSession(ColumnFamilyStore cfs, boolean 
isIncremental, boolean isGlobal)
     {
         UUID sessionId = UUIDGen.getTimeUUID();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java 
b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
new file mode 100644
index 0000000..db76f73
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.repair.RepairRunnable.CommonRange;
+
+import static org.apache.cassandra.repair.RepairRunnable.filterCommonRanges;
+
+public class RepairRunnableTest extends AbstractRepairTest
+{
+    /**
+     * For non-forced repairs, common ranges should be passed through as-is
+     */
+    @Test
+    public void filterCommonIncrementalRangesNotForced() throws Exception
+    {
+        CommonRange cr = new CommonRange(PARTICIPANTS, ALL_RANGES);
+
+        List<CommonRange> expected = Lists.newArrayList(cr);
+        List<CommonRange> actual = filterCommonRanges(expected, 
Collections.emptySet(), false);
+
+        Assert.assertEquals(expected, actual);
+    }
+
+    @Test
+    public void forceFilterCommonIncrementalRanges() throws Exception
+    {
+        CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, 
PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2));
+        CommonRange cr2 = new CommonRange(Sets.newHashSet(PARTICIPANT1, 
PARTICIPANT2, PARTICIPANT3), Sets.newHashSet(RANGE3));
+        Set<InetAddress> liveEndpoints = Sets.newHashSet(PARTICIPANT2, 
PARTICIPANT3); // PARTICIPANT1 is excluded
+
+        List<CommonRange> initial = Lists.newArrayList(cr1, cr2);
+        List<CommonRange> expected = Lists.newArrayList(new 
CommonRange(Sets.newHashSet(PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2)),
+                                                        new 
CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), 
Sets.newHashSet(RANGE3)));
+        List<CommonRange> actual = filterCommonRanges(initial, liveEndpoints, 
true);
+
+        Assert.assertEquals(expected, actual);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java 
b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index be048fb..6e6d222 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -223,7 +223,6 @@ public class LocalSessionTest extends AbstractRepairTest
         assertValidationFailure(b -> b.withCoordinator(null));
         assertValidationFailure(b -> b.withTableIds(null));
         assertValidationFailure(b -> b.withTableIds(new HashSet<>()));
-        assertValidationFailure(b -> b.withRepairedAt(0));
         assertValidationFailure(b -> b.withRepairedAt(-1));
         assertValidationFailure(b -> b.withRanges(null));
         assertValidationFailure(b -> b.withRanges(new HashSet<>()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java 
b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
index 13d7575..484d7a8 100644
--- a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
@@ -150,19 +150,6 @@ public class RepairOptionTest
     }
 
     @Test
-    public void testNonGlobalIncrementalRepairParse() throws Exception
-    {
-        Map<String, String> options = new HashMap<>();
-        options.put(RepairOption.PARALLELISM_KEY, "parallel");
-        options.put(RepairOption.PRIMARY_RANGE_KEY, "false");
-        options.put(RepairOption.INCREMENTAL_KEY, "true");
-        options.put(RepairOption.COLUMNFAMILIES_KEY, "cf1,cf2,cf3");
-        options.put(RepairOption.HOSTS_KEY, "127.0.0.1, 127.0.0.2");
-        assertParseThrowsIllegalArgumentExceptionWithMessage(options, 
"Incremental repairs cannot be run against a subset of tokens or ranges");
-
-    }
-
-    @Test
     public void testForceOption() throws Exception
     {
         RepairOption option;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java 
b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 57ffa7d..cbacaec 100644
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -21,8 +21,11 @@ package org.apache.cassandra.service;
 import java.net.InetAddress;
 import java.util.*;
 
+import javax.xml.crypto.Data;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -34,17 +37,26 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Refs;
 
+import static 
org.apache.cassandra.repair.messages.RepairOption.DATACENTERS_KEY;
+import static 
org.apache.cassandra.repair.messages.RepairOption.FORCE_REPAIR_KEY;
+import static org.apache.cassandra.repair.messages.RepairOption.HOSTS_KEY;
+import static 
org.apache.cassandra.repair.messages.RepairOption.INCREMENTAL_KEY;
+import static org.apache.cassandra.repair.messages.RepairOption.RANGES_KEY;
+import static 
org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
+import static org.apache.cassandra.service.ActiveRepairService.getRepairedAt;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -299,4 +311,47 @@ public class ActiveRepairServiceTest
             cfs.forceBlockingFlush();
         }
     }
+
+    private static RepairOption opts(String... params)
+    {
+        assert params.length % 2 == 0 : "unbalanced key value pairs";
+        Map<String, String> opt = new HashMap<>();
+        for (int i=0; i<(params.length >> 1); i++)
+        {
+            int idx = i << 1;
+            opt.put(params[idx], params[idx+1]);
+        }
+        return RepairOption.parse(opt, DatabaseDescriptor.getPartitioner());
+    }
+
+    private static String b2s(boolean b)
+    {
+        return Boolean.toString(b);
+    }
+
+    /**
+     * Tests the expected repairedAt value is returned, based on different 
RepairOption
+     */
+    @Test
+    public void repairedAt() throws Exception
+    {
+        // regular incremental repair
+        Assert.assertNotEquals(UNREPAIRED_SSTABLE, 
getRepairedAt(opts(INCREMENTAL_KEY, b2s(true))));
+        // subrange incremental repair
+        Assert.assertNotEquals(UNREPAIRED_SSTABLE, 
getRepairedAt(opts(INCREMENTAL_KEY, b2s(true),
+                                                                      
RANGES_KEY, "1:2")));
+
+        // hosts incremental repair
+        Assert.assertEquals(UNREPAIRED_SSTABLE, 
getRepairedAt(opts(INCREMENTAL_KEY, b2s(true),
+                                                                   HOSTS_KEY, 
"127.0.0.1")));
+        // dc incremental repair
+        Assert.assertEquals(UNREPAIRED_SSTABLE, 
getRepairedAt(opts(INCREMENTAL_KEY, b2s(true),
+                                                                   
DATACENTERS_KEY, "DC2")));
+        // forced incremental repair
+        Assert.assertEquals(UNREPAIRED_SSTABLE, 
getRepairedAt(opts(INCREMENTAL_KEY, b2s(true),
+                                                                   
FORCE_REPAIR_KEY, b2s(true))));
+
+        // full repair
+        Assert.assertEquals(UNREPAIRED_SSTABLE, 
getRepairedAt(opts(INCREMENTAL_KEY, b2s(false))));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to