Repository: cassandra Updated Branches: refs/heads/trunk 9274197b4 -> e60a06cc8
Mark sstables as repaired after full repair Patch by marcuse; reviewed by yukim for CASSANDRA-7586 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e60a06cc Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e60a06cc Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e60a06cc Branch: refs/heads/trunk Commit: e60a06cc866e5e85d3e58f25b98f8c048d07ad24 Parents: 9274197 Author: Marcus Eriksson <[email protected]> Authored: Tue Oct 28 16:30:50 2014 +0100 Committer: Marcus Eriksson <[email protected]> Committed: Mon Nov 3 08:28:39 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 13 +++-- .../db/compaction/CompactionManager.java | 24 ++++++-- .../repair/RepairMessageVerbHandler.java | 23 +++++--- .../repair/messages/AnticompactionRequest.java | 8 +++ .../repair/messages/PrepareMessage.java | 10 +++- .../cassandra/repair/messages/RepairOption.java | 7 --- .../cassandra/repair/messages/SyncRequest.java | 11 ++++ .../repair/messages/ValidationRequest.java | 8 +++ .../cassandra/service/ActiveRepairService.java | 61 +++++++++++--------- .../cassandra/service/StorageService.java | 44 +++++--------- .../LeveledCompactionStrategyTest.java | 2 +- .../cassandra/repair/LocalSyncTaskTest.java | 2 +- 13 files changed, 127 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index db3b091..3a8ada2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Mark sstables as repaired after full repair (CASSANDRA-7586) * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443) * Integrate JMH for microbenchmarks (CASSANDRA-8151) * Keep sstable levels when bootstrapping (CASSANDRA-7460) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 0e3131c..2a61b39 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -2151,8 +2151,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean snapshotWithoutFlush(snapshotName, null); } - public void snapshotWithoutFlush(String snapshotName, Predicate<SSTableReader> predicate) + public Set<SSTableReader> snapshotWithoutFlush(String snapshotName, Predicate<SSTableReader> predicate) { + Set<SSTableReader> snapshottedSSTables = new HashSet<>(); for (ColumnFamilyStore cfs : concatWithIndexes()) { DataTracker.View currentView = cfs.markCurrentViewReferenced(); @@ -2171,6 +2172,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean filesJSONArr.add(ssTable.descriptor.relativeFilenameFor(Component.DATA)); if (logger.isDebugEnabled()) logger.debug("Snapshot for {} keyspace data file {} created in {}", keyspace, ssTable.getFilename(), snapshotDirectory); + snapshottedSSTables.add(ssTable); } writeSnapshotManifest(filesJSONArr, snapshotName); @@ -2180,6 +2182,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean SSTableReader.releaseReferences(currentView.sstables); } } + return snapshottedSSTables; } private void writeSnapshotManifest(final JSONArray filesJSONArr, final String snapshotName) @@ -2216,15 +2219,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * * @param snapshotName the name of the associated with the snapshot */ - public void snapshot(String snapshotName) + public Set<SSTableReader> snapshot(String snapshotName) { - snapshot(snapshotName, null); + return snapshot(snapshotName, null); } - public void snapshot(String snapshotName, Predicate<SSTableReader> predicate) + public Set<SSTableReader> snapshot(String snapshotName, Predicate<SSTableReader> predicate) { forceBlockingFlush(); - snapshotWithoutFlush(snapshotName, predicate); + return snapshotWithoutFlush(snapshotName, predicate); } public boolean snapshotExists(String snapshotName) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 18ad7ae..3ee36cd 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -903,8 +903,11 @@ public class CompactionManager implements CompactionManagerMBean if (isSnapshotValidation) { // If there is a snapshot created for the session then read from there. + // note that we populate the parent repair session when creating the snapshot, meaning the sstables in the snapshot are the ones we + // are supposed to validate. sstables = cfs.getSnapshotSSTableReader(snapshotName); + // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute // this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation // time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case @@ -915,12 +918,21 @@ public class CompactionManager implements CompactionManagerMBean { // flush first so everyone is validating data that is as similar as possible StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name); - // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced - // instead so they won't be cleaned up if they do get compacted during the validation - if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null) - sstables = cfs.markCurrentSSTablesReferenced(); - else - sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId); + Set<SSTableReader> sstablesToValidate = new HashSet<>(); + for (SSTableReader sstable : cfs.getSSTables()) + { + if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Arrays.asList(validator.desc.range))) + { + if (!prs.isIncremental || !sstable.isRepaired()) + { + sstablesToValidate.add(sstable); + } + } + } + prs.addSSTables(cfs.metadata.cfId, sstablesToValidate); + + sstables = prs.getAndReferenceSSTablesInRange(cfs.metadata.cfId, validator.desc.range); if (validator.gcBefore > 0) gcBefore = validator.gcBefore; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 2ad8dc2..f9180c2 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -18,12 +18,18 @@ package org.apache.cassandra.repair; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.Future; import com.google.common.base.Predicate; + +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,10 +38,7 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.LocalPartitioner; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; @@ -61,6 +64,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> { case PREPARE_MESSAGE: PrepareMessage prepareMessage = (PrepareMessage) message.payload; + logger.debug("Preparing, {}", prepareMessage); List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.cfIds.size()); for (UUID cfId : prepareMessage.cfIds) { @@ -70,14 +74,16 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> } ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession, columnFamilyStores, - prepareMessage.ranges); + prepareMessage.ranges, + prepareMessage.isIncremental); MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); break; case SNAPSHOT: + logger.debug("Snapshotting {}", desc); ColumnFamilyStore cfs = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily); final Range<Token> repairingRange = desc.range; - cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>() + Set<SSTableReader> snapshottedSSSTables = cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>() { public boolean apply(SSTableReader sstable) { @@ -86,13 +92,14 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange)); } }); - + ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).addSSTables(cfs.metadata.cfId, snapshottedSSSTables); logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from); MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); break; case VALIDATION_REQUEST: ValidationRequest validationRequest = (ValidationRequest) message.payload; + logger.debug("Validating {}", validationRequest); // trigger read-only compaction ColumnFamilyStore store = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily); @@ -103,7 +110,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> case SYNC_REQUEST: // forwarded sync request SyncRequest request = (SyncRequest) message.payload; - + logger.debug("Syncing {}", request); long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE; if (desc.parentSessionId != null && ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != null) repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt; @@ -113,8 +120,8 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> break; case ANTICOMPACTION_REQUEST: - logger.debug("Got anticompaction request"); AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload; + logger.debug("Got anticompaction request {}", anticompactionRequest); try { List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java index 34ea5a5..1a13ad1 100644 --- a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java +++ b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java @@ -53,4 +53,12 @@ public class AnticompactionRequest extends RepairMessage return UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version); } } + + @Override + public String toString() + { + return "AnticompactionRequest{" + + "parentRepairSession=" + parentRepairSession + + "} " + super.toString(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java index 5699677..035ccc4 100644 --- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java @@ -38,13 +38,15 @@ public class PrepareMessage extends RepairMessage public final Collection<Range<Token>> ranges; public final UUID parentRepairSession; + public final boolean isIncremental; - public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, Collection<Range<Token>> ranges) + public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, Collection<Range<Token>> ranges, boolean isIncremental) { super(Type.PREPARE_MESSAGE, null); this.parentRepairSession = parentRepairSession; this.cfIds = cfIds; this.ranges = ranges; + this.isIncremental = isIncremental; } public static class PrepareMessageSerializer implements MessageSerializer<PrepareMessage> @@ -58,6 +60,7 @@ public class PrepareMessage extends RepairMessage out.writeInt(message.ranges.size()); for (Range r : message.ranges) Range.serializer.serialize(r, out, version); + out.writeBoolean(message.isIncremental); } public PrepareMessage deserialize(DataInput in, int version) throws IOException @@ -71,7 +74,8 @@ public class PrepareMessage extends RepairMessage List<Range<Token>> ranges = new ArrayList<>(rangeCount); for (int i = 0; i < rangeCount; i++) ranges.add((Range<Token>) Range.serializer.deserialize(in, version).toTokenBounds()); - return new PrepareMessage(parentRepairSession, cfIds, ranges); + boolean isIncremental = in.readBoolean(); + return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental); } public long serializedSize(PrepareMessage message, int version) @@ -85,6 +89,7 @@ public class PrepareMessage extends RepairMessage size += sizes.sizeof(message.ranges.size()); for (Range r : message.ranges) size += Range.serializer.serializedSize(r, version); + size += sizes.sizeof(message.isIncremental); return size; } } @@ -96,6 +101,7 @@ public class PrepareMessage extends RepairMessage "cfIds='" + cfIds + '\'' + ", ranges=" + ranges + ", parentRepairSession=" + parentRepairSession + + ", isIncremental="+isIncremental + '}'; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/repair/messages/RepairOption.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java index ca02365..63446e5 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java @@ -211,13 +211,6 @@ public class RepairOption public RepairOption(boolean sequential, boolean primaryRange, boolean incremental, int jobThreads, Collection<Range<Token>> ranges) { - if (sequential && incremental) - { - String message = "It is not possible to mix sequential repair and incremental repairs."; - logger.error(message); - throw new IllegalArgumentException(message); - } - if (!FBUtilities.isUnix() && sequential) { logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair."); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/repair/messages/SyncRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java index e677cd8..c4d0ab6 100644 --- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java +++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java @@ -92,4 +92,15 @@ public class SyncRequest extends RepairMessage return size; } } + + @Override + public String toString() + { + return "SyncRequest{" + + "initiator=" + initiator + + ", src=" + src + + ", dst=" + dst + + ", ranges=" + ranges + + "} " + super.toString(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java index c73b708..43bcf23 100644 --- a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java +++ b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java @@ -42,6 +42,14 @@ public class ValidationRequest extends RepairMessage } @Override + public String toString() + { + return "ValidationRequest{" + + "gcBefore=" + gcBefore + + "} " + super.toString(); + } + + @Override public boolean equals(Object o) { if (this == o) return true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index fa354e6..08cef5c 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -228,10 +228,10 @@ public class ActiveRepairService return neighbors; } - public UUID prepareForRepair(Set<InetAddress> endpoints, Collection<Range<Token>> ranges, List<ColumnFamilyStore> columnFamilyStores) + public UUID prepareForRepair(Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores) { UUID parentRepairSession = UUIDGen.getTimeUUID(); - registerParentRepairSession(parentRepairSession, columnFamilyStores, ranges); + registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental()); final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size()); final AtomicBoolean status = new AtomicBoolean(true); IAsyncCallbackWithFailure callback = new IAsyncCallbackWithFailure() @@ -259,7 +259,7 @@ public class ActiveRepairService for(InetAddress neighbour : endpoints) { - PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, ranges); + PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental()); MessageOut<RepairMessage> msg = message.createMessage(); MessagingService.instance().sendRRWithFailure(msg, neighbour, callback); } @@ -282,25 +282,9 @@ public class ActiveRepairService return parentRepairSession; } - public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges) + public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental) { - Map<UUID, Set<SSTableReader>> sstablesToRepair = new HashMap<>(); - for (ColumnFamilyStore cfs : columnFamilyStores) - { - Set<SSTableReader> sstables = new HashSet<>(); - for (SSTableReader sstable : cfs.getSSTables()) - { - if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges)) - { - if (!sstable.isRepaired()) - { - sstables.add(sstable); - } - } - } - sstablesToRepair.put(cfs.metadata.cfId, sstables); - } - parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, sstablesToRepair, System.currentTimeMillis())); + parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, System.currentTimeMillis())); } public void finishParentSession(UUID parentSession, Set<InetAddress> neighbors) @@ -379,18 +363,28 @@ public class ActiveRepairService public static class ParentRepairSession { - public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>(); - public final Collection<Range<Token>> ranges; - public final Map<UUID, Set<SSTableReader>> sstableMap; + private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>(); + private final Collection<Range<Token>> ranges; + private final Map<UUID, Set<SSTableReader>> sstableMap = new HashMap<>(); public final long repairedAt; + public final boolean isIncremental; - public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, Map<UUID, Set<SSTableReader>> sstables, long repairedAt) + public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt) { for (ColumnFamilyStore cfs : columnFamilyStores) this.columnFamilyStores.put(cfs.metadata.cfId, cfs); this.ranges = ranges; - this.sstableMap = sstables; this.repairedAt = repairedAt; + this.isIncremental = isIncremental; + } + + public void addSSTables(UUID cfId, Set<SSTableReader> sstables) + { + Set<SSTableReader> existingSSTables = this.sstableMap.get(cfId); + if (existingSSTables == null) + existingSSTables = new HashSet<>(); + existingSSTables.addAll(sstables); + this.sstableMap.put(cfId, sstables); } public synchronized Collection<SSTableReader> getAndReferenceSSTables(UUID cfId) @@ -412,5 +406,20 @@ public class ActiveRepairService } return sstables; } + + public synchronized Set<SSTableReader> getAndReferenceSSTablesInRange(UUID cfId, Range<Token> range) + { + Collection<SSTableReader> allSSTables= getAndReferenceSSTables(cfId); + Set<SSTableReader> sstables = new HashSet<>(); + for (SSTableReader sstable : allSSTables) + { + if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Arrays.asList(range))) + sstables.add(sstable); + else + sstable.releaseReference(); + } + return sstables; + + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index a26858a..ea21f3d 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2624,14 +2624,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.info(message); sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()}); - if (options.isSequential() && options.isIncremental()) - { - message = "It is not possible to mix sequential repair and incremental repairs."; - logger.error(message); - sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()}); - return; - } - final Set<InetAddress> allNeighbors = new HashSet<>(); Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>(); for (Range<Token> range : options.getRanges()) @@ -2664,23 +2656,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } final UUID parentSession; - long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE; - if (options.isIncremental()) + long repairedAt; + try { - try - { - parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, options.getRanges(), columnFamilyStores); - repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt; - } - catch (Throwable t) - { - sendNotification("repair", String.format("Repair failed with error %s", t.getMessage()), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()}); - return; - } + parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, options, columnFamilyStores); + repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt; } - else + catch (Throwable t) { - parentSession = null; + sendNotification("repair", String.format("Repair failed with error %s", t.getMessage()), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()}); + return; } // Set up RepairJob executor for this repair command. @@ -2736,16 +2721,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { public void onSuccess(@Nullable Object result) { - if (options.isIncremental()) + try { - try - { - ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors); - } - catch (Exception e) - { - logger.error("Error in incremental repair", e); - } + ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors); + } + catch (Exception e) + { + logger.error("Error in incremental repair", e); } repairComplete(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index a999025..3cd2ea8 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -175,7 +175,7 @@ public class LeveledCompactionStrategyTest Range<Token> range = new Range<>(Util.token(""), Util.token("")); int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(System.currentTimeMillis()); UUID parentRepSession = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range)); + ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false); RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, range); Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore); CompactionManager.instance.submitValidation(cfs, validator).get(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e60a06cc/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index b3d333a..1d11334 100644 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -89,7 +89,7 @@ public class LocalSyncTaskTest extends SchemaLoader Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1"); - ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range)); + ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false); RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", range);
