This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 881cff46da8f4a0369e3dcd96231535da17ec193 Merge: 3db6444 0f46c90 Author: David Capwell <dcapw...@apache.org> AuthorDate: Thu Sep 24 21:17:44 2020 -0700 Merge branch 'cassandra-3.11' into trunk CHANGES.txt | 2 + .../apache/cassandra/repair/RepairRunnable.java | 25 +++- .../cassandra/repair/messages/RepairOption.java | 14 ++- .../apache/cassandra/tools/nodetool/Repair.java | 4 + .../distributed/test/RepairOperationalTest.java | 131 +++++++++++++++++++++ 5 files changed, 168 insertions(+), 8 deletions(-) diff --cc CHANGES.txt index 1db3ebc,189aec4..2e6715f --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,50 -1,15 +1,52 @@@ -3.11.9 +4.0-beta3 * Avoid failing compactions with very large partitions (CASSANDRA-15164) + * Prevent NPE in StreamMessage in type lookup (CASSANDRA-16131) + * Avoid invalid state transition exception during incremental repair (CASSANDRA-16067) + * Allow zero padding in timestamp serialization (CASSANDRA-16105) + * Add byte array backed cells (CASSANDRA-15393) + * Correctly handle pending ranges with adjacent range movements (CASSANDRA-14801) + * Avoid adding locahost when streaming trivial ranges (CASSANDRA-16099) + * Add nodetool getfullquerylog (CASSANDRA-15988) + * Fix yaml format and alignment in tpstats (CASSANDRA-11402) + * Avoid trying to keep track of RTs for endpoints we won't write to during read repair (CASSANDRA-16084) + * When compaction gets interrupted, the exception should include the compactionId (CASSANDRA-15954) + * Make Table/Keyspace Metric Names Consistent With Each Other (CASSANDRA-15909) + * Mutating sstable component may race with entire-sstable-streaming(ZCS) causing checksum validation failure (CASSANDRA-15861) + * NPE thrown while updating speculative execution time if keyspace is removed during task execution (CASSANDRA-15949) + * Show the progress of data streaming and index build (CASSANDRA-15406) +Merged from 3.11: + * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935) * Make sure LCS handles duplicate sstable added/removed notifications correctly (CASSANDRA-14103) + Merged from 3.0: + * Add flag to ignore unreplicated keyspaces during repair (CASSANDRA-15160) -3.11.8 +4.0-beta2 + * Add addition incremental repair visibility to nodetool repair_admin (CASSANDRA-14939) + * Always access system properties and environment variables via the new CassandraRelevantProperties and CassandraRelevantEnv classes (CASSANDRA-15876) + * Remove deprecated HintedHandOffManager (CASSANDRA-15939) + * Prevent repair from overrunning compaction (CASSANDRA-15817) + * fix cqlsh COPY functions in Python 3.8 on Mac (CASSANDRA-16053) + * Strip comment blocks from cqlsh input before processing statements (CASSANDRA-15802) + * Fix unicode chars error input (CASSANDRA-15990) + * Improved testability for CacheMetrics and ChunkCacheMetrics (CASSANDRA-15788) + * Handle errors in StreamSession#prepare (CASSANDRA-15852) + * FQL replay should have options to ignore DDL statements (CASSANDRA-16039) + * Remove COMPACT STORAGE internals (CASSANDRA-13994) + * Make TimestampSerializer accept fractional seconds of varying precision (CASSANDRA-15976) + * Improve cassandra-stress logging when using a profile file that doesn't exist (CASSANDRA-14425) + * Improve logging for socket connection/disconnection (CASSANDRA-15980) + * Throw FSWriteError upon write failures in order to apply DiskFailurePolicy (CASSANDRA-15928) + * Forbid altering UDTs used in partition keys (CASSANDRA-15933) + * Fix version parsing logic when upgrading from 3.0 (CASSANDRA-15973) + * Optimize NoSpamLogger use in hot paths (CASSANDRA-15766) + * Verify sstable components on startup (CASSANDRA-15945) + * Resolve JMX output inconsistencies from CASSANDRA-7544 storage-port-configurable-per-node (CASSANDRA-15937) +Merged from 3.11: * Correctly interpret SASI's `max_compaction_flush_memory_in_mb` setting in megabytes not bytes (CASSANDRA-16071) * Fix short read protection for GROUP BY queries (CASSANDRA-15459) + * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191) * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857) Merged from 3.0: - * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935) * Fix gossip shutdown order (CASSANDRA-15816) * Remove broken 'defrag-on-read' optimization (CASSANDRA-15432) * Check for endpoint collision with hibernating nodes (CASSANDRA-14599) diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java index e5e8e50,7a9590b..f6aa6d1 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@@ -50,18 -33,9 +50,14 @@@ import org.apache.commons.lang3.time.Du import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor; +import com.codahale.metrics.Timer; +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.config.SchemaConstants; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.metrics.RepairMetrics; - import org.apache.cassandra.db.SnapshotCommand; +import org.apache.cassandra.gms.FailureDetector; - import org.apache.cassandra.net.Message; - import org.apache.cassandra.net.MessagingService; - import org.apache.cassandra.net.Verb; +import org.apache.cassandra.repair.consistent.SyncStatSummary; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; @@@ -82,8 -49,6 +78,7 @@@ import org.apache.cassandra.service.Act import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageService; - import org.apache.cassandra.service.reads.repair.RepairedDataVerifier; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tracing.TraceKeyspace; import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; @@@ -307,104 -137,181 +302,122 @@@ public class RepairRunnable implements String message = String.format("Starting repair command #%d (%s), repairing keyspace %s with %s", cmd, parentSession, keyspace, options); logger.info(message); - if (options.isTraced()) - { - StringBuilder cfsb = new StringBuilder(); - for (ColumnFamilyStore cfs : validColumnFamilies) - cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name); - - UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR); - traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies", - cfsb.substring(2))); - message = message + " tracing with " + sessionId; - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 0, 100, message)); - Tracing.traceRepair(message); - traceState.enableActivityNotification(tag); - for (ProgressListener listener : listeners) - traceState.addProgressListener(listener); - Thread queryThread = createQueryThread(cmd, sessionId); - queryThread.setName("RepairTracePolling"); - queryThread.start(); - } - else - { - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 0, 100, message)); - traceState = null; - } + Tracing.traceRepair(message); + fireProgressEvent(new ProgressEvent(ProgressEventType.START, 0, 100, message)); + } - final Set<InetAddress> allNeighbors = new HashSet<>(); - List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges = new ArrayList<>(); + private NeighborsAndRanges getNeighborsAndRanges() + { + Set<InetAddressAndPort> allNeighbors = new HashSet<>(); + List<CommonRange> commonRanges = new ArrayList<>(); - //pre-calculate output of getLocalRanges and pass it to getNeighbors to increase performance and prevent + //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent //calculation multiple times - Collection<Range<Token>> keyspaceLocalRanges = storageService.getLocalRanges(keyspace); + Iterable<Range<Token>> keyspaceLocalRanges = storageService.getLocalReplicas(keyspace).ranges(); - try + for (Range<Token> range : options.getRanges()) { - for (Range<Token> range : options.getRanges()) + EndpointsForRange neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range, + options.getDataCenters(), + options.getHosts()); - ++ if (neighbors.isEmpty()) + { - Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range, - options.getDataCenters(), - options.getHosts()); - - if (neighbors.isEmpty()) ++ if (options.ignoreUnreplicatedKeyspaces()) + { - if (options.ignoreUnreplicatedKeyspaces()) - { - logger.info("Found no neighbors for range {} for {} - ignoring since repairing with --ignore-unreplicated-keyspaces", range, keyspace); - continue; - } - else - { - String errorMessage = String.format("Nothing to repair for %s in %s - aborting", range, keyspace); - logger.error("Repair {}", errorMessage); - fireErrorAndComplete(tag, progress.get(), totalProgress, errorMessage); - return; - } ++ logger.info("{} Found no neighbors for range {} for {} - ignoring since repairing with --ignore-unreplicated-keyspaces", parentSession, range, keyspace); ++ continue; ++ } ++ else ++ { ++ throw new RuntimeException(String.format("Nothing to repair for %s in %s - aborting", range, keyspace)); + } - addRangeToNeighbors(commonRanges, range, neighbors); - allNeighbors.addAll(neighbors); + } - - progress.incrementAndGet(); + addRangeToNeighbors(commonRanges, range, neighbors); + allNeighbors.addAll(neighbors.endpoints()); } - catch (IllegalArgumentException e) + ++ if (options.ignoreUnreplicatedKeyspaces() && allNeighbors.isEmpty()) + { - logger.error("Repair failed:", e); - fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage()); - return; ++ throw new SkipRepairException(String.format("Nothing to repair for %s in %s - unreplicated keyspace is ignored since repair was called with --ignore-unreplicated-keyspaces", ++ options.getRanges(), ++ keyspace)); + } + - if (options.ignoreUnreplicatedKeyspaces() && allNeighbors.isEmpty()) + progressCounter.incrementAndGet(); + + boolean force = options.isForcedRepair(); + + if (force && options.isIncremental()) { - String ignoreUnreplicatedMessage = String.format("Nothing to repair for %s in %s - unreplicated keyspace is ignored since repair was called with --ignore-unreplicated-keyspaces", - options.getRanges(), - keyspace); + Set<InetAddressAndPort> actualNeighbors = Sets.newHashSet(Iterables.filter(allNeighbors, FailureDetector.instance::isAlive)); + force = !allNeighbors.equals(actualNeighbors); + allNeighbors = actualNeighbors; + } + return new NeighborsAndRanges(force, allNeighbors, commonRanges); + } - logger.info("Repair {}", ignoreUnreplicatedMessage); - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, - progress.get(), - totalProgress, - ignoreUnreplicatedMessage)); - return; + private void maybeStoreParentRepairStart(String[] cfnames) + { + if (!options.isPreview()) + { + SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, cfnames, options); } + } - // Validate columnfamilies - List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(); - try + private void maybeStoreParentRepairSuccess(Collection<Range<Token>> successfulRanges) + { + if (!options.isPreview()) { - Iterables.addAll(columnFamilyStores, validColumnFamilies); - progress.incrementAndGet(); + SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges); } - catch (IllegalArgumentException e) + } + + private void maybeStoreParentRepairFailure(Throwable error) + { + if (!options.isPreview()) { - fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage()); - return; + SystemDistributedKeyspace.failParentRepair(parentSession, error); } + } - String[] cfnames = new String[columnFamilyStores.size()]; - for (int i = 0; i < columnFamilyStores.size(); i++) + private void prepare(List<ColumnFamilyStore> columnFamilies, Set<InetAddressAndPort> allNeighbors, boolean force) + { + try (Timer.Context ignore = Keyspace.open(keyspace).metric.repairPrepareTime.time()) { - cfnames[i] = columnFamilyStores.get(i).name; + ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddressAndPort(), allNeighbors, options, force, columnFamilies); + progressCounter.incrementAndGet(); } + } - SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, cfnames, options); - long repairedAt; - try + private void repair(String[] cfnames, NeighborsAndRanges neighborsAndRanges) + { + if (options.isPreview()) { - ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddress(), allNeighbors, options, columnFamilyStores); - repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).getRepairedAt(); - progress.incrementAndGet(); + previewRepair(parentSession, creationTimeMillis, neighborsAndRanges.commonRanges, cfnames); } - catch (Throwable t) + else if (options.isIncremental()) { - SystemDistributedKeyspace.failParentRepair(parentSession, t); - fireErrorAndComplete(tag, progress.get(), totalProgress, t.getMessage()); - return; + incrementalRepair(parentSession, creationTimeMillis, neighborsAndRanges.force, traceState, + neighborsAndRanges.allNeighbors, neighborsAndRanges.commonRanges, cfnames); } + else + { + normalRepair(parentSession, creationTimeMillis, traceState, neighborsAndRanges.commonRanges, cfnames); + } + } - // Set up RepairJob executor for this repair command. - final ListeningExecutorService executor = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(options.getJobThreads(), - Integer.MAX_VALUE, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("Repair#" + cmd), - "internal")); + private void normalRepair(UUID parentSession, + long startTime, + TraceState traceState, + List<CommonRange> commonRanges, + String... cfnames) + { - List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size()); - for (Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p : commonRanges) - { - final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession, - p.right, - keyspace, - options.getParallelism(), - p.left, - repairedAt, - options.isPullRepair(), - executor, - cfnames); - if (session == null) - continue; - // After repair session completes, notify client its result - Futures.addCallback(session, new FutureCallback<RepairSessionResult>() - { - public void onSuccess(RepairSessionResult result) - { - /** - * If the success message below is modified, it must also be updated on - * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport} - * for backward-compatibility support. - */ - String message = String.format("Repair session %s for range %s finished", session.getId(), - session.getRanges().toString()); - logger.info(message); - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS, - progress.incrementAndGet(), - totalProgress, - message)); - } + // Set up RepairJob executor for this repair command. + ListeningExecutorService executor = createExecutor(); - public void onFailure(Throwable t) - { - /** - * If the failure message below is modified, it must also be updated on - * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport} - * for backward-compatibility support. - */ - String message = String.format("Repair session %s for range %s failed with error %s", - session.getId(), session.getRanges().toString(), t.getMessage()); - logger.error(message, t); - fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS, - progress.incrementAndGet(), - totalProgress, - message)); - } - }); - futures.add(session); - } + // Setting the repairedAt time to UNREPAIRED_SSTABLE causes the repairedAt times to be preserved across streamed sstables + final ListenableFuture<List<RepairSessionResult>> allSessions = submitRepairSessions(parentSession, false, executor, commonRanges, cfnames); // After all repair sessions completes(successful or not), // run anticompaction if necessary and send finish notice back to client diff --cc src/java/org/apache/cassandra/repair/messages/RepairOption.java index f7ed052,14fff97..a76e6c0 --- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java @@@ -48,9 -47,7 +48,10 @@@ public class RepairOptio public static final String TRACE_KEY = "trace"; public static final String SUB_RANGE_REPAIR_KEY = "sub_range_repair"; public static final String PULL_REPAIR_KEY = "pullRepair"; + public static final String FORCE_REPAIR_KEY = "forceRepair"; + public static final String PREVIEW = "previewKind"; + public static final String OPTIMISE_STREAMS_KEY = "optimiseStreams"; + public static final String IGNORE_UNREPLICATED_KS = "ignoreUnreplicatedKeyspaces"; // we don't want to push nodes too much for repair public static final int MAX_JOB_THREADS = 4; @@@ -175,10 -137,9 +176,11 @@@ RepairParallelism parallelism = RepairParallelism.fromName(options.get(PARALLELISM_KEY)); boolean primaryRange = Boolean.parseBoolean(options.get(PRIMARY_RANGE_KEY)); boolean incremental = Boolean.parseBoolean(options.get(INCREMENTAL_KEY)); + PreviewKind previewKind = PreviewKind.valueOf(options.getOrDefault(PREVIEW, PreviewKind.NONE.toString())); boolean trace = Boolean.parseBoolean(options.get(TRACE_KEY)); + boolean force = Boolean.parseBoolean(options.get(FORCE_REPAIR_KEY)); boolean pullRepair = Boolean.parseBoolean(options.get(PULL_REPAIR_KEY)); + boolean ignoreUnreplicatedKeyspaces = Boolean.parseBoolean(options.get(IGNORE_UNREPLICATED_KS)); int jobThreads = 1; if (options.containsKey(JOB_THREADS_KEY)) @@@ -189,13 -150,34 +191,13 @@@ } catch (NumberFormatException ignore) {} } + // ranges - String rangesStr = options.get(RANGES_KEY); - Set<Range<Token>> ranges = new HashSet<>(); - if (rangesStr != null) - { - if (incremental) - logger.warn("Incremental repair can't be requested with subrange repair " + - "because each subrange repair would generate an anti-compacted table. " + - "The repair will occur but without anti-compaction."); - StringTokenizer tokenizer = new StringTokenizer(rangesStr, ","); - while (tokenizer.hasMoreTokens()) - { - String[] rangeStr = tokenizer.nextToken().split(":", 2); - if (rangeStr.length < 2) - { - continue; - } - Token parsedBeginToken = partitioner.getTokenFactory().fromString(rangeStr[0].trim()); - Token parsedEndToken = partitioner.getTokenFactory().fromString(rangeStr[1].trim()); - if (parsedBeginToken.equals(parsedEndToken)) - { - throw new IllegalArgumentException("Start and end tokens must be different."); - } - ranges.add(new Range<>(parsedBeginToken, parsedEndToken)); - } - } + Set<Range<Token>> ranges = parseRanges(options.get(RANGES_KEY), partitioner); + + boolean asymmetricSyncing = Boolean.parseBoolean(options.get(OPTIMISE_STREAMS_KEY)); - RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, previewKind, asymmetricSyncing); - RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, ignoreUnreplicatedKeyspaces); ++ RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, previewKind, asymmetricSyncing, ignoreUnreplicatedKeyspaces); // data centers String dataCentersStr = options.get(DATACENTERS_KEY); @@@ -271,16 -253,14 +273,17 @@@ private final int jobThreads; private final boolean isSubrangeRepair; private final boolean pullRepair; + private final boolean forceRepair; + private final PreviewKind previewKind; + private final boolean optimiseStreams; + private final boolean ignoreUnreplicatedKeyspaces; private final Collection<String> columnFamilies = new HashSet<>(); private final Collection<String> dataCenters = new HashSet<>(); private final Collection<String> hosts = new HashSet<>(); private final Collection<Range<Token>> ranges = new HashSet<>(); - public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, boolean forceRepair, PreviewKind previewKind, boolean optimiseStreams) - public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, boolean ignoreUnreplicatedKeyspaces) ++ public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, boolean forceRepair, PreviewKind previewKind, boolean optimiseStreams, boolean ignoreUnreplicatedKeyspaces) { if (FBUtilities.isWindows && (DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard) && @@@ -299,9 -279,7 +302,10 @@@ this.ranges.addAll(ranges); this.isSubrangeRepair = isSubrangeRepair; this.pullRepair = pullRepair; + this.forceRepair = forceRepair; + this.previewKind = previewKind; + this.optimiseStreams = optimiseStreams; + this.ignoreUnreplicatedKeyspaces = ignoreUnreplicatedKeyspaces; } public RepairParallelism getParallelism() @@@ -384,28 -346,25 +388,34 @@@ return dataCenters.size() == 1 && dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()); } + public boolean optimiseStreams() + { + return optimiseStreams; + } + + public boolean ignoreUnreplicatedKeyspaces() + { + return ignoreUnreplicatedKeyspaces; + } ++ @Override public String toString() { return "repair options (" + - "parallelism: " + parallelism + - ", primary range: " + primaryRange + - ", incremental: " + incremental + - ", job threads: " + jobThreads + - ", ColumnFamilies: " + columnFamilies + - ", dataCenters: " + dataCenters + - ", hosts: " + hosts + - ", # of ranges: " + ranges.size() + - ", pull repair: " + pullRepair + - ", ignore unreplicated keyspaces: "+ ignoreUnreplicatedKeyspaces + - ')'; + "parallelism: " + parallelism + + ", primary range: " + primaryRange + + ", incremental: " + incremental + + ", job threads: " + jobThreads + + ", ColumnFamilies: " + columnFamilies + + ", dataCenters: " + dataCenters + + ", hosts: " + hosts + + ", previewKind: " + previewKind + + ", # of ranges: " + ranges.size() + + ", pull repair: " + pullRepair + + ", force repair: " + forceRepair + + ", optimise streams: "+ optimiseStreams + ++ ", ignore unreplicated keyspaces: "+ ignoreUnreplicatedKeyspaces + + ')'; } public Map<String, String> asMap() diff --cc src/java/org/apache/cassandra/tools/nodetool/Repair.java index 990d241,180748d..0f9bfb3 --- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java @@@ -94,30 -84,9 +94,32 @@@ public class Repair extends NodeToolCm @Option(title = "pull_repair", name = {"-pl", "--pull"}, description = "Use --pull to perform a one way repair where data is only streamed from a remote node to this node.") private boolean pullRepair = false; + @Option(title = "optimise_streams", name = {"-os", "--optimise-streams"}, description = "Use --optimise-streams to try to reduce the number of streams we do (EXPERIMENTAL, see CASSANDRA-3200).") + private boolean optimiseStreams = false; + + @Option(title = "ignore_unreplicated_keyspaces", name = {"-iuk","--ignore-unreplicated-keyspaces"}, description = "Use --ignore-unreplicated-keyspaces to ignore keyspaces which are not replicated, otherwise the repair will fail") + private boolean ignoreUnreplicatedKeyspaces = false; + private PreviewKind getPreviewKind() + { + if (validate) + { + return PreviewKind.REPAIRED; + } + else if (preview && fullRepair) + { + return PreviewKind.ALL; + } + else if (preview) + { + return PreviewKind.UNREPAIRED; + } + else + { + return PreviewKind.NONE; + } + } + @Override public void execute(NodeProbe probe) { @@@ -146,9 -115,8 +148,11 @@@ options.put(RepairOption.TRACE_KEY, Boolean.toString(trace)); options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(cfnames, ",")); options.put(RepairOption.PULL_REPAIR_KEY, Boolean.toString(pullRepair)); + options.put(RepairOption.FORCE_REPAIR_KEY, Boolean.toString(force)); + options.put(RepairOption.PREVIEW, getPreviewKind().toString()); + options.put(RepairOption.OPTIMISE_STREAMS_KEY, Boolean.toString(optimiseStreams)); + options.put(RepairOption.IGNORE_UNREPLICATED_KS, Boolean.toString(ignoreUnreplicatedKeyspaces)); + if (!startToken.isEmpty() || !endToken.isEmpty()) { options.put(RepairOption.RANGES_KEY, startToken + ":" + endToken); diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairOperationalTest.java index 499eae3,2f85227..a4628db --- a/test/distributed/org/apache/cassandra/distributed/test/RepairOperationalTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/RepairOperationalTest.java @@@ -22,62 -22,142 +22,193 @@@ import java.io.IOException import org.junit.Test; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; + import org.apache.cassandra.distributed.api.IInvokableInstance; ++import org.assertj.core.api.Assertions; +import static net.bytebuddy.matcher.ElementMatchers.named; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NETWORK; -import static org.junit.Assert.assertEquals; public class RepairOperationalTest extends TestBaseImpl { @Test + public void compactionBehindTest() throws IOException + { + try(Cluster cluster = init(Cluster.build(2) + .withConfig(config -> config.with(GOSSIP).with(NETWORK)) + .withInstanceInitializer(ByteBuddyHelper::install) + .start())) + { + cluster.schemaChange("create table "+KEYSPACE+".tbl (id int primary key, x int)"); + for (int i = 0; i < 10; i++) + cluster.coordinator(1).execute("insert into "+KEYSPACE+".tbl (id, x) VALUES (?,?)", ConsistencyLevel.ALL, i, i); + cluster.forEach(i -> i.flush(KEYSPACE)); + cluster.forEach(i -> i.nodetoolResult("repair", "--full").asserts().success()); + cluster.get(2).runOnInstance(() -> { + ByteBuddyHelper.pendingCompactions = 1000; + DatabaseDescriptor.setRepairPendingCompactionRejectThreshold(500); + }); + // make sure repair gets rejected on both nodes if pendingCompactions > threshold: + cluster.forEach(i -> i.nodetoolResult("repair", "--full").asserts().failure()); + cluster.get(2).runOnInstance(() -> ByteBuddyHelper.pendingCompactions = 499); + cluster.forEach(i -> i.nodetoolResult("repair", "--full").asserts().success()); + } + } + + public static class ByteBuddyHelper + { + public static volatile int pendingCompactions = 0; ++ + static void install(ClassLoader cl, int nodeNumber) + { + if (nodeNumber == 2) + { + new ByteBuddy().redefine(CompactionManager.class) + .method(named("getPendingTasks")) + .intercept(MethodDelegation.to(ByteBuddyHelper.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + } + + public static int getPendingTasks() + { + return pendingCompactions; + } + } ++ + public void repairUnreplicatedKStest() throws IOException + { + try (Cluster cluster = init(Cluster.build(4) + .withDCs(2) + .withConfig(config -> config.with(GOSSIP).with(NETWORK)) + .start())) + { + cluster.schemaChange("alter keyspace "+KEYSPACE+" with replication = {'class': 'NetworkTopologyStrategy', 'datacenter1':2, 'datacenter2':0}"); + cluster.schemaChange("create table "+KEYSPACE+".tbl (id int primary key, i int)"); + for (int i = 0; i < 10; i++) + cluster.coordinator(1).execute("insert into "+KEYSPACE+".tbl (id, i) values (?, ?)", ConsistencyLevel.ALL, i, i); + cluster.forEach(i -> i.flush(KEYSPACE)); + + cluster.get(3).nodetoolResult("repair", "-full", KEYSPACE , "tbl", "-st", "0", "-et", "1000") + .asserts() + .failure() + .errorContains("Nothing to repair for (0,1000] in distributed_test_keyspace - aborting"); + cluster.get(3).nodetoolResult("repair", "-full", KEYSPACE , "tbl", "-st", "0", "-et", "1000", "--ignore-unreplicated-keyspaces") + .asserts() + .success() + .notificationContains("unreplicated keyspace is ignored since repair was called with --ignore-unreplicated-keyspaces"); + + } + } + + @Test + public void dcFilterOnEmptyDC() throws IOException + { + try (Cluster cluster = Cluster.build().withRacks(2, 1, 2).start()) + { + // 1-2 : datacenter1 + // 3-4 : datacenter2 + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1':2, 'datacenter2':0}"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (id int PRIMARY KEY, i int)"); + for (int i = 0; i < 10; i++) + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (id, i) VALUES (?, ?)", ConsistencyLevel.ALL, i, i); + cluster.forEach(i -> i.flush(KEYSPACE)); + + // choose a node in the DC that doesn't have any replicas + IInvokableInstance node = cluster.get(3); - assertEquals("datacenter2", node.config().localDatacenter()); ++ Assertions.assertThat(node.config().localDatacenter()).isEqualTo("datacenter2"); + // fails with "the local data center must be part of the repair" + node.nodetoolResult("repair", "-full", + "-dc", "datacenter1", "-dc", "datacenter2", + "--ignore-unreplicated-keyspaces", + "-st", "0", "-et", "1000", + KEYSPACE, "tbl") + .asserts().success(); + } + } + + @Test + public void hostFilterDifferentDC() throws IOException + { + try (Cluster cluster = Cluster.build().withRacks(2, 1, 2).start()) + { + // 1-2 : datacenter1 + // 3-4 : datacenter2 + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1':2, 'datacenter2':0}"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (id int PRIMARY KEY, i int)"); + for (int i = 0; i < 10; i++) + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (id, i) VALUES (?, ?)", ConsistencyLevel.ALL, i, i); + cluster.forEach(i -> i.flush(KEYSPACE)); + + // choose a node in the DC that doesn't have any replicas + IInvokableInstance node = cluster.get(3); - assertEquals("datacenter2", node.config().localDatacenter()); ++ Assertions.assertThat(node.config().localDatacenter()).isEqualTo("datacenter2"); + // fails with "Specified hosts [127.0.0.3, 127.0.0.1] do not share range (0,1000] needed for repair. Either restrict repair ranges with -st/-et options, or specify one of the neighbors that share this range with this node: [].. Check the logs on the repair participants for further details" + node.nodetoolResult("repair", "-full", + "-hosts", cluster.get(1).broadcastAddress().getAddress().getHostAddress(), + "-hosts", node.broadcastAddress().getAddress().getHostAddress(), + "--ignore-unreplicated-keyspaces", + "-st", "0", "-et", "1000", + KEYSPACE, "tbl") + .asserts().success(); + } + } + + @Test + public void emptyDC() throws IOException + { + try (Cluster cluster = Cluster.build().withRacks(2, 1, 2).start()) + { + // 1-2 : datacenter1 + // 3-4 : datacenter2 + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1':2, 'datacenter2':0}"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (id int PRIMARY KEY, i int)"); + for (int i = 0; i < 10; i++) + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (id, i) VALUES (?, ?)", ConsistencyLevel.ALL, i, i); + cluster.forEach(i -> i.flush(KEYSPACE)); + + // choose a node in the DC that doesn't have any replicas + IInvokableInstance node = cluster.get(3); - assertEquals("datacenter2", node.config().localDatacenter()); ++ Assertions.assertThat(node.config().localDatacenter()).isEqualTo("datacenter2"); + // fails with [2020-09-10 11:30:04,139] Repair command #1 failed with error Nothing to repair for (0,1000] in distributed_test_keyspace - aborting. Check the logs on the repair participants for further details + node.nodetoolResult("repair", "-full", + "--ignore-unreplicated-keyspaces", + "-st", "0", "-et", "1000", + KEYSPACE, "tbl") + .asserts().success(); + } + } + + @Test + public void mainDC() throws IOException + { + try (Cluster cluster = Cluster.build().withRacks(2, 1, 2).start()) + { + // 1-2 : datacenter1 + // 3-4 : datacenter2 + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1':2, 'datacenter2':0}"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (id int PRIMARY KEY, i int)"); + for (int i = 0; i < 10; i++) + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (id, i) VALUES (?, ?)", ConsistencyLevel.ALL, i, i); + cluster.forEach(i -> i.flush(KEYSPACE)); + + // choose a node in the DC that doesn't have any replicas + IInvokableInstance node = cluster.get(1); - assertEquals("datacenter1", node.config().localDatacenter()); ++ Assertions.assertThat(node.config().localDatacenter()).isEqualTo("datacenter1"); + node.nodetoolResult("repair", "-full", + "--ignore-unreplicated-keyspaces", + "-st", "0", "-et", "1000", + KEYSPACE, "tbl") + .asserts().success(); + } + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org