Repository: cassandra Updated Branches: refs/heads/trunk c6cd82462 -> 3cec208c4
Add incremental repair support for --hosts, --force, and subrange repair Patch by Blake Eggleston; reviewed by Marcus Eriksson for CASSANDRA-13818 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3cec208c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3cec208c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3cec208c Branch: refs/heads/trunk Commit: 3cec208c40b85e1be0ff8c68fc9d9017945a1ed8 Parents: c6cd824 Author: Blake Eggleston <[email protected]> Authored: Mon Aug 28 10:33:34 2017 -0700 Committer: Blake Eggleston <[email protected]> Committed: Tue Sep 12 15:51:34 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 4 +- .../org/apache/cassandra/repair/RepairJob.java | 10 +- .../repair/RepairMessageVerbHandler.java | 6 +- .../apache/cassandra/repair/RepairRunnable.java | 161 ++++++++++++++----- .../apache/cassandra/repair/RepairSession.java | 12 +- .../org/apache/cassandra/repair/Validator.java | 10 +- .../repair/consistent/ConsistentSession.java | 3 +- .../cassandra/repair/messages/RepairOption.java | 16 +- .../cassandra/service/ActiveRepairService.java | 33 ++-- ...pactionStrategyManagerPendingRepairTest.java | 2 +- .../cassandra/repair/AbstractRepairTest.java | 2 + .../cassandra/repair/RepairRunnableTest.java | 65 ++++++++ .../repair/consistent/LocalSessionTest.java | 1 - .../repair/messages/RepairOptionTest.java | 13 -- .../service/ActiveRepairServiceTest.java | 55 +++++++ 16 files changed, 289 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1f03ec5..55bbfa8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Add 
incremental repair support for --hosts, --force, and subrange repair (CASSANDRA-13818) * Rework CompactionStrategyManager.getScanners synchronization (CASSANDRA-13786) * Add additional unit tests for batch behavior, TTLs, Timestamps (CASSANDRA-13846) * Add keyspace and table name in schema validation exception (CASSANDRA-13845) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 5619da7..06fbef2 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1330,7 +1330,7 @@ public class CompactionManager implements CompactionManagerMBean } else { - if (!validator.isConsistent) + if (!validator.isIncremental) { // flush first so everyone is validating data that is as similar as possible StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name); @@ -1447,7 +1447,7 @@ public class CompactionManager implements CompactionManagerMBean predicate = prs.getPreviewPredicate(); } - else if (validator.isConsistent) + else if (validator.isIncremental) { predicate = s -> validator.desc.parentSessionId.equals(s.getSSTableMetadata().pendingRepair); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 0615681..4bc3496 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -43,7 +43,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable private final RepairJobDesc desc; private final RepairParallelism parallelismDegree; private final ListeningExecutorService taskExecutor; - private final boolean isConsistent; + private final boolean isIncremental; private final PreviewKind previewKind; /** @@ -52,13 +52,13 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable * @param session RepairSession that this RepairJob belongs * @param columnFamily name of the ColumnFamily to repair */ - public RepairJob(RepairSession session, String columnFamily, boolean isConsistent, PreviewKind previewKind) + public RepairJob(RepairSession session, String columnFamily, boolean isIncremental, PreviewKind previewKind) { this.session = session; this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRanges()); this.taskExecutor = session.taskExecutor; this.parallelismDegree = session.parallelismDegree; - this.isConsistent = isConsistent; + this.isIncremental = isIncremental; this.previewKind = previewKind; } @@ -81,7 +81,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable if (parallelismDegree != RepairParallelism.PARALLEL) { ListenableFuture<List<InetAddress>> allSnapshotTasks; - if (isConsistent) + if (isIncremental) { // consistent repair does it's own "snapshotting" allSnapshotTasks = Futures.immediateFuture(allEndpoints); @@ -135,7 +135,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable SyncTask task; if (r1.endpoint.equals(local) || r2.endpoint.equals(local)) { - task = new LocalSyncTask(desc, r1, r2, isConsistent ? desc.parentSessionId : null, session.pullRepair, session.previewKind); + task = new LocalSyncTask(desc, r1, r2, isIncremental ? 
desc.parentSessionId : null, session.pullRepair, session.previewKind); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index c38d098..3c7f890 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -46,7 +46,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> { private static final Logger logger = LoggerFactory.getLogger(RepairMessageVerbHandler.class); - private boolean isConsistent(UUID sessionID) + private boolean isIncremental(UUID sessionID) { return ActiveRepairService.instance.consistent.local.isSessionInProgress(sessionID); } @@ -136,7 +136,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> ActiveRepairService.instance.consistent.local.maybeSetRepairing(desc.parentSessionId); Validator validator = new Validator(desc, message.from, validationRequest.nowInSec, - isConsistent(desc.parentSessionId), previewKind(desc.parentSessionId)); + isIncremental(desc.parentSessionId), previewKind(desc.parentSessionId)); CompactionManager.instance.submitValidation(store, validator); break; @@ -144,7 +144,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> // forwarded sync request SyncRequest request = (SyncRequest) message.payload; logger.debug("Syncing {}", request); - StreamingRepairTask task = new StreamingRepairTask(desc, request, isConsistent(desc.parentSessionId) ? desc.parentSessionId : null, request.previewKind); + StreamingRepairTask task = new StreamingRepairTask(desc, request, isIncremental(desc.parentSessionId) ? 
desc.parentSessionId : null, request.previewKind); task.run(); break; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index b581ebd..9e37ada 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -27,18 +27,26 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.*; import org.apache.commons.lang3.time.DurationFormatUtils; +import org.junit.internal.runners.statements.Fail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.codahale.metrics.Timer; import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.repair.consistent.SyncStatSummary; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.db.Keyspace; @@ -130,6 +138,47 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti recordFailure(message, completionMessage); } + @VisibleForTesting + static class CommonRange + { + public final Set<InetAddress> endpoints; + public 
final Collection<Range<Token>> ranges; + + public CommonRange(Set<InetAddress> endpoints, Collection<Range<Token>> ranges) + { + Preconditions.checkArgument(endpoints != null && !endpoints.isEmpty()); + Preconditions.checkArgument(ranges != null && !ranges.isEmpty()); + this.endpoints = endpoints; + this.ranges = ranges; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + CommonRange that = (CommonRange) o; + + if (!endpoints.equals(that.endpoints)) return false; + return ranges.equals(that.ranges); + } + + public int hashCode() + { + int result = endpoints.hashCode(); + result = 31 * result + ranges.hashCode(); + return result; + } + + public String toString() + { + return "CommonRange{" + + "endpoints=" + endpoints + + ", ranges=" + ranges + + '}'; + } + } + protected void runMayThrow() throws Exception { ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.IN_PROGRESS, ImmutableList.of()); @@ -184,7 +233,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti } final Set<InetAddress> allNeighbors = new HashSet<>(); - List<Pair<Set<InetAddress>, ? 
extends Collection<Range<Token>>>> commonRanges = new ArrayList<>(); + List<CommonRange> commonRanges = new ArrayList<>(); //pre-calculate output of getLocalRanges and pass it to getNeighbors to increase performance and prevent //calculation multiple times @@ -235,11 +284,9 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, cfnames, options); } - long repairedAt; try (Timer.Context ctx = Keyspace.open(keyspace).metric.repairPrepareTime.time()) { ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddress(), allNeighbors, options, columnFamilyStores); - repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).getRepairedAt(); progress.incrementAndGet(); } catch (Throwable t) @@ -254,23 +301,22 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti if (options.isPreview()) { - previewRepair(parentSession, repairedAt, startTime, traceState, allNeighbors, commonRanges, cfnames); + previewRepair(parentSession, startTime, commonRanges, cfnames); } else if (options.isIncremental()) { - consistentRepair(parentSession, repairedAt, startTime, traceState, allNeighbors, commonRanges, cfnames); + incrementalRepair(parentSession, startTime, options.isForcedRepair(), traceState, allNeighbors, commonRanges, cfnames); } else { - normalRepair(parentSession, startTime, traceState, allNeighbors, commonRanges, cfnames); + normalRepair(parentSession, startTime, traceState, commonRanges, cfnames); } } private void normalRepair(UUID parentSession, long startTime, TraceState traceState, - Set<InetAddress> allNeighbors, - List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges, + List<CommonRange> commonRanges, String... 
cfnames) { @@ -295,15 +341,11 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti logger.debug("Repair result: {}", results); if (sessionResult != null) { - // don't promote sstables for sessions we skipped replicas for + // don't record successful repair if we had to skip ranges if (!sessionResult.skippedReplicas) { successfulRanges.addAll(sessionResult.ranges); } - else - { - logger.debug("Skipping anticompaction for {}", results); - } } else { @@ -316,26 +358,59 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti Futures.addCallback(repairResult, new RepairCompleteCallback(parentSession, successfulRanges, startTime, traceState, hasFailure, executor)); } - private void consistentRepair(UUID parentSession, - long repairedAt, - long startTime, - TraceState traceState, - Set<InetAddress> allNeighbors, - List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges, - String... cfnames) + /** + * removes dead nodes from common ranges, and excludes ranges left without any participants + */ + @VisibleForTesting + static List<CommonRange> filterCommonRanges(List<CommonRange> commonRanges, Set<InetAddress> liveEndpoints, boolean force) { - // the local node also needs to be included in the set of - // participants, since coordinator sessions aren't persisted - Set<InetAddress> allParticipants = new HashSet<>(allNeighbors); - allParticipants.add(FBUtilities.getBroadcastAddress()); + if (!force) + { + return commonRanges; + } + else + { + List<CommonRange> filtered = new ArrayList<>(commonRanges.size()); + + for (CommonRange commonRange: commonRanges) + { + Set<InetAddress> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains)); + + // this node is implicitly a participant in this repair, so a single endpoint is ok here + if (!endpoints.isEmpty()) + { + filtered.add(new CommonRange(endpoints, commonRange.ranges)); + } + } +
Preconditions.checkState(!filtered.isEmpty(), "Not enough live endpoints for a repair"); + return filtered; + } + } + + private void incrementalRepair(UUID parentSession, + long startTime, + boolean forceRepair, + TraceState traceState, + Set<InetAddress> allNeighbors, + List<CommonRange> commonRanges, + String... cfnames) + { + // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted + Predicate<InetAddress> isAlive = FailureDetector.instance::isAlive; + Set<InetAddress> allParticipants = ImmutableSet.<InetAddress>builder() + .addAll(forceRepair ? Iterables.filter(allNeighbors, isAlive) : allNeighbors) + .add(FBUtilities.getBroadcastAddress()) + .build(); + + List<CommonRange> allRanges = filterCommonRanges(commonRanges, allParticipants, forceRepair); CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants); ListeningExecutorService executor = createExecutor(); AtomicBoolean hasFailure = new AtomicBoolean(false); - ListenableFuture repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, commonRanges, cfnames), + ListenableFuture repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, allRanges, cfnames), hasFailure); Collection<Range<Token>> ranges = new HashSet<>(); - for (Collection<Range<Token>> range : Iterables.transform(commonRanges, cr -> cr.right)) + for (Collection<Range<Token>> range : Iterables.transform(allRanges, cr -> cr.ranges)) { ranges.addAll(range); } @@ -343,11 +418,8 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti } private void previewRepair(UUID parentSession, - long repairedAt, long startTime, - TraceState traceState, - Set<InetAddress> allNeighbors, - List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges, + List<CommonRange> commonRanges, String... 
cfnames) { @@ -421,22 +493,27 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti } private ListenableFuture<List<RepairSessionResult>> submitRepairSessions(UUID parentSession, - boolean isConsistent, + boolean isIncremental, ListeningExecutorService executor, - List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges, + List<CommonRange> commonRanges, String... cfnames) { List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size()); - for (Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p : commonRanges) + + // we do endpoint filtering at the start of an incremental repair, + // so repair sessions shouldn't also be checking liveness + boolean force = options.isForcedRepair() && !isIncremental; + for (CommonRange cr : commonRanges) { + logger.info("Starting RepairSession for {}", cr); RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession, - p.right, + cr.ranges, keyspace, options.getParallelism(), - p.left, - isConsistent, + cr.endpoints, + isIncremental, options.isPullRepair(), - options.isForcedRepair(), + force, options.getPreviewKind(), executor, cfnames); @@ -595,22 +672,22 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti ImmutableList.of(failureMessage, completionMessage)); } - private void addRangeToNeighbors(List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors) + private void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors) { for (int i = 0; i < neighborRangeList.size(); i++) { - Pair<Set<InetAddress>, ? 
extends Collection<Range<Token>>> p = neighborRangeList.get(i); + CommonRange cr = neighborRangeList.get(i); - if (p.left.containsAll(neighbors)) + if (cr.endpoints.containsAll(neighbors)) { - p.right.add(range); + cr.ranges.add(range); return; } } List<Range<Token>> ranges = new ArrayList<>(); ranges.add(range); - neighborRangeList.add(Pair.create(neighbors, ranges)); + neighborRangeList.add(new CommonRange(neighbors, ranges)); } private Thread createQueryThread(final int cmd, final UUID sessionId) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index d00e1b2..5dbd050 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -36,7 +36,6 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.*; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.SessionSummary; -import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTrees; @@ -97,7 +96,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement /** Range to repair */ public final Collection<Range<Token>> ranges; public final Set<InetAddress> endpoints; - public final boolean isConsistent; + public final boolean isIncremental; public final PreviewKind previewKind; private final AtomicBoolean isFailed = new AtomicBoolean(false); @@ -131,7 +130,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement String keyspace, RepairParallelism parallelismDegree, Set<InetAddress> endpoints, - boolean isConsistent, + boolean isIncremental, boolean 
pullRepair, boolean force, PreviewKind previewKind, @@ -162,7 +161,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement } if (!removeCandidates.isEmpty()) { - // we shouldn't be promoting sstables to repaired if any replicas are excluded from the repair + // we shouldn't be recording a successful repair if + // any replicas are excluded from the repair forceSkippedReplicas = true; endpoints = new HashSet<>(endpoints); endpoints.removeAll(removeCandidates); @@ -170,7 +170,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement } this.endpoints = endpoints; - this.isConsistent = isConsistent; + this.isIncremental = isIncremental; this.previewKind = previewKind; this.pullRepair = pullRepair; this.skippedReplicas = forceSkippedReplicas; @@ -301,7 +301,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement List<ListenableFuture<RepairResult>> jobs = new ArrayList<>(cfnames.length); for (String cfname : cfnames) { - RepairJob job = new RepairJob(this, cfname, isConsistent, previewKind); + RepairJob job = new RepairJob(this, cfname, isIncremental, previewKind); executor.execute(job); jobs.add(job); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/Validator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java index bdf8cca..f9556d6 100644 --- a/src/java/org/apache/cassandra/repair/Validator.java +++ b/src/java/org/apache/cassandra/repair/Validator.java @@ -60,7 +60,7 @@ public class Validator implements Runnable public final InetAddress initiator; public final int nowInSec; private final boolean evenTreeDistribution; - public final boolean isConsistent; + public final boolean isIncremental; // null when all rows with the min token have been consumed private long validated; @@ -79,17 +79,17 
@@ public class Validator implements Runnable this(desc, initiator, nowInSec, false, false, previewKind); } - public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, boolean isConsistent, PreviewKind previewKind) + public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, boolean isIncremental, PreviewKind previewKind) { - this(desc, initiator, nowInSec, false, isConsistent, previewKind); + this(desc, initiator, nowInSec, false, isIncremental, previewKind); } - public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, boolean evenTreeDistribution, boolean isConsistent, PreviewKind previewKind) + public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, boolean evenTreeDistribution, boolean isIncremental, PreviewKind previewKind) { this.desc = desc; this.initiator = initiator; this.nowInSec = nowInSec; - this.isConsistent = isConsistent; + this.isIncremental = isIncremental; this.previewKind = previewKind; validated = 0; range = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java index 803a1f8..c137346 100644 --- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java +++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java @@ -313,7 +313,8 @@ public abstract class ConsistentSession Preconditions.checkArgument(coordinator != null); Preconditions.checkArgument(ids != null); Preconditions.checkArgument(!ids.isEmpty()); - Preconditions.checkArgument(repairedAt > 0); + Preconditions.checkArgument(repairedAt > 0 + || repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE); Preconditions.checkArgument(ranges != null); 
Preconditions.checkArgument(!ranges.isEmpty()); Preconditions.checkArgument(participants != null); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/messages/RepairOption.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java index a95ee19..971bf5d 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java @@ -163,10 +163,6 @@ public class RepairOption Set<Range<Token>> ranges = new HashSet<>(); if (rangesStr != null) { - if (incremental) - logger.warn("Incremental repair can't be requested with subrange repair " + - "because each subrange repair would generate an anti-compacted table. " + - "The repair will occur but without anti-compaction."); StringTokenizer tokenizer = new StringTokenizer(rangesStr, ","); while (tokenizer.hasMoreTokens()) { @@ -251,16 +247,6 @@ public class RepairOption } } - if (option.isIncremental() && !option.isPreview() && !option.isGlobal()) - { - throw new IllegalArgumentException("Incremental repairs cannot be run against a subset of tokens or ranges"); - } - - if (option.isIncremental() && option.isForcedRepair()) - { - throw new IllegalArgumentException("Cannot force incremental repair"); - } - return option; } @@ -359,7 +345,7 @@ public class RepairOption public boolean isGlobal() { - return dataCenters.isEmpty() && hosts.isEmpty() && !isSubrangeRepair(); + return dataCenters.isEmpty() && hosts.isEmpty(); } public boolean isSubrangeRepair() http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 2e02f0c..ab92822 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -209,7 +209,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai String keyspace, RepairParallelism parallelismDegree, Set<InetAddress> endpoints, - boolean isConsistent, + boolean isIncremental, boolean pullRepair, boolean force, PreviewKind previewKind, @@ -222,7 +222,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai if (cfnames.length == 0) return null; - final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isConsistent, pullRepair, force, previewKind, cfnames); + final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isIncremental, pullRepair, force, previewKind, cfnames); sessions.put(session.getId(), session); // register listeners @@ -372,10 +372,28 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai return neighbors; } + /** + * we only want to set repairedAt for incremental repairs including all replicas for a token range. For non-global + * incremental repairs, forced incremental repairs, and full repairs, the UNREPAIRED_SSTABLE value will prevent + * sstables from being promoted to repaired or preserve the repairedAt/pendingRepair values, respectively. + */ + static long getRepairedAt(RepairOption options) + { + // we only want to set repairedAt for incremental repairs including all replicas for a token range. For non-global incremental repairs, forced incremental repairs, and + // full repairs, the UNREPAIRED_SSTABLE value will prevent sstables from being promoted to repaired or preserve the repairedAt/pendingRepair values, respectively. 
+ if (options.isIncremental() && options.isGlobal() && !options.isForcedRepair()) + { + return Clock.instance.currentTimeMillis(); + } + else + { + return ActiveRepairService.UNREPAIRED_SSTABLE; + } + } + public UUID prepareForRepair(UUID parentRepairSession, InetAddress coordinator, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores) { - // we only want repairedAt for incremental repairs, for non incremental repairs, UNREPAIRED_SSTABLE will preserve repairedAt on streamed sstables - long repairedAt = options.isIncremental() ? Clock.instance.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE; + long repairedAt = getRepairedAt(options); registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind()); final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size()); final AtomicBoolean status = new AtomicBoolean(true); @@ -583,13 +601,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai } } - public long getRepairedAt() - { - if (isGlobal) - return repairedAt; - return ActiveRepairService.UNREPAIRED_SSTABLE; - } - public Collection<ColumnFamilyStore> getColumnFamilyStores() { return ImmutableSet.<ColumnFamilyStore>builder().addAll(columnFamilyStores.values()).build(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java index af629e5..c7f1ae8 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java +++ 
b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java @@ -248,7 +248,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR csm.getForPendingRepair(repairID).forEach(Assert::assertNull); // sstable should have pendingRepair cleared, and repairedAt set correctly - long expectedRepairedAt = ActiveRepairService.instance.getParentRepairSession(repairID).getRepairedAt(); + long expectedRepairedAt = ActiveRepairService.instance.getParentRepairSession(repairID).repairedAt; Assert.assertFalse(sstable.isPendingRepair()); Assert.assertTrue(sstable.isRepaired()); Assert.assertEquals(expectedRepairedAt, sstable.getSSTableMetadata().repairedAt); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java index d61d859..21c51c6 100644 --- a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java +++ b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java @@ -75,6 +75,8 @@ public abstract class AbstractRepairTest protected static final Range<Token> RANGE2 = new Range<>(t(2), t(3)); protected static final Range<Token> RANGE3 = new Range<>(t(4), t(5)); + protected static final Set<Range<Token>> ALL_RANGES = ImmutableSet.of(RANGE1, RANGE2, RANGE3); + protected static UUID registerSession(ColumnFamilyStore cfs, boolean isIncremental, boolean isGlobal) { UUID sessionId = UUIDGen.getTimeUUID(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java new file 
mode 100644 index 0000000..db76f73 --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair; + +import java.net.InetAddress; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.repair.RepairRunnable.CommonRange; + +import static org.apache.cassandra.repair.RepairRunnable.filterCommonRanges; + +public class RepairRunnableTest extends AbstractRepairTest +{ + /** + * For non-forced repairs, common ranges should be passed through as-is + */ + @Test + public void filterCommonIncrementalRangesNotForced() throws Exception + { + CommonRange cr = new CommonRange(PARTICIPANTS, ALL_RANGES); + + List<CommonRange> expected = Lists.newArrayList(cr); + List<CommonRange> actual = filterCommonRanges(expected, Collections.emptySet(), false); + + Assert.assertEquals(expected, actual); + } + + @Test + public void forceFilterCommonIncrementalRanges() throws Exception + { + CommonRange cr1 = new 
CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2)); + CommonRange cr2 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3), Sets.newHashSet(RANGE3)); + Set<InetAddress> liveEndpoints = Sets.newHashSet(PARTICIPANT2, PARTICIPANT3); // PARTICIPANT1 is excluded + + List<CommonRange> initial = Lists.newArrayList(cr1, cr2); + List<CommonRange> expected = Lists.newArrayList(new CommonRange(Sets.newHashSet(PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2)), + new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Sets.newHashSet(RANGE3))); + List<CommonRange> actual = filterCommonRanges(initial, liveEndpoints, true); + + Assert.assertEquals(expected, actual); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java index be048fb..6e6d222 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java @@ -223,7 +223,6 @@ public class LocalSessionTest extends AbstractRepairTest assertValidationFailure(b -> b.withCoordinator(null)); assertValidationFailure(b -> b.withTableIds(null)); assertValidationFailure(b -> b.withTableIds(new HashSet<>())); - assertValidationFailure(b -> b.withRepairedAt(0)); assertValidationFailure(b -> b.withRepairedAt(-1)); assertValidationFailure(b -> b.withRanges(null)); assertValidationFailure(b -> b.withRanges(new HashSet<>())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java index 13d7575..484d7a8 100644 --- a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java +++ b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java @@ -150,19 +150,6 @@ public class RepairOptionTest } @Test - public void testNonGlobalIncrementalRepairParse() throws Exception - { - Map<String, String> options = new HashMap<>(); - options.put(RepairOption.PARALLELISM_KEY, "parallel"); - options.put(RepairOption.PRIMARY_RANGE_KEY, "false"); - options.put(RepairOption.INCREMENTAL_KEY, "true"); - options.put(RepairOption.COLUMNFAMILIES_KEY, "cf1,cf2,cf3"); - options.put(RepairOption.HOSTS_KEY, "127.0.0.1, 127.0.0.2"); - assertParseThrowsIllegalArgumentExceptionWithMessage(options, "Incremental repairs cannot be run against a subset of tokens or ranges"); - - } - - @Test public void testForceOption() throws Exception { RepairOption option; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index 57ffa7d..cbacaec 100644 --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@ -21,8 +21,11 @@ package org.apache.cassandra.service; import java.net.InetAddress; import java.util.*; +import javax.xml.crypto.Data; + import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -34,17 +37,26 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.RowUpdateBuilder; import 
org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.Refs; +import static org.apache.cassandra.repair.messages.RepairOption.DATACENTERS_KEY; +import static org.apache.cassandra.repair.messages.RepairOption.FORCE_REPAIR_KEY; +import static org.apache.cassandra.repair.messages.RepairOption.HOSTS_KEY; +import static org.apache.cassandra.repair.messages.RepairOption.INCREMENTAL_KEY; +import static org.apache.cassandra.repair.messages.RepairOption.RANGES_KEY; +import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE; +import static org.apache.cassandra.service.ActiveRepairService.getRepairedAt; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -299,4 +311,47 @@ public class ActiveRepairServiceTest cfs.forceBlockingFlush(); } } + + private static RepairOption opts(String... 
params) + { + assert params.length % 2 == 0 : "unbalanced key value pairs"; + Map<String, String> opt = new HashMap<>(); + for (int i=0; i<(params.length >> 1); i++) + { + int idx = i << 1; + opt.put(params[idx], params[idx+1]); + } + return RepairOption.parse(opt, DatabaseDescriptor.getPartitioner()); + } + + private static String b2s(boolean b) + { + return Boolean.toString(b); + } + + /** + * Tests the expected repairedAt value is returned, based on different RepairOption + */ + @Test + public void repairedAt() throws Exception + { + // regular incremental repair + Assert.assertNotEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true)))); + // subrange incremental repair + Assert.assertNotEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true), + RANGES_KEY, "1:2"))); + + // hosts incremental repair + Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true), + HOSTS_KEY, "127.0.0.1"))); + // dc incremental repair + Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true), + DATACENTERS_KEY, "DC2"))); + // forced incremental repair + Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true), + FORCE_REPAIR_KEY, b2s(true)))); + + // full repair + Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(false)))); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
