Repository: cassandra
Updated Branches:
  refs/heads/trunk 9274197b4 -> e60a06cc8


Mark sstables as repaired after full repair

Patch by marcuse; reviewed by yukim for CASSANDRA-7586


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e60a06cc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e60a06cc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e60a06cc

Branch: refs/heads/trunk
Commit: e60a06cc866e5e85d3e58f25b98f8c048d07ad24
Parents: 9274197
Author: Marcus Eriksson <[email protected]>
Authored: Tue Oct 28 16:30:50 2014 +0100
Committer: Marcus Eriksson <[email protected]>
Committed: Mon Nov 3 08:28:39 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 13 +++--
 .../db/compaction/CompactionManager.java        | 24 ++++++--
 .../repair/RepairMessageVerbHandler.java        | 23 +++++---
 .../repair/messages/AnticompactionRequest.java  |  8 +++
 .../repair/messages/PrepareMessage.java         | 10 +++-
 .../cassandra/repair/messages/RepairOption.java |  7 ---
 .../cassandra/repair/messages/SyncRequest.java  | 11 ++++
 .../repair/messages/ValidationRequest.java      |  8 +++
 .../cassandra/service/ActiveRepairService.java  | 61 +++++++++++---------
 .../cassandra/service/StorageService.java       | 44 +++++---------
 .../LeveledCompactionStrategyTest.java          |  2 +-
 .../cassandra/repair/LocalSyncTaskTest.java     |  2 +-
 13 files changed, 127 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index db3b091..3a8ada2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Mark sstables as repaired after full repair (CASSANDRA-7586) 
  * Extend Descriptor to include a format value and refactor reader/writer apis 
(CASSANDRA-7443)
  * Integrate JMH for microbenchmarks (CASSANDRA-8151)
  * Keep sstable levels when bootstrapping (CASSANDRA-7460)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 0e3131c..2a61b39 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2151,8 +2151,9 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         snapshotWithoutFlush(snapshotName, null);
     }
 
-    public void snapshotWithoutFlush(String snapshotName, 
Predicate<SSTableReader> predicate)
+    public Set<SSTableReader> snapshotWithoutFlush(String snapshotName, 
Predicate<SSTableReader> predicate)
     {
+        Set<SSTableReader> snapshottedSSTables = new HashSet<>();
         for (ColumnFamilyStore cfs : concatWithIndexes())
         {
             DataTracker.View currentView = cfs.markCurrentViewReferenced();
@@ -2171,6 +2172,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                     
filesJSONArr.add(ssTable.descriptor.relativeFilenameFor(Component.DATA));
                     if (logger.isDebugEnabled())
                         logger.debug("Snapshot for {} keyspace data file {} 
created in {}", keyspace, ssTable.getFilename(), snapshotDirectory);
+                    snapshottedSSTables.add(ssTable);
                 }
 
                 writeSnapshotManifest(filesJSONArr, snapshotName);
@@ -2180,6 +2182,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                 SSTableReader.releaseReferences(currentView.sstables);
             }
         }
+        return snapshottedSSTables;
     }
 
     private void writeSnapshotManifest(final JSONArray filesJSONArr, final 
String snapshotName)
@@ -2216,15 +2219,15 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
      *
      * @param snapshotName the name of the associated with the snapshot
      */
-    public void snapshot(String snapshotName)
+    public Set<SSTableReader> snapshot(String snapshotName)
     {
-        snapshot(snapshotName, null);
+        return snapshot(snapshotName, null);
     }
 
-    public void snapshot(String snapshotName, Predicate<SSTableReader> 
predicate)
+    public Set<SSTableReader> snapshot(String snapshotName, 
Predicate<SSTableReader> predicate)
     {
         forceBlockingFlush();
-        snapshotWithoutFlush(snapshotName, predicate);
+        return snapshotWithoutFlush(snapshotName, predicate);
     }
 
     public boolean snapshotExists(String snapshotName)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 18ad7ae..3ee36cd 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -903,8 +903,11 @@ public class CompactionManager implements 
