This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b6eb5890da Optionally prevent tombstone purging during repair
b6eb5890da is described below

commit b6eb5890da38642fc7af7d39c83f7ec01f33d78f
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Mon Nov 18 11:19:04 2024 -0700

    Optionally prevent tombstone purging during repair
    
    patch by Marcus Eriksson, Abe Ratnofsky; reviewed by Jon Meredith for 
CASSANDRA-20071
---
 CHANGES.txt                                        |   1 +
 .../db/repair/CassandraTableRepairManager.java     |   4 +-
 .../db/repair/CassandraValidationIterator.java     |   5 +-
 .../cassandra/repair/AbstractRepairTask.java       |   1 +
 .../org/apache/cassandra/repair/RepairJob.java     |   2 +-
 .../cassandra/repair/RepairMessageVerbHandler.java |   4 +-
 .../org/apache/cassandra/repair/RepairSession.java |   3 +
 .../cassandra/repair/TableRepairManager.java       |   2 +-
 .../apache/cassandra/repair/ValidationManager.java |   2 +-
 .../apache/cassandra/repair/ValidationTask.java    |   6 +-
 .../org/apache/cassandra/repair/Validator.java     |  14 ++--
 .../cassandra/repair/messages/RepairOption.java    |  16 ++++-
 .../repair/messages/ValidationRequest.java         |  11 +++-
 .../cassandra/service/ActiveRepairService.java     |   3 +-
 .../apache/cassandra/tools/nodetool/Repair.java    |   4 ++
 .../5.1/service.ValidationRequest.bin              | Bin 74 -> 75 bytes
 .../test/repair/NoTombstonePurgingTest.java        |  72 +++++++++++++++++++++
 .../simulator/cluster/OnInstanceRepair.java        |   2 +-
 ...pactionManagerGetSSTablesForValidationTest.java |   6 +-
 .../org/apache/cassandra/repair/RepairJobTest.java |   6 +-
 .../RepairMessageVerbHandlerOutOfRangeTest.java    |   2 +-
 .../apache/cassandra/repair/RepairSessionTest.java |   3 +-
 .../cassandra/repair/ValidationTaskTest.java       |   2 +-
 .../org/apache/cassandra/repair/ValidatorTest.java |   6 +-
 .../messages/RepairMessageSerializationsTest.java  |   6 +-
 .../cassandra/service/SerializationsTest.java      |   3 +-
 26 files changed, 150 insertions(+), 36 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index b5ba4dc267..91d4861942 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Optionally prevent tombstone purging during repair (CASSANDRA-20071)
  * Add post-filtering support for the IN operator in SAI queries 
(CASSANDRA-20025)
  * Don’t finish ongoing decommission and move operations during startup 
(CASSANDRA-20040)
  * Nodetool reconfigure cms has correct return code when streaming fails 
(CASSANDRA-19972)
diff --git 
a/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java 
b/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java
index 4e54d6ee77..24e79d2454 100644
--- a/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java
+++ b/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java
@@ -56,9 +56,9 @@ public class CassandraTableRepairManager implements 
TableRepairManager
     }
 
     @Override
-    public ValidationPartitionIterator 
getValidationIterator(Collection<Range<Token>> ranges, TimeUUID parentId, 
TimeUUID sessionID, boolean isIncremental, long nowInSec, 
TopPartitionTracker.Collector topPartitionCollector) throws IOException, 
NoSuchRepairSessionException
+    public ValidationPartitionIterator 
getValidationIterator(Collection<Range<Token>> ranges, TimeUUID parentId, 
TimeUUID sessionID, boolean isIncremental, long nowInSec, boolean 
dontPurgeTombstones, TopPartitionTracker.Collector topPartitionCollector) 
throws IOException, NoSuchRepairSessionException
     {
-        return new CassandraValidationIterator(cfs, ctx, ranges, parentId, 
sessionID, isIncremental, nowInSec, topPartitionCollector);
+        return new CassandraValidationIterator(cfs, ctx, ranges, parentId, 
sessionID, isIncremental, nowInSec, dontPurgeTombstones, topPartitionCollector);
     }
 
     @Override
diff --git 
a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java 
b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
index a31a7038b0..05408e9175 100644
--- a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
+++ b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
@@ -174,7 +174,7 @@ public class CassandraValidationIterator extends 
ValidationPartitionIterator
     private final long estimatedPartitions;
     private final Map<Range<Token>, Long> rangePartitionCounts;
 
-    public CassandraValidationIterator(ColumnFamilyStore cfs, SharedContext 
ctx, Collection<Range<Token>> ranges, TimeUUID parentId, TimeUUID sessionID, 
boolean isIncremental, long nowInSec, TopPartitionTracker.Collector 
topPartitionCollector) throws IOException, NoSuchRepairSessionException
+    public CassandraValidationIterator(ColumnFamilyStore cfs, SharedContext 
ctx, Collection<Range<Token>> ranges, TimeUUID parentId, TimeUUID sessionID, 
boolean isIncremental, long nowInSec, boolean dontPurgeTombstones, 
TopPartitionTracker.Collector topPartitionCollector) throws IOException, 
NoSuchRepairSessionException
     {
         this.cfs = cfs;
         this.ctx = ctx;
@@ -219,7 +219,8 @@ public class CassandraValidationIterator extends 
ValidationPartitionIterator
                     cfs.getKeyspaceName(),
                     cfs.getTableName());
 
