Repository: cassandra Updated Branches: refs/heads/trunk a3e772b8b -> b29736c27
Add support for one-way targeted repair (pull-repair) Patch by Geoffrey Yu; Reviewed by Paulo Motta for CASSANDRA-9876 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b29736c2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b29736c2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b29736c2 Branch: refs/heads/trunk Commit: b29736c27acff3a62a1416a7c6cd7f77deb96b84 Parents: a3e772b Author: Geoffrey Yu <[email protected]> Authored: Thu Aug 4 13:40:14 2016 -0700 Committer: Yuki Morishita <[email protected]> Committed: Thu Aug 11 14:58:18 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/repair/LocalSyncTask.java | 19 +++++--- .../org/apache/cassandra/repair/RepairJob.java | 2 +- .../apache/cassandra/repair/RepairRunnable.java | 3 +- .../apache/cassandra/repair/RepairSession.java | 4 ++ .../cassandra/repair/messages/RepairOption.java | 36 ++++++++++++++- .../cassandra/service/ActiveRepairService.java | 10 +++-- .../cassandra/service/StorageService.java | 4 +- .../apache/cassandra/tools/nodetool/Repair.java | 4 ++ .../cassandra/repair/LocalSyncTaskTest.java | 4 +- .../cassandra/repair/RepairSessionTest.java | 2 +- .../repair/messages/RepairOptionTest.java | 46 +++++++++++++++++++- 12 files changed, 114 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e1b0a44..bba64c6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Add support to one-way targeted repair (CASSANDRA-9876) * Remove clientutil jar (CASSANDRA-11635) * Fix compaction throughput throttle (CASSANDRA-12366) * Delay releasing Memtable memory on flush until PostFlush has finished running 
(CASSANDRA-12358) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/repair/LocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java index a92708f..cfc181e 100644 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -47,10 +47,13 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler private final long repairedAt; - public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, long repairedAt) + private final boolean pullRepair; + + public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, long repairedAt, boolean pullRepair) { super(desc, r1, r2); this.repairedAt = repairedAt; + this.pullRepair = pullRepair; } /** @@ -73,13 +76,17 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler isIncremental = prs.isIncremental; } Tracing.traceRepair(message); - new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false).listeners(this) + StreamPlan plan = new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false).listeners(this) .flushBeforeTransfer(true) // request ranges from the remote node - .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily) - // send ranges to the remote node - .transferRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily) - .execute(); + .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily); + if (!pullRepair) + { + // send ranges to the remote node if we are not performing a pull repair + plan.transferRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily); + } + + plan.execute(); } public void handleStreamEvent(StreamEvent event) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 454865b..c768db6 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -118,7 +118,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable SyncTask task; if (r1.endpoint.equals(local) || r2.endpoint.equals(local)) { - task = new LocalSyncTask(desc, r1, r2, repairedAt); + task = new LocalSyncTask(desc, r1, r2, repairedAt, session.pullRepair); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index d099f72..b69d8ce 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -103,7 +103,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti protected void fireErrorAndComplete(String tag, int progressCount, int totalProgress, String message) { fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, message)); - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress)); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress, String.format("Repair command #%d finished with error", cmd))); } protected void runMayThrow() throws Exception @@ -226,6 +226,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti options.getParallelism(), 
p.left, repairedAt, + options.isPullRepair(), executor, cfnames); if (session == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index a52b352..cad506d 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -87,6 +87,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement public final String keyspace; private final String[] cfnames; public final RepairParallelism parallelismDegree; + public final boolean pullRepair; /** Range to repair */ public final Collection<Range<Token>> ranges; public final Set<InetAddress> endpoints; @@ -117,6 +118,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement * @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees * @param endpoints the data centers that should be part of the repair; null for all DCs * @param repairedAt when the repair occurred (millis) + * @param pullRepair true if the repair should be one way (from remote host to this host and only applicable between two hosts--see RepairOption) * @param cfnames names of columnfamilies */ public RepairSession(UUID parentRepairSession, @@ -126,6 +128,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement RepairParallelism parallelismDegree, Set<InetAddress> endpoints, long repairedAt, + boolean pullRepair, String... 
cfnames) { assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it"; @@ -139,6 +142,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement this.endpoints = endpoints; this.repairedAt = repairedAt; this.validationRemaining = new AtomicInteger(cfnames.length); + this.pullRepair = pullRepair; } public UUID getId() http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/repair/messages/RepairOption.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java index 82dd181..1f34973 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java @@ -46,6 +46,7 @@ public class RepairOption public static final String HOSTS_KEY = "hosts"; public static final String TRACE_KEY = "trace"; public static final String SUB_RANGE_REPAIR_KEY = "sub_range_repair"; + public static final String PULL_REPAIR_KEY = "pullRepair"; // we don't want to push nodes too much for repair public static final int MAX_JOB_THREADS = 4; @@ -116,6 +117,12 @@ public class RepairOption * Multiple hosts can be given as comma separated values(e.g. cass1,cass2).</td> * <td></td> * </tr> + * <tr> + * <td>pullRepair</td> + * <td>"true" if the repair should only stream data one way from a remote host to this host. 
+ * This is only allowed if exactly 2 hosts are specified along with a token range that they share.</td> + * <td>false</td> + * </tr> * </tbody> * </table> * @@ -130,6 +137,7 @@ public class RepairOption boolean primaryRange = Boolean.parseBoolean(options.get(PRIMARY_RANGE_KEY)); boolean incremental = Boolean.parseBoolean(options.get(INCREMENTAL_KEY)); boolean trace = Boolean.parseBoolean(options.get(TRACE_KEY)); + boolean pullRepair = Boolean.parseBoolean(options.get(PULL_REPAIR_KEY)); int jobThreads = 1; if (options.containsKey(JOB_THREADS_KEY)) @@ -163,7 +171,7 @@ public class RepairOption } } - RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty()); + RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair); // data centers String dataCentersStr = options.get(DATACENTERS_KEY); @@ -209,10 +217,25 @@ public class RepairOption { throw new IllegalArgumentException("Too many job threads. Max is " + MAX_JOB_THREADS); } + if (!dataCenters.isEmpty() && !hosts.isEmpty()) + { + throw new IllegalArgumentException("Cannot combine -dc and -hosts options."); + } if (primaryRange && (!dataCenters.isEmpty() || !hosts.isEmpty())) { throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster."); } + if (pullRepair) + { + if (hosts.size() != 2) + { + throw new IllegalArgumentException("Pull repair can only be performed between two hosts. Please specify two hosts, one of which must be this host."); + } + else if (ranges.isEmpty()) + { + throw new IllegalArgumentException("Token ranges must be specified when performing pull repair. 
Please specify at least one token range which both hosts have in common."); + } + } return option; } @@ -223,13 +246,14 @@ public class RepairOption private final boolean trace; private final int jobThreads; private final boolean isSubrangeRepair; + private final boolean pullRepair; private final Collection<String> columnFamilies = new HashSet<>(); private final Collection<String> dataCenters = new HashSet<>(); private final Collection<String> hosts = new HashSet<>(); private final Collection<Range<Token>> ranges = new HashSet<>(); - public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair) + public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair) { if (FBUtilities.isWindows && (DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard) && @@ -247,6 +271,7 @@ public class RepairOption this.jobThreads = jobThreads; this.ranges.addAll(ranges); this.isSubrangeRepair = isSubrangeRepair; + this.pullRepair = pullRepair; } public RepairParallelism getParallelism() @@ -269,6 +294,11 @@ public class RepairOption return trace; } + public boolean isPullRepair() + { + return pullRepair; + } + public int getJobThreads() { return jobThreads; @@ -316,6 +346,7 @@ public class RepairOption ", dataCenters: " + dataCenters + ", hosts: " + hosts + ", # of ranges: " + ranges.size() + + ", pull repair: " + pullRepair + ')'; } @@ -332,6 +363,7 @@ public class RepairOption options.put(SUB_RANGE_REPAIR_KEY, Boolean.toString(isSubrangeRepair)); options.put(TRACE_KEY, Boolean.toString(trace)); options.put(RANGES_KEY, Joiner.on(",").join(ranges)); + options.put(PULL_REPAIR_KEY, Boolean.toString(pullRepair)); return options; } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 435c7c8..4699ae1 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -132,6 +132,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai RepairParallelism parallelismDegree, Set<InetAddress> endpoints, long repairedAt, + boolean pullRepair, ListeningExecutorService executor, String... cfnames) { @@ -141,7 +142,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai if (cfnames.length == 0) return null; - final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, repairedAt, cfnames); + final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, repairedAt, pullRepair, cfnames); sessions.put(session.getId(), session); // register listeners @@ -245,9 +246,10 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai if (specifiedHost.size() <= 1) { - String msg = "Repair requires at least two endpoints that are neighbours before it can continue, the endpoint used for this repair is %s, " + - "other available neighbours are %s but these neighbours were not part of the supplied list of hosts to use during the repair (%s)."; - throw new IllegalArgumentException(String.format(msg, specifiedHost, neighbors, hosts)); + String msg = "Specified hosts %s do not share range %s needed for repair. 
Either restrict repair ranges " + + "with -st/-et options, or specify one of the neighbors that share this range with " + + "this node: %s."; + throw new IllegalArgumentException(String.format(msg, hosts, toRepair, neighbors)); } specifiedHost.remove(FBUtilities.getBroadcastAddress()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index e3b4752..2810e2f 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3172,7 +3172,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE parallelism = RepairParallelism.PARALLEL; } - RepairOption options = new RepairOption(parallelism, primaryRange, !fullRepair, false, 1, Collections.<Range<Token>>emptyList(), false); + RepairOption options = new RepairOption(parallelism, primaryRange, !fullRepair, false, 1, Collections.<Range<Token>>emptyList(), false, false); if (dataCenters != null) { options.getDataCenters().addAll(dataCenters); @@ -3264,7 +3264,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE "The repair will occur but without anti-compaction."); Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken); - RepairOption options = new RepairOption(parallelism, false, !fullRepair, false, 1, repairingRange, true); + RepairOption options = new RepairOption(parallelism, false, !fullRepair, false, 1, repairingRange, true, false); if (dataCenters != null) { options.getDataCenters().addAll(dataCenters); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/tools/nodetool/Repair.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java index 02bfc5b..5383fa5 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java @@ -81,6 +81,9 @@ public class Repair extends NodeToolCmd @Option(title = "trace_repair", name = {"-tr", "--trace"}, description = "Use -tr to trace the repair. Traces are logged to system_traces.events.") private boolean trace = false; + @Option(title = "pull_repair", name = {"-pl", "--pull"}, description = "Use --pull to perform a one way repair where data is only streamed from a remote node to this node.") + private boolean pullRepair = false; + @Override public void execute(NodeProbe probe) { @@ -108,6 +111,7 @@ public class Repair extends NodeToolCmd options.put(RepairOption.JOB_THREADS_KEY, Integer.toString(numJobThreads)); options.put(RepairOption.TRACE_KEY, Boolean.toString(trace)); options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(cfnames, ",")); + options.put(RepairOption.PULL_REPAIR_KEY, Boolean.toString(pullRepair)); if (!startToken.isEmpty() || !endToken.isEmpty()) { options.put(RepairOption.RANGES_KEY, startToken + ":" + endToken); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index 6aacae6..0fceaf4 100644 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -76,7 +76,7 @@ public class LocalSyncTaskTest extends SchemaLoader // note: we reuse the same endpoint which is bogus in theory but fine here TreeResponse r1 = new 
TreeResponse(ep1, tree1); TreeResponse r2 = new TreeResponse(ep2, tree2); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE, false); task.run(); assertEquals(0, task.get().numberOfDifferences); @@ -111,7 +111,7 @@ public class LocalSyncTaskTest extends SchemaLoader // note: we reuse the same endpoint which is bogus in theory but fine here TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1); TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE, false); task.run(); // ensure that the changed range was recorded http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/test/unit/org/apache/cassandra/repair/RepairSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java index d40982c..7b31c26 100644 --- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java @@ -54,7 +54,7 @@ public class RepairSessionTest IPartitioner p = Murmur3Partitioner.instance; Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100))); Set<InetAddress> endpoints = Sets.newHashSet(remote); - RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, "Standard1"); + RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", 
RepairParallelism.SEQUENTIAL, endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, false, "Standard1"); // perform convict session.convict(remote, Double.MAX_VALUE); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java index a564cff..665a0b7 100644 --- a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java +++ b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java @@ -38,7 +38,10 @@ import org.apache.cassandra.utils.FBUtilities; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.matchers.JUnitMatchers.containsString; public class RepairOptionTest { @@ -59,7 +62,7 @@ public class RepairOptionTest assertFalse(option.isPrimaryRange()); assertFalse(option.isIncremental()); - // parse everything + // parse everything except hosts (hosts cannot be combined with data centers) Map<String, String> options = new HashMap<>(); options.put(RepairOption.PARALLELISM_KEY, "parallel"); options.put(RepairOption.PRIMARY_RANGE_KEY, "false"); @@ -67,7 +70,6 @@ public class RepairOptionTest options.put(RepairOption.RANGES_KEY, "0:10,11:20,21:30"); options.put(RepairOption.COLUMNFAMILIES_KEY, "cf1,cf2,cf3"); options.put(RepairOption.DATACENTERS_KEY, "dc1,dc2,dc3"); - options.put(RepairOption.HOSTS_KEY, "127.0.0.1,127.0.0.2,127.0.0.3"); option = RepairOption.parse(options, partitioner); assertTrue(option.getParallelism() == RepairParallelism.PARALLEL); @@ -92,6 +94,14 @@ public class RepairOptionTest expectedDCs.add("dc3"); assertEquals(expectedDCs, option.getDataCenters()); + 
// expect an error when parsing with hosts as well + options.put(RepairOption.HOSTS_KEY, "127.0.0.1,127.0.0.2,127.0.0.3"); + assertParseThrowsIllegalArgumentExceptionWithMessage(options, "Cannot combine -dc and -hosts options"); + + // remove data centers to proceed with testing parsing hosts + options.remove(RepairOption.DATACENTERS_KEY); + option = RepairOption.parse(options, partitioner); + Set<String> expectedHosts = new HashSet<>(3); expectedHosts.add("127.0.0.1"); expectedHosts.add("127.0.0.2"); @@ -100,6 +110,25 @@ public class RepairOptionTest } @Test + public void testPullRepairParseOptions() + { + Map<String, String> options = new HashMap<>(); + + options.put(RepairOption.PULL_REPAIR_KEY, "true"); + assertParseThrowsIllegalArgumentExceptionWithMessage(options, "Pull repair can only be performed between two hosts"); + + options.put(RepairOption.HOSTS_KEY, "127.0.0.1,127.0.0.2,127.0.0.3"); + assertParseThrowsIllegalArgumentExceptionWithMessage(options, "Pull repair can only be performed between two hosts"); + + options.put(RepairOption.HOSTS_KEY, "127.0.0.1,127.0.0.2"); + assertParseThrowsIllegalArgumentExceptionWithMessage(options, "Token ranges must be specified when performing pull repair"); + + options.put(RepairOption.RANGES_KEY, "0:10"); + RepairOption option = RepairOption.parse(options, Murmur3Partitioner.instance); + assertTrue(option.isPullRepair()); + } + + @Test public void testIncrementalRepairWithSubrangesIsNotGlobal() throws Exception { RepairOption ro = RepairOption.parse(ImmutableMap.of(RepairOption.INCREMENTAL_KEY, "true", RepairOption.RANGES_KEY, "42:42"), @@ -109,4 +138,17 @@ public class RepairOptionTest Murmur3Partitioner.instance); assertTrue(ro.isGlobal()); } + + private void assertParseThrowsIllegalArgumentExceptionWithMessage(Map<String, String> optionsToParse, String expectedErrorMessage) + { + try + { + RepairOption.parse(optionsToParse, Murmur3Partitioner.instance); + fail(String.format("Expected RepairOption.parse() to throw an 
IllegalArgumentException containing the message '%s'", expectedErrorMessage)); + } + catch (IllegalArgumentException ex) + { + assertThat(ex.getMessage(), containsString(expectedErrorMessage)); + } + } }