CompactionManagerMBean
             if (isSnapshotValidation)
             {
                 // If there is a snapshot created for the session then read 
from there.
+                // note that we populate the parent repair session when 
creating the snapshot, meaning the sstables in the snapshot are the ones we
+                // are supposed to validate.
                 sstables = cfs.getSnapshotSSTableReader(snapshotName);
 
+
                 // Computing gcbefore based on the current time wouldn't be 
very good because we know each replica will execute
                 // this at a different time (that's the whole purpose of 
repair with snaphsot). So instead we take the creation
                 // time of the snapshot, which should give us roughtly the 
same time on each replica (roughtly being in that case
@@ -915,12 +918,21 @@ public class CompactionManager implements 
CompactionManagerMBean
             {
                 // flush first so everyone is validating data that is as 
similar as possible
                 
StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
-                // we don't mark validating sstables as compacting in 
DataTracker, so we have to mark them referenced
-                // instead so they won't be cleaned up if they do get 
compacted during the validation
-                if (validator.desc.parentSessionId == null || 
ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId)
 == null)
-                    sstables = cfs.markCurrentSSTablesReferenced();
-                else
-                    sstables = 
ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
+                ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
+                Set<SSTableReader> sstablesToValidate = new HashSet<>();
+                for (SSTableReader sstable : cfs.getSSTables())
+                {
+                    if (new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(Arrays.asList(validator.desc.range)))
+                    {
+                        if (!prs.isIncremental || !sstable.isRepaired())
+                        {
+                            sstablesToValidate.add(sstable);
+                        }
+                    }
+                }
+                prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
+
+                sstables = 
prs.getAndReferenceSSTablesInRange(cfs.metadata.cfId, validator.desc.range);
 
                 if (validator.gcBefore > 0)
                     gcBefore = validator.gcBefore;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java 
b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 2ad8dc2..f9180c2 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -18,12 +18,18 @@
 package org.apache.cassandra.repair;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Future;
 
 import com.google.common.base.Predicate;
+
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,10 +38,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.LocalPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -61,6 +64,7 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
         {
             case PREPARE_MESSAGE:
                 PrepareMessage prepareMessage = (PrepareMessage) 
message.payload;
+                logger.debug("Preparing, {}", prepareMessage);
                 List<ColumnFamilyStore> columnFamilyStores = new 
ArrayList<>(prepareMessage.cfIds.size());
                 for (UUID cfId : prepareMessage.cfIds)
                 {
@@ -70,14 +74,16 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
                 }
                 
ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession,
                                                                          
columnFamilyStores,
-                                                                         
prepareMessage.ranges);
+                                                                         
prepareMessage.ranges,
+                                                                         
prepareMessage.isIncremental);
                 MessagingService.instance().sendReply(new 
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
                 break;
 
             case SNAPSHOT:
+                logger.debug("Snapshotting {}", desc);
                 ColumnFamilyStore cfs = 
Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
                 final Range<Token> repairingRange = desc.range;
-                cfs.snapshot(desc.sessionId.toString(), new 
Predicate<SSTableReader>()
+                Set<SSTableReader> snapshottedSSSTables = 
cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>()
                 {
                     public boolean apply(SSTableReader sstable)
                     {
@@ -86,13 +92,14 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
                                new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
                     }
                 });
-
+                
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).addSSTables(cfs.metadata.cfId,
 snapshottedSSSTables);
                 logger.debug("Enqueuing response to snapshot request {} to 
{}", desc.sessionId, message.from);
                 MessagingService.instance().sendReply(new 
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
                 break;
 
             case VALIDATION_REQUEST:
                 ValidationRequest validationRequest = (ValidationRequest) 
message.payload;
+                logger.debug("Validating {}", validationRequest);
                 // trigger read-only compaction
                 ColumnFamilyStore store = 
Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
 
@@ -103,7 +110,7 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
             case SYNC_REQUEST:
                 // forwarded sync request
                 SyncRequest request = (SyncRequest) message.payload;
-
+                logger.debug("Syncing {}", request);
                 long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE;
                 if (desc.parentSessionId != null && 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != 
null)
                     repairedAt = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt;
@@ -113,8 +120,8 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
                 break;
 
             case ANTICOMPACTION_REQUEST:
-                logger.debug("Got anticompaction request");
                 AnticompactionRequest anticompactionRequest = 
(AnticompactionRequest) message.payload;
+                logger.debug("Got anticompaction request {}", 
anticompactionRequest);
                 try
                 {
                     List<Future<?>> futures = 
ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java 
b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
index 34ea5a5..1a13ad1 100644
--- a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
@@ -53,4 +53,12 @@ public class AnticompactionRequest extends RepairMessage
             return 
UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version);
         }
     }