-        controller = new ValidationCompactionController(cfs, 
getDefaultGcBefore(cfs, nowInSec));
+        long gcBefore = dontPurgeTombstones ? Long.MIN_VALUE : 
getDefaultGcBefore(cfs, nowInSec);
+        controller = new ValidationCompactionController(cfs, gcBefore);
         scanners = cfs.getCompactionStrategyManager().getScanners(sstables, 
ranges);
         ci = new ValidationCompactionIterator(scanners.scanners, controller, 
nowInSec, CompactionManager.instance.active, topPartitionCollector);
 
diff --git a/src/java/org/apache/cassandra/repair/AbstractRepairTask.java 
b/src/java/org/apache/cassandra/repair/AbstractRepairTask.java
index 94cc3545c2..f27e72deb1 100644
--- a/src/java/org/apache/cassandra/repair/AbstractRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/AbstractRepairTask.java
@@ -76,6 +76,7 @@ public abstract class AbstractRepairTask implements RepairTask
                                                                                
  options.optimiseStreams(),
                                                                                
  options.repairPaxos(),
                                                                                
  options.paxosOnly(),
+                                                                               
  options.dontPurgeTombstones(),
                                                                                
  executor,
                                                                                
  validationScheduler,
                                                                                
  cfnames);
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java 
b/src/java/org/apache/cassandra/repair/RepairJob.java
index 424b69acd1..63b7b96ec5 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -614,7 +614,7 @@ public class RepairJob extends AsyncFuture<RepairResult> 
implements Runnable
 
     private ValidationTask newValidationTask(InetAddressAndPort endpoint, long 
nowInSec)
     {
-        ValidationTask task = new ValidationTask(session.ctx, desc, endpoint, 
nowInSec, session.previewKind);
+        ValidationTask task = new ValidationTask(session.ctx, desc, endpoint, 
nowInSec, session.previewKind, session.dontPurgeTombstones);
         validationTasks.add(task);
         return task;
     }
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java 
b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index ca823faa5a..f777126019 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -243,7 +243,9 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
                         sendAck(message);
 
                         Validator validator = new Validator(ctx, vState, 
validationRequest.nowInSec,
-                                                            
isIncremental(desc.parentSessionId), previewKind);
+                                                            
isIncremental(desc.parentSessionId),
+                                                            previewKind,
+                                                            
validationRequest.dontPurgeTombstones);
                         ctx.validationManager().submitValidation(store, 
validator);
                     }
                     catch (Throwable t)
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java 
b/src/java/org/apache/cassandra/repair/RepairSession.java
index f329bf4779..92d56390fe 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -121,6 +121,7 @@ public class RepairSession extends 
AsyncFuture<RepairSessionResult> implements I
     public final PreviewKind previewKind;
     public final boolean repairPaxos;
     public final boolean paxosOnly;
+    public final boolean dontPurgeTombstones;
 
     private final AtomicBoolean isFailed = new AtomicBoolean(false);
 
@@ -161,6 +162,7 @@ public class RepairSession extends 
AsyncFuture<RepairSessionResult> implements I
                          boolean optimiseStreams,
                          boolean repairPaxos,
                          boolean paxosOnly,
+                         boolean dontPurgeTombstones,
                          String... cfnames)
     {
         this.ctx = ctx;
@@ -174,6 +176,7 @@ public class RepairSession extends 
AsyncFuture<RepairSessionResult> implements I
         this.previewKind = previewKind;
         this.pullRepair = pullRepair;
         this.optimiseStreams = optimiseStreams;
+        this.dontPurgeTombstones = dontPurgeTombstones;
         this.taskExecutor = new SafeExecutor(createExecutor(ctx));
     }
 
diff --git a/src/java/org/apache/cassandra/repair/TableRepairManager.java 
b/src/java/org/apache/cassandra/repair/TableRepairManager.java
index 99ccff0714..f5bee38a0c 100644
--- a/src/java/org/apache/cassandra/repair/TableRepairManager.java
+++ b/src/java/org/apache/cassandra/repair/TableRepairManager.java
@@ -38,7 +38,7 @@ public interface TableRepairManager
      * data previously isolated for repair with the given parentId. nowInSec 
should determine whether tombstones should
      * be purged or not.
      */
