This is an automated email from the ASF dual-hosted git repository. jonmeredith pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new b6eb5890da Optionally prevent tombstone purging during repair b6eb5890da is described below commit b6eb5890da38642fc7af7d39c83f7ec01f33d78f Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Mon Nov 18 11:19:04 2024 -0700 Optionally prevent tombstone purging during repair patch by Marcus Eriksson, Abe Ratnofsky; reviewed by Jon Meredith for CASSANDRA-20071 --- CHANGES.txt | 1 + .../db/repair/CassandraTableRepairManager.java | 4 +- .../db/repair/CassandraValidationIterator.java | 5 +- .../cassandra/repair/AbstractRepairTask.java | 1 + .../org/apache/cassandra/repair/RepairJob.java | 2 +- .../cassandra/repair/RepairMessageVerbHandler.java | 4 +- .../org/apache/cassandra/repair/RepairSession.java | 3 + .../cassandra/repair/TableRepairManager.java | 2 +- .../apache/cassandra/repair/ValidationManager.java | 2 +- .../apache/cassandra/repair/ValidationTask.java | 6 +- .../org/apache/cassandra/repair/Validator.java | 14 ++-- .../cassandra/repair/messages/RepairOption.java | 16 ++++- .../repair/messages/ValidationRequest.java | 11 +++- .../cassandra/service/ActiveRepairService.java | 3 +- .../apache/cassandra/tools/nodetool/Repair.java | 4 ++ .../5.1/service.ValidationRequest.bin | Bin 74 -> 75 bytes .../test/repair/NoTombstonePurgingTest.java | 72 +++++++++++++++++++++ .../simulator/cluster/OnInstanceRepair.java | 2 +- ...pactionManagerGetSSTablesForValidationTest.java | 6 +- .../org/apache/cassandra/repair/RepairJobTest.java | 6 +- .../RepairMessageVerbHandlerOutOfRangeTest.java | 2 +- .../apache/cassandra/repair/RepairSessionTest.java | 3 +- .../cassandra/repair/ValidationTaskTest.java | 2 +- .../org/apache/cassandra/repair/ValidatorTest.java | 6 +- .../messages/RepairMessageSerializationsTest.java | 6 +- .../cassandra/service/SerializationsTest.java | 3 +- 26 files changed, 150 insertions(+), 36 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index b5ba4dc267..91d4861942 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Optionally prevent tombstone purging during repair (CASSANDRA-20071) * Add post-filtering support for the IN operator in SAI queries (CASSANDRA-20025) * Don’t finish ongoing decommission and move operations during startup (CASSANDRA-20040) * Nodetool reconfigure cms has correct return code when streaming fails (CASSANDRA-19972) diff --git a/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java b/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java index 4e54d6ee77..24e79d2454 100644 --- a/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java +++ b/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java @@ -56,9 +56,9 @@ public class CassandraTableRepairManager implements TableRepairManager } @Override - public ValidationPartitionIterator getValidationIterator(Collection<Range<Token>> ranges, TimeUUID parentId, TimeUUID sessionID, boolean isIncremental, long nowInSec, TopPartitionTracker.Collector topPartitionCollector) throws IOException, NoSuchRepairSessionException + public ValidationPartitionIterator getValidationIterator(Collection<Range<Token>> ranges, TimeUUID parentId, TimeUUID sessionID, boolean isIncremental, long nowInSec, boolean dontPurgeTombstones, TopPartitionTracker.Collector topPartitionCollector) throws IOException, NoSuchRepairSessionException { - return new CassandraValidationIterator(cfs, ctx, ranges, parentId, sessionID, isIncremental, nowInSec, topPartitionCollector); + return new CassandraValidationIterator(cfs, ctx, ranges, parentId, sessionID, isIncremental, nowInSec, dontPurgeTombstones, topPartitionCollector); } @Override diff --git a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java index a31a7038b0..05408e9175 100644 --- a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java +++ b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java @@ -174,7 +174,7 @@ public class CassandraValidationIterator extends ValidationPartitionIterator private final long estimatedPartitions; private final Map<Range<Token>, Long> rangePartitionCounts; - public CassandraValidationIterator(ColumnFamilyStore cfs, SharedContext ctx, Collection<Range<Token>> ranges, TimeUUID parentId, TimeUUID sessionID, boolean isIncremental, long nowInSec, TopPartitionTracker.Collector topPartitionCollector) throws IOException, NoSuchRepairSessionException + public CassandraValidationIterator(ColumnFamilyStore cfs, SharedContext ctx, Collection<Range<Token>> ranges, TimeUUID parentId, TimeUUID sessionID, boolean isIncremental, long nowInSec, boolean dontPurgeTombstones, TopPartitionTracker.Collector topPartitionCollector) throws IOException, NoSuchRepairSessionException { this.cfs = cfs; this.ctx = ctx; @@ -219,7 +219,8 @@ public class CassandraValidationIterator extends ValidationPartitionIterator cfs.getKeyspaceName(), cfs.getTableName()); - controller = new ValidationCompactionController(cfs, getDefaultGcBefore(cfs, nowInSec)); + long gcBefore = dontPurgeTombstones ? Long.MIN_VALUE : getDefaultGcBefore(cfs, nowInSec); + controller = new ValidationCompactionController(cfs, gcBefore); scanners = cfs.getCompactionStrategyManager().getScanners(sstables, ranges); ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, CompactionManager.instance.active, topPartitionCollector); diff --git a/src/java/org/apache/cassandra/repair/AbstractRepairTask.java b/src/java/org/apache/cassandra/repair/AbstractRepairTask.java index 94cc3545c2..f27e72deb1 100644 --- a/src/java/org/apache/cassandra/repair/AbstractRepairTask.java +++ b/src/java/org/apache/cassandra/repair/AbstractRepairTask.java @@ -76,6 +76,7 @@ public abstract class AbstractRepairTask implements RepairTask options.optimiseStreams(), options.repairPaxos(), options.paxosOnly(), + options.dontPurgeTombstones(), executor, validationScheduler, cfnames); diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 424b69acd1..63b7b96ec5 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -614,7 +614,7 @@ public class RepairJob extends AsyncFuture<RepairResult> implements Runnable private ValidationTask newValidationTask(InetAddressAndPort endpoint, long nowInSec) { - ValidationTask task = new ValidationTask(session.ctx, desc, endpoint, nowInSec, session.previewKind); + ValidationTask task = new ValidationTask(session.ctx, desc, endpoint, nowInSec, session.previewKind, session.dontPurgeTombstones); validationTasks.add(task); return task; } diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index ca823faa5a..f777126019 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -243,7 +243,9 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> sendAck(message); Validator validator = new Validator(ctx, vState, validationRequest.nowInSec, - isIncremental(desc.parentSessionId), previewKind); + isIncremental(desc.parentSessionId), + previewKind, + validationRequest.dontPurgeTombstones); ctx.validationManager().submitValidation(store, validator); } catch (Throwable t) diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index f329bf4779..92d56390fe 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -121,6 +121,7 @@ public class RepairSession extends AsyncFuture<RepairSessionResult> implements I public final PreviewKind previewKind; public final boolean repairPaxos; public final boolean paxosOnly; + public final boolean dontPurgeTombstones; private final AtomicBoolean isFailed = new AtomicBoolean(false); @@ -161,6 +162,7 @@ public class RepairSession extends AsyncFuture<RepairSessionResult> implements I boolean optimiseStreams, boolean repairPaxos, boolean paxosOnly, + boolean dontPurgeTombstones, String... cfnames) { this.ctx = ctx; @@ -174,6 +176,7 @@ public class RepairSession extends AsyncFuture<RepairSessionResult> implements I this.previewKind = previewKind; this.pullRepair = pullRepair; this.optimiseStreams = optimiseStreams; + this.dontPurgeTombstones = dontPurgeTombstones; this.taskExecutor = new SafeExecutor(createExecutor(ctx)); } diff --git a/src/java/org/apache/cassandra/repair/TableRepairManager.java b/src/java/org/apache/cassandra/repair/TableRepairManager.java index 99ccff0714..f5bee38a0c 100644 --- a/src/java/org/apache/cassandra/repair/TableRepairManager.java +++ b/src/java/org/apache/cassandra/repair/TableRepairManager.java @@ -38,7 +38,7 @@ public interface TableRepairManager * data previously isolated for repair with the given parentId. nowInSec should determine whether tombstones should * be purged or not. */ - ValidationPartitionIterator getValidationIterator(Collection<Range<Token>> ranges, TimeUUID parentId, TimeUUID sessionID, boolean isIncremental, long nowInSec, TopPartitionTracker.Collector topPartitionCollector) throws IOException, NoSuchRepairSessionException; + ValidationPartitionIterator getValidationIterator(Collection<Range<Token>> ranges, TimeUUID parentId, TimeUUID sessionID, boolean isIncremental, long nowInSec, boolean dontPurgeTombstones, TopPartitionTracker.Collector topPartitionCollector) throws IOException, NoSuchRepairSessionException; /** * Begin execution of the given validation callable. Which thread pool a validation should run in is an implementation detail. diff --git a/src/java/org/apache/cassandra/repair/ValidationManager.java b/src/java/org/apache/cassandra/repair/ValidationManager.java index e3598cd38f..ca7ad3a68e 100644 --- a/src/java/org/apache/cassandra/repair/ValidationManager.java +++ b/src/java/org/apache/cassandra/repair/ValidationManager.java @@ -90,7 +90,7 @@ public class ValidationManager implements IValidationManager private static ValidationPartitionIterator getValidationIterator(TableRepairManager repairManager, Validator validator, TopPartitionTracker.Collector topPartitionCollector) throws IOException, NoSuchRepairSessionException { RepairJobDesc desc = validator.desc; - return repairManager.getValidationIterator(desc.ranges, desc.parentSessionId, desc.sessionId, validator.isIncremental, validator.nowInSec, topPartitionCollector); + return repairManager.getValidationIterator(desc.ranges, desc.parentSessionId, desc.sessionId, validator.isIncremental, validator.nowInSec, validator.dontPurgeTombstones, topPartitionCollector); } /** diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java b/src/java/org/apache/cassandra/repair/ValidationTask.java index 322e07cd2d..445e3880a2 100644 --- a/src/java/org/apache/cassandra/repair/ValidationTask.java +++ b/src/java/org/apache/cassandra/repair/ValidationTask.java @@ -40,15 +40,17 @@ public class ValidationTask extends AsyncFuture<TreeResponse> implements Runnabl private final InetAddressAndPort endpoint; private final long nowInSec; private final PreviewKind previewKind; + private final boolean dontPurgeTombstones; private final SharedContext ctx; - public ValidationTask(SharedContext ctx, RepairJobDesc desc, InetAddressAndPort endpoint, long nowInSec, PreviewKind previewKind) + public ValidationTask(SharedContext ctx, RepairJobDesc desc, InetAddressAndPort endpoint, long nowInSec, PreviewKind previewKind, boolean dontPurgeTombstones) { this.ctx = ctx; this.desc = desc; this.endpoint = endpoint; this.nowInSec = nowInSec; this.previewKind = previewKind; + this.dontPurgeTombstones = dontPurgeTombstones; } /** @@ -57,7 +59,7 @@ public class ValidationTask extends AsyncFuture<TreeResponse> implements Runnabl public void run() { RepairMessage.sendMessageWithFailureCB(ctx, notDone(this), - new ValidationRequest(desc, nowInSec), + new ValidationRequest(desc, nowInSec, dontPurgeTombstones), VALIDATION_REQ, endpoint, this::tryFailure); diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java index d8ba929de6..c5152aee13 100644 --- a/src/java/org/apache/cassandra/repair/Validator.java +++ b/src/java/org/apache/cassandra/repair/Validator.java @@ -82,23 +82,24 @@ public class Validator implements Runnable private final PreviewKind previewKind; public final ValidationState state; public TopPartitionTracker.Collector topPartitionCollector; + public final boolean dontPurgeTombstones; public Validator(ValidationState state, long nowInSec, PreviewKind previewKind) { - this(SharedContext.Global.instance, state, nowInSec, false, false, previewKind); + this(SharedContext.Global.instance, state, nowInSec, false, false, previewKind, false); } - public Validator(SharedContext ctx, ValidationState state, long nowInSec, boolean isIncremental, PreviewKind previewKind) + public Validator(SharedContext ctx, ValidationState state, long nowInSec, boolean isIncremental, PreviewKind previewKind, boolean dontPurgeTombstones) { - this(ctx, state, nowInSec, false, isIncremental, previewKind); + this(ctx, state, nowInSec, false, isIncremental, previewKind, dontPurgeTombstones); } - public Validator(ValidationState state, long nowInSec, boolean isIncremental, PreviewKind previewKind) + public Validator(ValidationState state, long nowInSec, boolean isIncremental, PreviewKind previewKind, boolean dontPurgeTombstones) { - this(SharedContext.Global.instance, state, nowInSec, false, isIncremental, previewKind); + this(SharedContext.Global.instance, state, nowInSec, false, isIncremental, previewKind, dontPurgeTombstones); } - public Validator(SharedContext ctx, ValidationState state, long nowInSec, boolean evenTreeDistribution, boolean isIncremental, PreviewKind previewKind) + public Validator(SharedContext ctx, ValidationState state, long nowInSec, boolean evenTreeDistribution, boolean isIncremental, PreviewKind previewKind, boolean dontPurgeTombstones) { this.ctx = ctx; this.state = state; @@ -107,6 +108,7 @@ public class Validator implements Runnable this.nowInSec = nowInSec; this.isIncremental = isIncremental; this.previewKind = previewKind; + this.dontPurgeTombstones = dontPurgeTombstones; validated = 0; range = null; ranges = null; diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java index 03097da077..bc9231dcc1 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java @@ -54,6 +54,8 @@ public class RepairOption public static final String IGNORE_UNREPLICATED_KS = "ignoreUnreplicatedKeyspaces"; public static final String REPAIR_PAXOS_KEY = "repairPaxos"; public static final String PAXOS_ONLY_KEY = "paxosOnly"; + public static final String NO_TOMBSTONE_PURGING = "nopurge"; + // we don't want to push nodes too much for repair public static final int MAX_JOB_THREADS = 4; @@ -185,6 +187,7 @@ public class RepairOption boolean ignoreUnreplicatedKeyspaces = Boolean.parseBoolean(options.get(IGNORE_UNREPLICATED_KS)); boolean repairPaxos = Boolean.parseBoolean(options.get(REPAIR_PAXOS_KEY)); boolean paxosOnly = Boolean.parseBoolean(options.get(PAXOS_ONLY_KEY)); + boolean dontPurgeTombstones = Boolean.parseBoolean(options.get(NO_TOMBSTONE_PURGING)); if (previewKind != PreviewKind.NONE) { @@ -209,7 +212,7 @@ public class RepairOption boolean asymmetricSyncing = Boolean.parseBoolean(options.get(OPTIMISE_STREAMS_KEY)); - RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, previewKind, asymmetricSyncing, ignoreUnreplicatedKeyspaces, repairPaxos, paxosOnly); + RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, previewKind, asymmetricSyncing, ignoreUnreplicatedKeyspaces, repairPaxos, paxosOnly, dontPurgeTombstones); // data centers String dataCentersStr = options.get(DATACENTERS_KEY); @@ -291,13 +294,14 @@ public class RepairOption private final boolean ignoreUnreplicatedKeyspaces; private final boolean repairPaxos; private final boolean paxosOnly; + private final boolean dontPurgeTombstones; private final Collection<String> columnFamilies = new HashSet<>(); private final Collection<String> dataCenters = new HashSet<>(); private final Collection<String> hosts = new HashSet<>(); private final Collection<Range<Token>> ranges = new HashSet<>(); - public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, boolean forceRepair, PreviewKind previewKind, boolean optimiseStreams, boolean ignoreUnreplicatedKeyspaces, boolean repairPaxos, boolean paxosOnly) + public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, boolean forceRepair, PreviewKind previewKind, boolean optimiseStreams, boolean ignoreUnreplicatedKeyspaces, boolean repairPaxos, boolean paxosOnly, boolean dontPurgeTombstones) { this.parallelism = parallelism; @@ -314,6 +318,7 @@ public class RepairOption this.ignoreUnreplicatedKeyspaces = ignoreUnreplicatedKeyspaces; this.repairPaxos = repairPaxos; this.paxosOnly = paxosOnly; + this.dontPurgeTombstones = dontPurgeTombstones; } public RepairParallelism getParallelism() @@ -429,6 +434,11 @@ public class RepairOption return paxosOnly; } + public boolean dontPurgeTombstones() + { + return dontPurgeTombstones; + } + @Override public String toString() { @@ -448,6 +458,7 @@ public class RepairOption ", ignore unreplicated keyspaces: "+ ignoreUnreplicatedKeyspaces + ", repairPaxos: " + repairPaxos + ", paxosOnly: " + paxosOnly + + ", dontPurgeTombstones: " + dontPurgeTombstones + ')'; } @@ -470,6 +481,7 @@ public class RepairOption options.put(OPTIMISE_STREAMS_KEY, Boolean.toString(optimiseStreams)); options.put(REPAIR_PAXOS_KEY, Boolean.toString(repairPaxos)); options.put(PAXOS_ONLY_KEY, Boolean.toString(paxosOnly)); + options.put(NO_TOMBSTONE_PURGING, Boolean.toString(dontPurgeTombstones)); return options; } } diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java index 1e651a96d2..5c6550fac7 100644 --- a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java +++ b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java @@ -35,11 +35,13 @@ import org.apache.cassandra.utils.CassandraUInt; public class ValidationRequest extends RepairMessage { public final long nowInSec; + public final boolean dontPurgeTombstones; - public ValidationRequest(RepairJobDesc desc, long nowInSec) + public ValidationRequest(RepairJobDesc desc, long nowInSec, boolean dontPurgeTombstones) { super(desc); this.nowInSec = nowInSec; + this.dontPurgeTombstones = dontPurgeTombstones; } @Override @@ -47,6 +49,7 @@ public class ValidationRequest extends RepairMessage { return "ValidationRequest{" + "nowInSec=" + nowInSec + + ", dontPurgeTombstones" + dontPurgeTombstones + "} " + super.toString(); } @@ -72,19 +75,23 @@ public class ValidationRequest extends RepairMessage { RepairJobDesc.serializer.serialize(message.desc, out, version); out.writeInt(version >= MessagingService.VERSION_50 ? CassandraUInt.fromLong(message.nowInSec) : (int) message.nowInSec); + if (version >= MessagingService.VERSION_51) + out.writeBoolean(message.dontPurgeTombstones); } public ValidationRequest deserialize(DataInputPlus dis, int version) throws IOException { RepairJobDesc desc = RepairJobDesc.serializer.deserialize(dis, version); long nowInsec = version >= MessagingService.VERSION_50 ? CassandraUInt.toLong(dis.readInt()) : dis.readInt(); - return new ValidationRequest(desc, nowInsec); + boolean dontPurgeTombstones = version >= MessagingService.VERSION_51 ? dis.readBoolean() : false; + return new ValidationRequest(desc, nowInsec, dontPurgeTombstones); } public long serializedSize(ValidationRequest message, int version) { long size = RepairJobDesc.serializer.serializedSize(message.desc, version); size += TypeSizes.INT_SIZE; + size += version >= MessagingService.VERSION_51 ? TypeSizes.sizeof(message.dontPurgeTombstones) : 0; return size; } }; diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 9682c8e959..b5fbc48d43 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -454,6 +454,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai boolean optimiseStreams, boolean repairPaxos, boolean paxosOnly, + boolean dontPurgeTombstones, ExecutorPlus executor, Scheduler validationScheduler, String... cfnames) @@ -469,7 +470,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai final RepairSession session = new RepairSession(ctx, validationScheduler, parentRepairSession, range, keyspace, parallelismDegree, isIncremental, pullRepair, - previewKind, optimiseStreams, repairPaxos, paxosOnly, cfnames); + previewKind, optimiseStreams, repairPaxos, paxosOnly, dontPurgeTombstones, cfnames); repairs.getIfPresent(parentRepairSession).register(session.state); sessions.put(session.getId(), session); diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java index 3583240830..c66992acc9 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java @@ -108,6 +108,9 @@ public class Repair extends NodeToolCmd @Option(title = "ignore_unreplicated_keyspaces", name = {"-iuk","--ignore-unreplicated-keyspaces"}, description = "Use --ignore-unreplicated-keyspaces to ignore keyspaces which are not replicated, otherwise the repair will fail") private boolean ignoreUnreplicatedKeyspaces = false; + @Option(title = "no_purge", name = {"--include-gcgs-expired-tombstones"}, description = "Do not apply gc grace seconds to purge any tombstones. Only useful in rare recovery scenarios, never regular operations.") + private boolean dontPurgeTombstones = false; + private PreviewKind getPreviewKind() { if (validate) @@ -186,6 +189,7 @@ public class Repair extends NodeToolCmd options.put(RepairOption.IGNORE_UNREPLICATED_KS, Boolean.toString(ignoreUnreplicatedKeyspaces)); options.put(RepairOption.REPAIR_PAXOS_KEY, Boolean.toString(!skipPaxos && getPreviewKind() == PreviewKind.NONE)); options.put(RepairOption.PAXOS_ONLY_KEY, Boolean.toString(paxosOnly && getPreviewKind() == PreviewKind.NONE)); + options.put(RepairOption.NO_TOMBSTONE_PURGING, Boolean.toString(dontPurgeTombstones)); if (!startToken.isEmpty() || !endToken.isEmpty()) { diff --git a/test/data/serialization/5.1/service.ValidationRequest.bin b/test/data/serialization/5.1/service.ValidationRequest.bin index 04c492a8a1..2a2aea7183 100644 Binary files a/test/data/serialization/5.1/service.ValidationRequest.bin and b/test/data/serialization/5.1/service.ValidationRequest.bin differ diff --git a/test/distributed/org/apache/cassandra/distributed/test/repair/NoTombstonePurgingTest.java b/test/distributed/org/apache/cassandra/distributed/test/repair/NoTombstonePurgingTest.java new file mode 100644 index 0000000000..adcb2b3e2c --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/repair/NoTombstonePurgingTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.repair; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Test; + +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.test.TestBaseImpl; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class NoTombstonePurgingTest extends TestBaseImpl +{ + @Test + public void testNp() throws IOException + { + testHelper((cluster) -> { + // full repair, with -np, tombstone gets streamed + cluster.get(1).nodetoolResult("repair", "--include-gcgs-expired-tombstones", "--full", KEYSPACE, "tbl"); + }); + } + + private void testHelper(Consumer<Cluster> repair) throws IOException + { + try (Cluster cluster = init(Cluster.build(2) + .withConfig(c -> c.set("hinted_handoff_enabled", false) + .with(Feature.values())) + .start())) + { + cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key) with gc_grace_seconds = 1 and compaction={'class':'SizeTieredCompactionStrategy', 'enabled':false}")); + cluster.get(1).executeInternal(withKeyspace("delete from %s.tbl where id = 5")); + cluster.get(1).flush(KEYSPACE); + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); //gcgs expiry + + // incremental repair, the tombstone is purgeable, will not get included in MT calculation + cluster.get(1).nodetoolResult("repair", KEYSPACE, "tbl"); + cluster.get(2).runOnInstance(() -> assertTrue(Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().isEmpty())); + + // full repair, tombstone still gets purged + cluster.get(1).nodetoolResult("repair", "--full", KEYSPACE, "tbl"); + cluster.get(2).runOnInstance(() -> assertTrue(Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().isEmpty())); + + repair.accept(cluster); + + cluster.get(2).runOnInstance(() -> assertFalse(Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().isEmpty())); + } + } +} diff --git a/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceRepair.java b/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceRepair.java index f24e639ac3..46edfb3926 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceRepair.java +++ b/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceRepair.java @@ -97,7 +97,7 @@ class OnInstanceRepair extends ClusterAction { Collection<Range<Token>> ranges = rangesSupplier.call(); // no need to wait for completion, as we track all task submissions and message exchanges, and ensure they finish before continuing to next action - StorageService.instance.repair(keyspaceName, new RepairOption(RepairParallelism.SEQUENTIAL, isPrimaryRangeOnly, false, false, 1, ranges, false, false, force, PreviewKind.NONE, false, true, repairPaxos, repairOnlyPaxos), singletonList((tag, event) -> { + StorageService.instance.repair(keyspaceName, new RepairOption(RepairParallelism.SEQUENTIAL, isPrimaryRangeOnly, false, false, 1, ranges, false, false, force, PreviewKind.NONE, false, true, repairPaxos, repairOnlyPaxos, false), singletonList((tag, event) -> { if (event.getType() == ProgressEventType.COMPLETE) listener.run(); })); diff --git a/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java b/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java index 011db44d09..4c7932888b 100644 --- a/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java +++ b/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java @@ -143,7 +143,7 @@ public class CompactionManagerGetSSTablesForValidationTest modifySSTables(); // get sstables for repair - Validator validator = new Validator(new ValidationState(Clock.Global.clock(), desc, coordinator), FBUtilities.nowInSeconds(), true, PreviewKind.NONE); + Validator validator = new Validator(new ValidationState(Clock.Global.clock(), desc, coordinator), FBUtilities.nowInSeconds(), true, PreviewKind.NONE, false); Set<SSTableReader> sstables = Sets.newHashSet(getSSTablesToValidate(cfs, SharedContext.Global.instance, validator.desc.ranges, validator.desc.parentSessionId, validator.isIncremental)); Assert.assertNotNull(sstables); Assert.assertEquals(1, sstables.size()); @@ -158,7 +158,7 @@ public class CompactionManagerGetSSTablesForValidationTest modifySSTables(); // get sstables for repair - Validator validator = new Validator(new ValidationState(Clock.Global.clock(), desc, coordinator), FBUtilities.nowInSeconds(), false, PreviewKind.NONE); + Validator validator = new Validator(new ValidationState(Clock.Global.clock(), desc, coordinator), FBUtilities.nowInSeconds(), false, PreviewKind.NONE, false); Set<SSTableReader> sstables = Sets.newHashSet(getSSTablesToValidate(cfs, SharedContext.Global.instance, validator.desc.ranges, validator.desc.parentSessionId, validator.isIncremental)); Assert.assertNotNull(sstables); Assert.assertEquals(2, sstables.size()); @@ -174,7 +174,7 @@ public class CompactionManagerGetSSTablesForValidationTest modifySSTables(); // get sstables for repair - Validator validator = new Validator(new ValidationState(Clock.Global.clock(), desc, coordinator), FBUtilities.nowInSeconds(), false, PreviewKind.NONE); + Validator validator = new Validator(new ValidationState(Clock.Global.clock(), desc, coordinator), FBUtilities.nowInSeconds(), false, PreviewKind.NONE, false); Set<SSTableReader> sstables = Sets.newHashSet(getSSTablesToValidate(cfs, SharedContext.Global.instance, validator.desc.ranges, validator.desc.parentSessionId, validator.isIncremental)); Assert.assertNotNull(sstables); Assert.assertEquals(3, sstables.size()); diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java b/test/unit/org/apache/cassandra/repair/RepairJobTest.java index 36c17855e3..ea32bd750b 100644 --- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java @@ -126,11 +126,11 @@ public class RepairJobTest public MeasureableRepairSession(TimeUUID parentRepairSession, CommonRange commonRange, String keyspace, RepairParallelism parallelismDegree, boolean isIncremental, boolean pullRepair, PreviewKind previewKind, boolean optimiseStreams, boolean repairPaxos, boolean paxosOnly, - String... cfnames) + boolean dontPurgeTombstones, String... cfnames) { super(SharedContext.Global.instance, new Scheduler.NoopScheduler(), parentRepairSession, commonRange, keyspace, parallelismDegree, isIncremental, pullRepair, - previewKind, optimiseStreams, repairPaxos, paxosOnly, cfnames); + previewKind, optimiseStreams, repairPaxos, paxosOnly, dontPurgeTombstones, cfnames); } @Override @@ -196,7 +196,7 @@ public class RepairJobTest this.session = new MeasureableRepairSession(parentRepairSession, new CommonRange(neighbors, emptySet(), FULL_RANGE), KEYSPACE, SEQUENTIAL, false, false, - NONE, false, true, false, CF); + NONE, false, true, false, false, CF); this.job = new RepairJob(session, CF); this.sessionJobDesc = new RepairJobDesc(session.state.parentRepairSession, session.getId(), diff --git a/test/unit/org/apache/cassandra/repair/RepairMessageVerbHandlerOutOfRangeTest.java b/test/unit/org/apache/cassandra/repair/RepairMessageVerbHandlerOutOfRangeTest.java index 6b74d0febd..4e7f9ba400 100644 --- a/test/unit/org/apache/cassandra/repair/RepairMessageVerbHandlerOutOfRangeTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairMessageVerbHandlerOutOfRangeTest.java @@ -262,7 +262,7 @@ public class RepairMessageVerbHandlerOutOfRangeTest true, PreviewKind.NONE); return new ValidationRequest(new RepairJobDesc(parentId, uuid(), KEYSPACE, TABLE, Collections.singleton(range)), - randomInt()); + randomInt(), false); } public static TimeUUID uuid() diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java index f4d177870e..470a2efc53 100644 --- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java @@ -67,7 +67,8 @@ public class RepairSessionTest new CommonRange(endpoints, Collections.emptySet(), Arrays.asList(repairRange)), "Keyspace1", RepairParallelism.SEQUENTIAL, false, false, - PreviewKind.NONE, false, false, false, "Standard1"); + PreviewKind.NONE, false, false, false, false, + "Standard1"); // perform convict session.convict(remote, Double.MAX_VALUE); diff --git a/test/unit/org/apache/cassandra/repair/ValidationTaskTest.java b/test/unit/org/apache/cassandra/repair/ValidationTaskTest.java index 5639d4105c..e7f325de52 100644 --- a/test/unit/org/apache/cassandra/repair/ValidationTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/ValidationTaskTest.java @@ -79,6 +79,6 @@ public class ValidationTaskTest private ValidationTask createTask() throws UnknownHostException { InetAddressAndPort addressAndPort = InetAddressAndPort.getByName("127.0.0.1"); RepairJobDesc desc = new RepairJobDesc(nextTimeUUID(), nextTimeUUID(), UUID.randomUUID().toString(), UUID.randomUUID().toString(), null); - return new ValidationTask(SharedContext.Global.instance, desc, addressAndPort, 0, PreviewKind.NONE); + return new ValidationTask(SharedContext.Global.instance, desc, addressAndPort, 0, PreviewKind.NONE, false); } } diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java index 7d0317f2c1..9ea956b85c 100644 --- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java +++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java @@ -208,7 +208,7 @@ public class ValidatorTest false, PreviewKind.NONE); final CompletableFuture<Message> outgoingMessageSink = registerOutgoingMessageSink(); - Validator validator = new Validator(SharedContext.Global.instance, new ValidationState(Clock.Global.clock(), desc, host), 0, true, false, PreviewKind.NONE); + Validator validator = new Validator(SharedContext.Global.instance, new ValidationState(Clock.Global.clock(), desc, host), 0, true, false, PreviewKind.NONE, false); ValidationManager.instance.submitValidation(cfs, validator); Message message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS); @@ -265,7 +265,7 @@ public class ValidatorTest false, PreviewKind.NONE); final CompletableFuture<Message> outgoingMessageSink = registerOutgoingMessageSink(); - Validator validator = new Validator(SharedContext.Global.instance, new ValidationState(Clock.Global.clock(), desc, host), 0, true, false, PreviewKind.NONE); + Validator validator = new Validator(SharedContext.Global.instance, new ValidationState(Clock.Global.clock(), desc, host), 0, true, false, PreviewKind.NONE, false); ValidationManager.instance.submitValidation(cfs, validator); Message message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS); @@ -327,7 +327,7 @@ public class ValidatorTest false, PreviewKind.NONE); final CompletableFuture<Message> outgoingMessageSink = registerOutgoingMessageSink(); - Validator validator = new Validator(SharedContext.Global.instance, new ValidationState(Clock.Global.clock(), desc, host), 0, true, false, PreviewKind.NONE); + Validator validator = new Validator(SharedContext.Global.instance, new ValidationState(Clock.Global.clock(), desc, host), 0, true, false, PreviewKind.NONE, false); ValidationManager.instance.submitValidation(cfs, validator); Message message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS); diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java index 9886076d70..1657ceff48 100644 --- a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java +++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java @@ -87,9 +87,13 @@ public class RepairMessageSerializationsTest public void validationRequestMessage() throws IOException { RepairJobDesc jobDesc = buildRepairJobDesc(); - ValidationRequest msg = new ValidationRequest(jobDesc, GC_BEFORE); + ValidationRequest msg = new ValidationRequest(jobDesc, GC_BEFORE, false); ValidationRequest deserialized = serializeRoundTrip(msg, ValidationRequest.serializer); Assert.assertEquals(jobDesc, deserialized.desc); + + msg = new ValidationRequest(jobDesc, GC_BEFORE, true); + deserialized = serializeRoundTrip(msg, ValidationRequest.serializer); + Assert.assertEquals(jobDesc, deserialized.desc); } private RepairJobDesc buildRepairJobDesc() diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java index d10b65dd17..20431fc335 100644 --- a/test/unit/org/apache/cassandra/service/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java @@ -104,7 +104,7 @@ public class SerializationsTest extends AbstractSerializationsTester private void testValidationRequestWrite() throws IOException { - ValidationRequest message = new ValidationRequest(DESC, 1234); + ValidationRequest message = new ValidationRequest(DESC, 1234, true); testRepairMessageWrite("service.ValidationRequest.bin", ValidationRequest.serializer, message); } @@ -119,6 +119,7 @@ public class SerializationsTest extends AbstractSerializationsTester ValidationRequest message = ValidationRequest.serializer.deserialize(in, getVersion()); assert DESC.equals(message.desc); assert message.nowInSec == 1234; + assert message.dontPurgeTombstones == true; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org