+
+    @Override
+    public String toString()
+    {
+        return "AnticompactionRequest{" +
+                "parentRepairSession=" + parentRepairSession +
+                "} " + super.toString();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java 
b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
index 5699677..035ccc4 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
@@ -38,13 +38,15 @@ public class PrepareMessage extends RepairMessage
     public final Collection<Range<Token>> ranges;
 
     public final UUID parentRepairSession;
+    public final boolean isIncremental;
 
-    public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, 
Collection<Range<Token>> ranges)
+    public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, 
Collection<Range<Token>> ranges, boolean isIncremental)
     {
         super(Type.PREPARE_MESSAGE, null);
         this.parentRepairSession = parentRepairSession;
         this.cfIds = cfIds;
         this.ranges = ranges;
+        this.isIncremental = isIncremental;
     }
 
     public static class PrepareMessageSerializer implements 
MessageSerializer<PrepareMessage>
@@ -58,6 +60,7 @@ public class PrepareMessage extends RepairMessage
             out.writeInt(message.ranges.size());
             for (Range r : message.ranges)
                 Range.serializer.serialize(r, out, version);
+            out.writeBoolean(message.isIncremental);
         }
 
         public PrepareMessage deserialize(DataInput in, int version) throws 
IOException
@@ -71,7 +74,8 @@ public class PrepareMessage extends RepairMessage
             List<Range<Token>> ranges = new ArrayList<>(rangeCount);
             for (int i = 0; i < rangeCount; i++)
                 ranges.add((Range<Token>) Range.serializer.deserialize(in, 
version).toTokenBounds());
-            return new PrepareMessage(parentRepairSession, cfIds, ranges);
+            boolean isIncremental = in.readBoolean();
+            return new PrepareMessage(parentRepairSession, cfIds, ranges, 
isIncremental);
         }
 
         public long serializedSize(PrepareMessage message, int version)
@@ -85,6 +89,7 @@ public class PrepareMessage extends RepairMessage
             size += sizes.sizeof(message.ranges.size());
             for (Range r : message.ranges)
                 size += Range.serializer.serializedSize(r, version);
+            size += sizes.sizeof(message.isIncremental);
             return size;
         }
     }
@@ -96,6 +101,7 @@ public class PrepareMessage extends RepairMessage
                 "cfIds='" + cfIds + '\'' +
                 ", ranges=" + ranges +
                 ", parentRepairSession=" + parentRepairSession +
+                ", isIncremental="+isIncremental +
                 '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java 