-    ValidationPartitionIterator getValidationIterator(Collection<Range<Token>> 
ranges, TimeUUID parentId, TimeUUID sessionID, boolean isIncremental, long 
nowInSec, TopPartitionTracker.Collector topPartitionCollector) throws 
IOException, NoSuchRepairSessionException;
+    ValidationPartitionIterator getValidationIterator(Collection<Range<Token>> 
ranges, TimeUUID parentId, TimeUUID sessionID, boolean isIncremental, long 
nowInSec, boolean dontPurgeTombstones, TopPartitionTracker.Collector 
topPartitionCollector) throws IOException, NoSuchRepairSessionException;
 
     /**
      * Begin execution of the given validation callable. Which thread pool a 
validation should run in is an implementation detail.
diff --git a/src/java/org/apache/cassandra/repair/ValidationManager.java 
b/src/java/org/apache/cassandra/repair/ValidationManager.java
index e3598cd38f..ca7ad3a68e 100644
--- a/src/java/org/apache/cassandra/repair/ValidationManager.java
+++ b/src/java/org/apache/cassandra/repair/ValidationManager.java
@@ -90,7 +90,7 @@ public class ValidationManager implements IValidationManager
     private static ValidationPartitionIterator 
getValidationIterator(TableRepairManager repairManager, Validator validator, 
TopPartitionTracker.Collector topPartitionCollector) throws IOException, 
NoSuchRepairSessionException
     {
         RepairJobDesc desc = validator.desc;
-        return repairManager.getValidationIterator(desc.ranges, 
desc.parentSessionId, desc.sessionId, validator.isIncremental, 
validator.nowInSec, topPartitionCollector);
+        return repairManager.getValidationIterator(desc.ranges, 
desc.parentSessionId, desc.sessionId, validator.isIncremental, 
validator.nowInSec, validator.dontPurgeTombstones, topPartitionCollector);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java 
b/src/java/org/apache/cassandra/repair/ValidationTask.java
index 322e07cd2d..445e3880a2 100644
--- a/src/java/org/apache/cassandra/repair/ValidationTask.java
+++ b/src/java/org/apache/cassandra/repair/ValidationTask.java
@@ -40,15 +40,17 @@ public class ValidationTask extends 
AsyncFuture<TreeResponse> implements Runnabl
     private final InetAddressAndPort endpoint;
     private final long nowInSec;
     private final PreviewKind previewKind;
+    private final boolean dontPurgeTombstones;
     private final SharedContext ctx;
 
-    public ValidationTask(SharedContext ctx, RepairJobDesc desc, 
InetAddressAndPort endpoint, long nowInSec, PreviewKind previewKind)
+    public ValidationTask(SharedContext ctx, RepairJobDesc desc, 
InetAddressAndPort endpoint, long nowInSec, PreviewKind previewKind, boolean 
dontPurgeTombstones)
     {
         this.ctx = ctx;
         this.desc = desc;
         this.endpoint = endpoint;
         this.nowInSec = nowInSec;
         this.previewKind = previewKind;
+        this.dontPurgeTombstones = dontPurgeTombstones;
     }
 
     /**
@@ -57,7 +59,7 @@ public class ValidationTask extends AsyncFuture<TreeResponse> 
implements Runnabl
     public void run()
     {
         RepairMessage.sendMessageWithFailureCB(ctx, notDone(this),
-                                               new ValidationRequest(desc, 
nowInSec),
+                                               new ValidationRequest(desc, 
nowInSec, dontPurgeTombstones),
                                                VALIDATION_REQ,
                                                endpoint,
                                                this::tryFailure);
diff --git a/src/java/org/apache/cassandra/repair/Validator.java 
b/src/java/org/apache/cassandra/repair/Validator.java
index d8ba929de6..c5152aee13 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -82,23 +82,24 @@ public class Validator implements Runnable
     private final PreviewKind previewKind;
     public final ValidationState state;
     public TopPartitionTracker.Collector topPartitionCollector;
+    public final boolean dontPurgeTombstones;
 
     public Validator(ValidationState state, long nowInSec, PreviewKind 
previewKind)
     {
-        this(SharedContext.Global.instance, state, nowInSec, false, false, 
previewKind);
+        this(SharedContext.Global.instance, state, nowInSec, false, false, 
previewKind, false);
     }
 
-    public Validator(SharedContext ctx, ValidationState state, long nowInSec, 
boolean isIncremental, PreviewKind previewKind)
+    public Validator(SharedContext ctx, ValidationState state, long nowInSec, 
boolean isIncremental, PreviewKind previewKind, boolean dontPurgeTombstones)
     {
-        this(ctx, state, nowInSec, false, isIncremental, previewKind);
+        this(ctx, state, nowInSec, false, isIncremental, previewKind, 
dontPurgeTombstones);
     }
 
-    public Validator(ValidationState state, long nowInSec, boolean 
isIncremental, PreviewKind previewKind)
+    public Validator(ValidationState state, long nowInSec, boolean 
isIncremental, PreviewKind previewKind, boolean dontPurgeTombstones)
     {
-        this(SharedContext.Global.instance, state, nowInSec, false, 
isIncremental, previewKind);
+        this(SharedContext.Global.instance, state, nowInSec, false, 
isIncremental, previewKind, dontPurgeTombstones);
     }
 
-    public Validator(SharedContext ctx, ValidationState state, long nowInSec, 
boolean evenTreeDistribution, boolean isIncremental, PreviewKind previewKind)
+    public Validator(SharedContext ctx, ValidationState state, long nowInSec, 
boolean evenTreeDistribution, boolean isIncremental, PreviewKind previewKind, 
boolean dontPurgeTombstones)
     {
         this.ctx = ctx;
         this.state = state;
@@ -107,6 +108,7 @@ public class Validator implements Runnable
         this.nowInSec = nowInSec;
         this.isIncremental = isIncremental;
         this.previewKind = previewKind;
+        this.dontPurgeTombstones = dontPurgeTombstones;
         validated = 0;
         range = null;
         ranges = null;
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java 
b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index 03097da077..bc9231dcc1 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -54,6 +54,8 @@ public class RepairOption
     public static final String IGNORE_UNREPLICATED_KS = 
"ignoreUnreplicatedKeyspaces";
     public static final String REPAIR_PAXOS_KEY = "repairPaxos";
     public static final String PAXOS_ONLY_KEY = "paxosOnly";
+    public static final String NO_TOMBSTONE_PURGING = "nopurge";
+
 
     // we don't want to push nodes too much for repair
     public static final int MAX_JOB_THREADS = 4;
@@ -185,6 +187,7 @@ public class RepairOption
         boolean ignoreUnreplicatedKeyspaces = 
Boolean.parseBoolean(options.get(IGNORE_UNREPLICATED_KS));
         boolean repairPaxos = 
Boolean.parseBoolean(options.get(REPAIR_PAXOS_KEY));
         boolean paxosOnly = Boolean.parseBoolean(options.get(PAXOS_ONLY_KEY));
+        boolean dontPurgeTombstones = 
Boolean.parseBoolean(options.get(NO_TOMBSTONE_PURGING));
 
         if (previewKind != PreviewKind.NONE)
         {
@@ -209,7 +212,7 @@ public class RepairOption
 
         boolean asymmetricSyncing = 
Boolean.parseBoolean(options.get(OPTIMISE_STREAMS_KEY));
 
-        RepairOption option = new RepairOption(parallelism, primaryRange, 
incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, 
previewKind, asymmetricSyncing, ignoreUnreplicatedKeyspaces, repairPaxos, 
paxosOnly);
+        RepairOption option = new RepairOption(parallelism, primaryRange, 
incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, 
previewKind, asymmetricSyncing, ignoreUnreplicatedKeyspaces, repairPaxos, 
paxosOnly, dontPurgeTombstones);
 
         // data centers
         String dataCentersStr = options.get(DATACENTERS_KEY);
@@ -291,13 +294,14 @@ public class RepairOption
     private final boolean ignoreUnreplicatedKeyspaces;
     private final boolean repairPaxos;
     private final boolean paxosOnly;
+    private final boolean dontPurgeTombstones;
 
     private final Collection<String> columnFamilies = new HashSet<>();
     private final Collection<String> dataCenters = new HashSet<>();
     private final Collection<String> hosts = new HashSet<>();
     private final Collection<Range<Token>> ranges = new HashSet<>();
 
-    public RepairOption(RepairParallelism parallelism, boolean primaryRange, 
boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> 
ranges, boolean isSubrangeRepair, boolean pullRepair, boolean forceRepair, 
PreviewKind previewKind, boolean optimiseStreams, boolean 
ignoreUnreplicatedKeyspaces, boolean repairPaxos, boolean paxosOnly)
+    public RepairOption(RepairParallelism parallelism, boolean primaryRange, 
boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> 
ranges, boolean isSubrangeRepair, boolean pullRepair, boolean forceRepair, 
PreviewKind previewKind, boolean optimiseStreams, boolean 
ignoreUnreplicatedKeyspaces, boolean repairPaxos, boolean paxosOnly, boolean 
dontPurgeTombstones)
     {
 
         this.parallelism = parallelism;
@@ -314,6 +318,7 @@ public class RepairOption
         this.ignoreUnreplicatedKeyspaces = ignoreUnreplicatedKeyspaces;
         this.repairPaxos = repairPaxos;
         this.paxosOnly = paxosOnly;
+        this.dontPurgeTombstones = dontPurgeTombstones;
     }
 
     public RepairParallelism getParallelism()
@@ -429,6 +434,11 @@ public class RepairOption
         return paxosOnly;
     }
 
+    public boolean dontPurgeTombstones()
+    {
+        return dontPurgeTombstones;
+    }
+
     @Override
     public String toString()
     {
@@ -448,6 +458,7 @@ public class RepairOption
                ", ignore unreplicated keyspaces: "+ 
ignoreUnreplicatedKeyspaces +
                ", repairPaxos: " + repairPaxos +
                ", paxosOnly: " + paxosOnly +
+               ", dontPurgeTombstones: " + dontPurgeTombstones +
                ')';
     }
 
@@ -470,6 +481,7 @@ public class RepairOption
         options.put(OPTIMISE_STREAMS_KEY, Boolean.toString(optimiseStreams));
         options.put(REPAIR_PAXOS_KEY, Boolean.toString(repairPaxos));
         options.put(PAXOS_ONLY_KEY, Boolean.toString(paxosOnly));
+        options.put(NO_TOMBSTONE_PURGING, 
Boolean.toString(dontPurgeTombstones));
         return options;
     }
 }
diff --git 
a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java 
b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
index 1e651a96d2..5c6550fac7 100644
--- a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
@@ -35,11 +35,13 @@ import org.apache.cassandra.utils.CassandraUInt;
 public class ValidationRequest extends RepairMessage
 {
     public final long nowInSec;
+    public final boolean dontPurgeTombstones;
 
-    public ValidationRequest(RepairJobDesc desc, long nowInSec)
+    public ValidationRequest(RepairJobDesc desc, long nowInSec, boolean 
dontPurgeTombstones)
     {
         super(desc);
         this.nowInSec = nowInSec;
+        this.dontPurgeTombstones = dontPurgeTombstones;
     }
 
     @Override
@@ -47,6 +49,7 @@ public class ValidationRequest extends RepairMessage
     {
         return "ValidationRequest{" +
                "nowInSec=" + nowInSec +
+               ", dontPurgeTombstones=" + dontPurgeTombstones +
                "} " + super.toString();
     }
 
@@ -72,19 +75,23 @@ public class ValidationRequest extends RepairMessage
         {
             RepairJobDesc.serializer.serialize(message.desc, out, version);
             out.writeInt(version >= MessagingService.VERSION_50 ? 
CassandraUInt.fromLong(message.nowInSec) : (int) message.nowInSec);
+            if (version >= MessagingService.VERSION_51)
+                out.writeBoolean(message.dontPurgeTombstones);
         }
 
         public ValidationRequest deserialize(DataInputPlus dis, int version) 
throws IOException
         {
             RepairJobDesc desc = RepairJobDesc.serializer.deserialize(dis, 
version);
             long nowInsec = version >= MessagingService.VERSION_50 ? 
CassandraUInt.toLong(dis.readInt()) : dis.readInt();
-            return new ValidationRequest(desc, nowInsec);
+            boolean dontPurgeTombstones = version >= 
MessagingService.VERSION_51 ? dis.readBoolean() : false;
+            return new ValidationRequest(desc, nowInsec, dontPurgeTombstones);
         }
 
         public long serializedSize(ValidationRequest message, int version)
         {
             long size = RepairJobDesc.serializer.serializedSize(message.desc, 
version);
             size += TypeSizes.INT_SIZE;
+            size += version >= MessagingService.VERSION_51 ? 
TypeSizes.sizeof(message.dontPurgeTombstones) : 0;
             return size;
         }
     };
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 9682c8e959..b5fbc48d43 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -454,6 +454,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
                                              boolean optimiseStreams,
                                              boolean repairPaxos,
                                              boolean paxosOnly,
+                                             boolean dontPurgeTombstones,
                                              ExecutorPlus executor,
                                              Scheduler validationScheduler,
                                              String... cfnames)
@@ -469,7 +470,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
 
         final RepairSession session = new RepairSession(ctx, 
validationScheduler, parentRepairSession, range, keyspace,
                                                         parallelismDegree, 
isIncremental, pullRepair,
-                                                        previewKind, 
optimiseStreams, repairPaxos, paxosOnly, cfnames);
+                                                        previewKind, 
optimiseStreams, repairPaxos, paxosOnly, dontPurgeTombstones, cfnames);
         repairs.getIfPresent(parentRepairSession).register(session.state);
 
         sessions.put(session.getId(), session);
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java 
b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
index 3583240830..c66992acc9 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
@@ -108,6 +108,9 @@ public class Repair extends NodeToolCmd
     @Option(title = "ignore_unreplicated_keyspaces", name = 
{"-iuk","--ignore-unreplicated-keyspaces"}, description = "Use 
--ignore-unreplicated-keyspaces to ignore keyspaces which are not replicated, 
otherwise the repair will fail")
     private boolean ignoreUnreplicatedKeyspaces = false;
 
+    @Option(title = "no_purge", name = {"--include-gcgs-expired-tombstones"}, 
description = "Do not apply gc grace seconds to purge any tombstones.  Only 
useful in rare recovery scenarios, never regular operations.")
+    private boolean dontPurgeTombstones = false;
+
     private PreviewKind getPreviewKind()
     {
         if (validate)
@@ -186,6 +189,7 @@ public class Repair extends NodeToolCmd
         options.put(RepairOption.IGNORE_UNREPLICATED_KS, 
Boolean.toString(ignoreUnreplicatedKeyspaces));
         options.put(RepairOption.REPAIR_PAXOS_KEY, Boolean.toString(!skipPaxos 
&& getPreviewKind() == PreviewKind.NONE));
         options.put(RepairOption.PAXOS_ONLY_KEY, Boolean.toString(paxosOnly && 
getPreviewKind() == PreviewKind.NONE));
+        options.put(RepairOption.NO_TOMBSTONE_PURGING, 
Boolean.toString(dontPurgeTombstones));
 
         if (!startToken.isEmpty() || !endToken.isEmpty())
         {
diff --git a/test/data/serialization/5.1/service.ValidationRequest.bin 
b/test/data/serialization/5.1/service.ValidationRequest.bin
index 04c492a8a1..2a2aea7183 100644
Binary files a/test/data/serialization/5.1/service.ValidationRequest.bin and 
b/test/data/serialization/5.1/service.ValidationRequest.bin differ
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/repair/NoTombstonePurgingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/repair/NoTombstonePurgingTest.java
new file mode 100644
index 0000000000..adcb2b3e2c
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/repair/NoTombstonePurgingTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.repair;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class NoTombstonePurgingTest extends TestBaseImpl
+{
+    @Test
+    public void testNp() throws IOException
+    {
+        testHelper((cluster) -> {
+            // full repair with --include-gcgs-expired-tombstones: the tombstone is not purged and gets streamed
+            cluster.get(1).nodetoolResult("repair", 
"--include-gcgs-expired-tombstones", "--full", KEYSPACE, "tbl");
+        });
+    }
+
+    private void testHelper(Consumer<Cluster> repair) throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(c -> 
c.set("hinted_handoff_enabled", false)
+                                                             
.with(Feature.values()))
+                                           .start()))
+        {
+            cluster.schemaChange(withKeyspace("create table %s.tbl (id int 
primary key) with gc_grace_seconds = 1 and 
compaction={'class':'SizeTieredCompactionStrategy', 'enabled':false}"));
+            cluster.get(1).executeInternal(withKeyspace("delete from %s.tbl 
where id = 5"));
+            cluster.get(1).flush(KEYSPACE);
+            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); //gcgs 
expiry
+
+            // incremental repair, the tombstone is purgeable, will not get 
included in MT calculation
+            cluster.get(1).nodetoolResult("repair", KEYSPACE, "tbl");
+            cluster.get(2).runOnInstance(() -> 
assertTrue(Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().isEmpty()));
+
+            // full repair, tombstone still gets purged
+            cluster.get(1).nodetoolResult("repair", "--full", KEYSPACE, "tbl");
+            cluster.get(2).runOnInstance(() -> 
assertTrue(Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().isEmpty()));
+
+            repair.accept(cluster);
+
+            cluster.get(2).runOnInstance(() -> 
assertFalse(Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().isEmpty()));
+        }
+    }
+}
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceRepair.java
 
b/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceRepair.java
index f24e639ac3..46edfb3926 100644
--- 
a/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceRepair.java
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/cluster/OnInstanceRepair.java
@@ -97,7 +97,7 @@ class OnInstanceRepair extends ClusterAction
     {
         Collection<Range<Token>> ranges = rangesSupplier.call();
         // no need to wait for completion, as we track all task submissions 
and message exchanges, and ensure they finish before continuing to next action
-        StorageService.instance.repair(keyspaceName, new 
RepairOption(RepairParallelism.SEQUENTIAL, isPrimaryRangeOnly, false, false, 1, 
ranges, false, false, force, PreviewKind.NONE, false, true, repairPaxos, 
repairOnlyPaxos), singletonList((tag, event) -> {
+        StorageService.instance.repair(keyspaceName, new 
RepairOption(RepairParallelism.SEQUENTIAL, isPrimaryRangeOnly, false, false, 1, 
ranges, false, false, force, PreviewKind.NONE, false, true, repairPaxos, 
repairOnlyPaxos, false), singletonList((tag, event) -> {
             if (event.getType() == ProgressEventType.COMPLETE)
                 listener.run();
         }));
diff --git 
a/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java
 
b/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java
index 011db44d09..4c7932888b 100644
--- 
a/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java
+++ 
b/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java
@@ -143,7 +143,7 @@ public class CompactionManagerGetSSTablesForValidationTest
         modifySSTables();
 
         // get sstables for repair
-        Validator validator = new Validator(new 
ValidationState(Clock.Global.clock(), desc, coordinator), 
FBUtilities.nowInSeconds(), true, PreviewKind.NONE);
+        Validator validator = new Validator(new 
ValidationState(Clock.Global.clock(), desc, coordinator), 
FBUtilities.nowInSeconds(), true, PreviewKind.NONE, false);
         Set<SSTableReader> sstables = 
Sets.newHashSet(getSSTablesToValidate(cfs, SharedContext.Global.instance, 
validator.desc.ranges, validator.desc.parentSessionId, 
validator.isIncremental));
         Assert.assertNotNull(sstables);
         Assert.assertEquals(1, sstables.size());
@@ -158,7 +158,7 @@ public class CompactionManagerGetSSTablesForValidationTest
         modifySSTables();
 
         // get sstables for repair
-        Validator validator = new Validator(new 
ValidationState(Clock.Global.clock(), desc, coordinator), 
FBUtilities.nowInSeconds(), false, PreviewKind.NONE);
+        Validator validator = new Validator(new 
ValidationState(Clock.Global.clock(), desc, coordinator), 
FBUtilities.nowInSeconds(), false, PreviewKind.NONE, false);
         Set<SSTableReader> sstables = 
Sets.newHashSet(getSSTablesToValidate(cfs, SharedContext.Global.instance, 
validator.desc.ranges, validator.desc.parentSessionId, 
validator.isIncremental));
         Assert.assertNotNull(sstables);
         Assert.assertEquals(2, sstables.size());
@@ -174,7 +174,7 @@ public class CompactionManagerGetSSTablesForValidationTest
         modifySSTables();
 
         // get sstables for repair
-        Validator validator = new Validator(new 
ValidationState(Clock.Global.clock(), desc, coordinator), 
FBUtilities.nowInSeconds(), false, PreviewKind.NONE);
+        Validator validator = new Validator(new 
ValidationState(Clock.Global.clock(), desc, coordinator), 
FBUtilities.nowInSeconds(), false, PreviewKind.NONE, false);
         Set<SSTableReader> sstables = 
Sets.newHashSet(getSSTablesToValidate(cfs, SharedContext.Global.instance, 
validator.desc.ranges, validator.desc.parentSessionId, 
validator.isIncremental));
         Assert.assertNotNull(sstables);
         Assert.assertEquals(3, sstables.size());
diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java 
b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
index 36c17855e3..ea32bd750b 100644
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@ -126,11 +126,11 @@ public class RepairJobTest
         public MeasureableRepairSession(TimeUUID parentRepairSession, 
CommonRange commonRange, String keyspace,
                                         RepairParallelism parallelismDegree, 
boolean isIncremental, boolean pullRepair,
                                         PreviewKind previewKind, boolean 
optimiseStreams, boolean repairPaxos, boolean paxosOnly,
-                                        String... cfnames)
+                                        boolean dontPurgeTombstones, String... 
cfnames)
         {
             super(SharedContext.Global.instance, new Scheduler.NoopScheduler(),
                   parentRepairSession, commonRange, keyspace, 
parallelismDegree, isIncremental, pullRepair,
-                  previewKind, optimiseStreams, repairPaxos, paxosOnly, 
cfnames);
+                  previewKind, optimiseStreams, repairPaxos, paxosOnly, 
dontPurgeTombstones, cfnames);
         }
 
         @Override
@@ -196,7 +196,7 @@ public class RepairJobTest
         this.session = new MeasureableRepairSession(parentRepairSession,
                                                     new CommonRange(neighbors, 
emptySet(), FULL_RANGE),
                                                     KEYSPACE, SEQUENTIAL, 
false, false,
-                                                    NONE, false, true, false, 
CF);
+                                                    NONE, false, true, false, 
false, CF);
 
         this.job = new RepairJob(session, CF);
         this.sessionJobDesc = new 
RepairJobDesc(session.state.parentRepairSession, session.getId(),
diff --git 
a/test/unit/org/apache/cassandra/repair/RepairMessageVerbHandlerOutOfRangeTest.java
 
b/test/unit/org/apache/cassandra/repair/RepairMessageVerbHandlerOutOfRangeTest.java
index 6b74d0febd..4e7f9ba400 100644
--- 
a/test/unit/org/apache/cassandra/repair/RepairMessageVerbHandlerOutOfRangeTest.java
+++ 
b/test/unit/org/apache/cassandra/repair/RepairMessageVerbHandlerOutOfRangeTest.java
@@ -262,7 +262,7 @@ public class RepairMessageVerbHandlerOutOfRangeTest
                                                                  true,
                                                                  
PreviewKind.NONE);
         return new ValidationRequest(new RepairJobDesc(parentId, uuid(), 
KEYSPACE, TABLE, Collections.singleton(range)),
-                                     randomInt());
+                                     randomInt(), false);
     }
 
     public static TimeUUID uuid()
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java 
b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index f4d177870e..470a2efc53 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -67,7 +67,8 @@ public class RepairSessionTest
                                                   new CommonRange(endpoints, 
Collections.emptySet(), Arrays.asList(repairRange)),
                                                   "Keyspace1", 
RepairParallelism.SEQUENTIAL,
                                                   false, false,
-                                                  PreviewKind.NONE, false, 
false, false, "Standard1");
+                                                  PreviewKind.NONE, false, 
false, false, false,
+                                         "Standard1");
 
         // perform convict
         session.convict(remote, Double.MAX_VALUE);
diff --git a/test/unit/org/apache/cassandra/repair/ValidationTaskTest.java 
b/test/unit/org/apache/cassandra/repair/ValidationTaskTest.java
index 5639d4105c..e7f325de52 100644
--- a/test/unit/org/apache/cassandra/repair/ValidationTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidationTaskTest.java
@@ -79,6 +79,6 @@ public class ValidationTaskTest
     private ValidationTask createTask() throws UnknownHostException {
         InetAddressAndPort addressAndPort = 
InetAddressAndPort.getByName("127.0.0.1");
         RepairJobDesc desc = new RepairJobDesc(nextTimeUUID(), nextTimeUUID(), 
UUID.randomUUID().toString(), UUID.randomUUID().toString(), null);
-        return new ValidationTask(SharedContext.Global.instance, desc, 
addressAndPort, 0, PreviewKind.NONE);
+        return new ValidationTask(SharedContext.Global.instance, desc, 
addressAndPort, 0, PreviewKind.NONE, false);
     }
 }
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java 
b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 7d0317f2c1..9ea956b85c 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -208,7 +208,7 @@ public class ValidatorTest
                                                                    false, 
PreviewKind.NONE);
 
         final CompletableFuture<Message> outgoingMessageSink = 
registerOutgoingMessageSink();
-        Validator validator = new Validator(SharedContext.Global.instance, new 
ValidationState(Clock.Global.clock(), desc, host), 0, true, false, 
PreviewKind.NONE);
+        Validator validator = new Validator(SharedContext.Global.instance, new 
ValidationState(Clock.Global.clock(), desc, host), 0, true, false, 
PreviewKind.NONE, false);
         ValidationManager.instance.submitValidation(cfs, validator);
 
         Message message = outgoingMessageSink.get(TEST_TIMEOUT, 
TimeUnit.SECONDS);
@@ -265,7 +265,7 @@ public class ValidatorTest
                                                                    false, 
PreviewKind.NONE);
 
         final CompletableFuture<Message> outgoingMessageSink = 
registerOutgoingMessageSink();
-        Validator validator = new Validator(SharedContext.Global.instance, new 
ValidationState(Clock.Global.clock(), desc, host), 0, true, false, 
PreviewKind.NONE);
+        Validator validator = new Validator(SharedContext.Global.instance, new 
ValidationState(Clock.Global.clock(), desc, host), 0, true, false, 
PreviewKind.NONE, false);
         ValidationManager.instance.submitValidation(cfs, validator);
 
         Message message = outgoingMessageSink.get(TEST_TIMEOUT, 
TimeUnit.SECONDS);
@@ -327,7 +327,7 @@ public class ValidatorTest
                                                                    false, 
PreviewKind.NONE);
 
         final CompletableFuture<Message> outgoingMessageSink = 
registerOutgoingMessageSink();
-        Validator validator = new Validator(SharedContext.Global.instance, new 
ValidationState(Clock.Global.clock(), desc, host), 0, true, false, 
PreviewKind.NONE);
+        Validator validator = new Validator(SharedContext.Global.instance, new 
ValidationState(Clock.Global.clock(), desc, host), 0, true, false, 
PreviewKind.NONE, false);
         ValidationManager.instance.submitValidation(cfs, validator);
 
         Message message = outgoingMessageSink.get(TEST_TIMEOUT, 
TimeUnit.SECONDS);
diff --git 
a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
 
b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
index 9886076d70..1657ceff48 100644
--- 
a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
+++ 
b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
@@ -87,9 +87,13 @@ public class RepairMessageSerializationsTest
     public void validationRequestMessage() throws IOException
     {
         RepairJobDesc jobDesc = buildRepairJobDesc();
-        ValidationRequest msg = new ValidationRequest(jobDesc, GC_BEFORE);
+        ValidationRequest msg = new ValidationRequest(jobDesc, GC_BEFORE, 
false);
         ValidationRequest deserialized = serializeRoundTrip(msg, 
ValidationRequest.serializer);
         Assert.assertEquals(jobDesc, deserialized.desc);
+
+        msg = new ValidationRequest(jobDesc, GC_BEFORE, true);
+        deserialized = serializeRoundTrip(msg, ValidationRequest.serializer);
+        Assert.assertEquals(jobDesc, deserialized.desc);
     }
 
     private RepairJobDesc buildRepairJobDesc()
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java 
b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index d10b65dd17..20431fc335 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -104,7 +104,7 @@ public class SerializationsTest extends 
AbstractSerializationsTester
 
     private void testValidationRequestWrite() throws IOException
     {
-        ValidationRequest message = new ValidationRequest(DESC, 1234);
+        ValidationRequest message = new ValidationRequest(DESC, 1234, true);
         testRepairMessageWrite("service.ValidationRequest.bin", 
ValidationRequest.serializer, message);
     }
 
@@ -119,6 +119,7 @@ public class SerializationsTest extends 
AbstractSerializationsTester
             ValidationRequest message = 
ValidationRequest.serializer.deserialize(in, getVersion());
             assert DESC.equals(message.desc);
             assert message.nowInSec == 1234;
+            assert message.dontPurgeTombstones == true;
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to