b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index ca02365..63446e5 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -211,13 +211,6 @@ public class RepairOption
 
     public RepairOption(boolean sequential, boolean primaryRange, boolean 
incremental, int jobThreads, Collection<Range<Token>> ranges)
     {
-        if (sequential && incremental)
-        {
-            String message = "It is not possible to mix sequential repair and 
incremental repairs.";
-            logger.error(message);
-            throw new IllegalArgumentException(message);
-        }
-
         if (!FBUtilities.isUnix() && sequential)
         {
             logger.warn("Snapshot-based repair is not yet supported on 
Windows.  Reverting to parallel repair.");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java 
b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
index e677cd8..c4d0ab6 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -92,4 +92,15 @@ public class SyncRequest extends RepairMessage
             return size;
         }
     }
+
+    @Override
+    public String toString()
+    {
+        return "SyncRequest{" +
+                "initiator=" + initiator +
+                ", src=" + src +
+                ", dst=" + dst +
+                ", ranges=" + ranges +
+                "} " + super.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java 
b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
index c73b708..43bcf23 100644
--- a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
@@ -42,6 +42,14 @@ public class ValidationRequest extends RepairMessage
     }
 
     @Override
+    public String toString()
+    {
+        return "ValidationRequest{" +
+                "gcBefore=" + gcBefore +
+                "} " + super.toString();
+    }
+
+    @Override
     public boolean equals(Object o)
     {
         if (this == o) return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index fa354e6..08cef5c 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -228,10 +228,10 @@ public class ActiveRepairService
         return neighbors;
     }
 
-    public UUID prepareForRepair(Set<InetAddress> endpoints, 
Collection<Range<Token>> ranges, List<ColumnFamilyStore> columnFamilyStores)
+    public UUID prepareForRepair(Set<InetAddress> endpoints, RepairOption 
options, List<ColumnFamilyStore> columnFamilyStores)
     {
         UUID parentRepairSession = UUIDGen.getTimeUUID();
-        registerParentRepairSession(parentRepairSession, columnFamilyStores, 
ranges);
+        registerParentRepairSession(parentRepairSession, columnFamilyStores, 
options.getRanges(), options.isIncremental());
         final CountDownLatch prepareLatch = new 
CountDownLatch(endpoints.size());
         final AtomicBoolean status = new AtomicBoolean(true);
         IAsyncCallbackWithFailure callback = new IAsyncCallbackWithFailure()
@@ -259,7 +259,7 @@ public class ActiveRepairService
 
         for(InetAddress neighbour : endpoints)
         {
-            PrepareMessage message = new PrepareMessage(parentRepairSession, 
cfIds, ranges);
+            PrepareMessage message = new PrepareMessage(parentRepairSession, 
cfIds, options.getRanges(), options.isIncremental());
             MessageOut<RepairMessage> msg = message.createMessage();
             MessagingService.instance().sendRRWithFailure(msg, neighbour, 
callback);
         }
@@ -282,25 +282,9 @@ public class ActiveRepairService
         return parentRepairSession;
     }
 
-    public void registerParentRepairSession(UUID parentRepairSession, 
List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges)
+    public void registerParentRepairSession(UUID parentRepairSession, 
List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, 
boolean isIncremental)
     {
-        Map<UUID, Set<SSTableReader>> sstablesToRepair = new HashMap<>();
-        for (ColumnFamilyStore cfs : columnFamilyStores)
-        {
-            Set<SSTableReader> sstables = new HashSet<>();
-            for (SSTableReader sstable : cfs.getSSTables())
-            {
-                if (new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(ranges))
-                {
-                    if (!sstable.isRepaired())
-                    {
-                        sstables.add(sstable);
-                    }
-                }
-            }
-            sstablesToRepair.put(cfs.metadata.cfId, sstables);
-        }
-        parentRepairSessions.put(parentRepairSession, new 
ParentRepairSession(columnFamilyStores, ranges, sstablesToRepair, 
System.currentTimeMillis()));
+        parentRepairSessions.put(parentRepairSession, new 
ParentRepairSession(columnFamilyStores, ranges, isIncremental, 
System.currentTimeMillis()));
     }
 
     public void finishParentSession(UUID parentSession, Set<InetAddress> 
neighbors)
@@ -379,18 +363,28 @@ public class ActiveRepairService
 
     public static class ParentRepairSession
     {
-        public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new 
HashMap<>();
-        public final Collection<Range<Token>> ranges;
-        public final Map<UUID, Set<SSTableReader>> sstableMap;
+        private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new 
HashMap<>();
+        private final Collection<Range<Token>> ranges;
+        private final Map<UUID, Set<SSTableReader>> sstableMap = new 
HashMap<>();
         public final long repairedAt;
+        public final boolean isIncremental;
 
-        public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, 
Collection<Range<Token>> ranges, Map<UUID, Set<SSTableReader>> sstables, long 
repairedAt)
+        public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, 
Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt)
         {
             for (ColumnFamilyStore cfs : columnFamilyStores)
                 this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
             this.ranges = ranges;
-            this.sstableMap = sstables;
             this.repairedAt = repairedAt;
+            this.isIncremental = isIncremental;
+        }
+
+        public void addSSTables(UUID cfId, Set<SSTableReader> sstables)
+        {
+            Set<SSTableReader> existingSSTables = this.sstableMap.get(cfId);
+            if (existingSSTables == null)
+                existingSSTables = new HashSet<>();
+            existingSSTables.addAll(sstables);
+            this.sstableMap.put(cfId, sstables);
         }
 
         public synchronized Collection<SSTableReader> 
getAndReferenceSSTables(UUID cfId)
@@ -412,5 +406,20 @@ public class ActiveRepairService
             }
             return sstables;
         }
+
+        public synchronized Set<SSTableReader> 
getAndReferenceSSTablesInRange(UUID cfId, Range<Token> range)
+        {
+            Collection<SSTableReader> allSSTables= 
getAndReferenceSSTables(cfId);
+            Set<SSTableReader> sstables = new HashSet<>();
+            for (SSTableReader sstable : allSSTables)
+            {
+                if (new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(Arrays.asList(range)))
+                    sstables.add(sstable);
+                else
+                    sstable.releaseReference();
+            }
+            return sstables;
+
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index a26858a..ea21f3d 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2624,14 +2624,6 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                 logger.info(message);
                 sendNotification("repair", message, new int[]{cmd, 
ActiveRepairService.Status.STARTED.ordinal()});
 
-                if (options.isSequential() && options.isIncremental())
-                {
-                    message = "It is not possible to mix sequential repair and 
incremental repairs.";
-                    logger.error(message);
-                    sendNotification("repair", message, new int[]{cmd, 
ActiveRepairService.Status.FINISHED.ordinal()});
-                    return;
-                }
-
                 final Set<InetAddress> allNeighbors = new HashSet<>();
                 Map<Range, Set<InetAddress>> rangeToNeighbors = new 
HashMap<>();
                 for (Range<Token> range : options.getRanges())
@@ -2664,23 +2656,16 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                 }
 
                 final UUID parentSession;
-                long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE;
-                if (options.isIncremental())
+                long repairedAt;
+                try
                 {
-                    try
-                    {
-                        parentSession = 
ActiveRepairService.instance.prepareForRepair(allNeighbors, 
options.getRanges(), columnFamilyStores);
-                        repairedAt = 
ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt;
-                    }
-                    catch (Throwable t)
-                    {
-                        sendNotification("repair", String.format("Repair 
failed with error %s", t.getMessage()), new int[]{cmd, 
ActiveRepairService.Status.FINISHED.ordinal()});
-                        return;
-                    }
+                    parentSession = 
ActiveRepairService.instance.prepareForRepair(allNeighbors, options, 
columnFamilyStores);
+                    repairedAt = 
ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt;
                 }
-                else
+                catch (Throwable t)
                 {
-                    parentSession = null;
+                    sendNotification("repair", String.format("Repair failed 
with error %s", t.getMessage()), new int[]{cmd, 
ActiveRepairService.Status.FINISHED.ordinal()});
+                    return;
                 }
 
                 // Set up RepairJob executor for this repair command.
@@ -2736,16 +2721,13 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                 {
                     public void onSuccess(@Nullable Object result)
                     {
-                        if (options.isIncremental())
+                        try
                         {
-                            try
-                            {
-                                
ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors);
-                            }
-                            catch (Exception e)
-                            {
-                                logger.error("Error in incremental repair", e);
-                            }
+                            
ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors);
+                        }
+                        catch (Exception e)
+                        {
+                            logger.error("Error in incremental repair", e);
                         }
                         repairComplete();
                     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index a999025..3cd2ea8 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -175,7 +175,7 @@ public class LeveledCompactionStrategyTest
         Range<Token> range = new Range<>(Util.token(""), Util.token(""));
         int gcBefore = 
keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(System.currentTimeMillis());
         UUID parentRepSession = UUID.randomUUID();
-        
ActiveRepairService.instance.registerParentRepairSession(parentRepSession, 
Arrays.asList(cfs), Arrays.asList(range));
+        
ActiveRepairService.instance.registerParentRepairSession(parentRepSession, 
Arrays.asList(cfs), Arrays.asList(range), false);
         RepairJobDesc desc = new RepairJobDesc(parentRepSession, 
UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, range);
         Validator validator = new Validator(desc, 
FBUtilities.getBroadcastAddress(), gcBefore);
         CompactionManager.instance.submitValidation(cfs, validator).get();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java 
b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index b3d333a..1d11334 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -89,7 +89,7 @@ public class LocalSyncTaskTest extends SchemaLoader
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
 
-        
ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, 
Arrays.asList(cfs), Arrays.asList(range));
+        
ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, 
Arrays.asList(cfs), Arrays.asList(range), false);
 
         RepairJobDesc desc = new RepairJobDesc(parentRepairSession, 
UUID.randomUUID(), KEYSPACE1, "Standard1", range);
 

Reply via email to