Repository: cassandra
Updated Branches:
  refs/heads/trunk af3748909 -> 9fdec0a82


Use common nowInSec for validation compactions

Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-13671


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9fdec0a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9fdec0a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9fdec0a8

Branch: refs/heads/trunk
Commit: 9fdec0a82851f5c35cd21d02e8c4da8fc685edb2
Parents: af37489
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Wed Jul 5 11:18:21 2017 -0700
Committer: Blake Eggleston <bdeggles...@gmail.com>
Committed: Thu Jul 6 10:41:31 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                       |  1 +
 .../db/compaction/CompactionManager.java          | 18 ++----------------
 .../org/apache/cassandra/repair/RepairJob.java    | 16 ++++++++--------
 .../repair/RepairMessageVerbHandler.java          |  2 +-
 .../apache/cassandra/repair/ValidationTask.java   |  8 ++++----
 .../org/apache/cassandra/repair/Validator.java    | 14 +++++++-------
 .../repair/messages/ValidationRequest.java        | 18 +++++++++---------
 .../cassandra/service/SerializationsTest.java     |  2 +-
 8 files changed, 33 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fdec0a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9584f63..4f2d2a1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Use common nowInSec for validation compactions (CASSANDRA-13671)
  * Improve handling of IR prepare failures (CASSANDRA-13672)
  * Send IR coordinator messages synchronously (CASSANDRA-13673)
  * Flush system.repair table before IR finalize promise (CASSANDRA-13660)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fdec0a8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index d7e00da..0532515 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1312,9 +1312,6 @@ public class CompactionManager implements 
CompactionManagerMBean
         Refs<SSTableReader> sstables = null;
         try
         {
-
-            int gcBefore;
-            int nowInSec = FBUtilities.nowInSeconds();
             UUID parentRepairSessionId = validator.desc.parentSessionId;
             String snapshotName;
             boolean isGlobalSnapshotValidation = 
cfs.snapshotExists(parentRepairSessionId.toString());
@@ -1330,13 +1327,6 @@ public class CompactionManager implements 
CompactionManagerMBean
                 // note that we populate the parent repair session when 
creating the snapshot, meaning the sstables in the snapshot are the ones we
                 // are supposed to validate.
                 sstables = cfs.getSnapshotSSTableReaders(snapshotName);
-
-
-                // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
-                // this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation
-                // time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case
-                // 'as good as in the non-snapshot' case)
-                gcBefore = cfs.gcBefore((int)(cfs.getSnapshotCreationTime(snapshotName) / 1000));
             }
             else
             {
@@ -1348,10 +1338,6 @@ public class CompactionManager implements 
CompactionManagerMBean
                 sstables = getSSTablesToValidate(cfs, validator);
                 if (sstables == null)
                     return; // this means the parent repair session was 
removed - the repair session failed on another node and we removed it
-                if (validator.gcBefore > 0)
-                    gcBefore = validator.gcBefore;
-                else
-                    gcBefore = getDefaultGcBefore(cfs, nowInSec);
             }
 
             // Create Merkle trees suitable to hold estimated partitions for 
the given ranges.
@@ -1360,8 +1346,8 @@ public class CompactionManager implements 
CompactionManagerMBean
             long start = System.nanoTime();
             long partitionCount = 0;
             try (AbstractCompactionStrategy.ScannerList scanners = 
cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
-                 ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore);
-                 CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics))
+                 ValidationCompactionController controller = new ValidationCompactionController(cfs, getDefaultGcBefore(cfs, validator.nowInSec));
+                 CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, validator.nowInSec, metrics))
             {
                 // validate the CF as we iterate over it
                 validator.prepare(cfs, tree);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fdec0a8/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 58a369e..0615681 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -196,11 +196,11 @@ public class RepairJob extends 
AbstractFuture<RepairResult> implements Runnable
         String message = String.format("Requesting merkle trees for %s (to 
%s)", desc.columnFamily, endpoints);
         logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
         Tracing.traceRepair(message);
-        int gcBefore = 
Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds());
+        int nowInSec = FBUtilities.nowInSeconds();
         List<ListenableFuture<TreeResponse>> tasks = new 
ArrayList<>(endpoints.size());
         for (InetAddress endpoint : endpoints)
         {
-            ValidationTask task = new ValidationTask(desc, endpoint, gcBefore, 
previewKind);
+            ValidationTask task = new ValidationTask(desc, endpoint, nowInSec, 
previewKind);
             tasks.add(task);
             session.waitForValidation(Pair.create(desc, endpoint), task);
             taskExecutor.execute(task);
@@ -216,12 +216,12 @@ public class RepairJob extends 
AbstractFuture<RepairResult> implements Runnable
         String message = String.format("Requesting merkle trees for %s (to 
%s)", desc.columnFamily, endpoints);
         logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
         Tracing.traceRepair(message);
-        int gcBefore = 
Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds());
+        int nowInSec = FBUtilities.nowInSeconds();
         List<ListenableFuture<TreeResponse>> tasks = new 
ArrayList<>(endpoints.size());
 
         Queue<InetAddress> requests = new LinkedList<>(endpoints);
         InetAddress address = requests.poll();
-        ValidationTask firstTask = new ValidationTask(desc, address, gcBefore, 
previewKind);
+        ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, 
previewKind);
         logger.info("Validating {}", address);
         session.waitForValidation(Pair.create(desc, address), firstTask);
         tasks.add(firstTask);
@@ -229,7 +229,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
         while (requests.size() > 0)
         {
             final InetAddress nextAddress = requests.poll();
-            final ValidationTask nextTask = new ValidationTask(desc, 
nextAddress, gcBefore, previewKind);
+            final ValidationTask nextTask = new ValidationTask(desc, 
nextAddress, nowInSec, previewKind);
             tasks.add(nextTask);
             Futures.addCallback(currentTask, new FutureCallback<TreeResponse>()
             {
@@ -258,7 +258,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
         String message = String.format("Requesting merkle trees for %s (to 
%s)", desc.columnFamily, endpoints);
         logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
         Tracing.traceRepair(message);
-        int gcBefore = 
Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds());
+        int nowInSec = FBUtilities.nowInSeconds();
         List<ListenableFuture<TreeResponse>> tasks = new 
ArrayList<>(endpoints.size());
 
         Map<String, Queue<InetAddress>> requestsByDatacenter = new HashMap<>();
@@ -278,7 +278,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
         {
             Queue<InetAddress> requests = entry.getValue();
             InetAddress address = requests.poll();
-            ValidationTask firstTask = new ValidationTask(desc, address, 
gcBefore, previewKind);
+            ValidationTask firstTask = new ValidationTask(desc, address, 
nowInSec, previewKind);
             logger.info("Validating {}", address);
             session.waitForValidation(Pair.create(desc, address), firstTask);
             tasks.add(firstTask);
@@ -286,7 +286,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
             while (requests.size() > 0)
             {
                 final InetAddress nextAddress = requests.poll();
-                final ValidationTask nextTask = new ValidationTask(desc, 
nextAddress, gcBefore, previewKind);
+                final ValidationTask nextTask = new ValidationTask(desc, 
nextAddress, nowInSec, previewKind);
                 tasks.add(nextTask);
                 Futures.addCallback(currentTask, new 
FutureCallback<TreeResponse>()
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fdec0a8/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index ed62229..c38d098 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -135,7 +135,7 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
                     }
 
                     
ActiveRepairService.instance.consistent.local.maybeSetRepairing(desc.parentSessionId);
-                    Validator validator = new Validator(desc, message.from, 
validationRequest.gcBefore,
+                    Validator validator = new Validator(desc, message.from, 
validationRequest.nowInSec,
                                                         
isConsistent(desc.parentSessionId), previewKind(desc.parentSessionId));
                     CompactionManager.instance.submitValidation(store, 
validator);
                     break;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fdec0a8/src/java/org/apache/cassandra/repair/ValidationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java b/src/java/org/apache/cassandra/repair/ValidationTask.java
index f68d3c5..175709f 100644
--- a/src/java/org/apache/cassandra/repair/ValidationTask.java
+++ b/src/java/org/apache/cassandra/repair/ValidationTask.java
@@ -35,14 +35,14 @@ public class ValidationTask extends 
AbstractFuture<TreeResponse> implements Runn
 {
     private final RepairJobDesc desc;
     private final InetAddress endpoint;
-    private final int gcBefore;
+    private final int nowInSec;
     private final PreviewKind previewKind;
 
-    public ValidationTask(RepairJobDesc desc, InetAddress endpoint, int 
gcBefore, PreviewKind previewKind)
+    public ValidationTask(RepairJobDesc desc, InetAddress endpoint, int 
nowInSec, PreviewKind previewKind)
     {
         this.desc = desc;
         this.endpoint = endpoint;
-        this.gcBefore = gcBefore;
+        this.nowInSec = nowInSec;
         this.previewKind = previewKind;
     }
 
@@ -51,7 +51,7 @@ public class ValidationTask extends 
AbstractFuture<TreeResponse> implements Runn
      */
     public void run()
     {
-        ValidationRequest request = new ValidationRequest(desc, gcBefore);
+        ValidationRequest request = new ValidationRequest(desc, nowInSec);
         MessagingService.instance().sendOneWay(request.createMessage(), 
endpoint);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fdec0a8/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index ba1fa9d..bdf8cca 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -58,7 +58,7 @@ public class Validator implements Runnable
 
     public final RepairJobDesc desc;
     public final InetAddress initiator;
-    public final int gcBefore;
+    public final int nowInSec;
     private final boolean evenTreeDistribution;
     public final boolean isConsistent;
 
@@ -74,21 +74,21 @@ public class Validator implements Runnable
 
     private final PreviewKind previewKind;
 
-    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, 
PreviewKind previewKind)
+    public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, 
PreviewKind previewKind)
     {
-        this(desc, initiator, gcBefore, false, false, previewKind);
+        this(desc, initiator, nowInSec, false, false, previewKind);
     }
 
-    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, 
boolean isConsistent, PreviewKind previewKind)
+    public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, 
boolean isConsistent, PreviewKind previewKind)
     {
-        this(desc, initiator, gcBefore, false, isConsistent, previewKind);
+        this(desc, initiator, nowInSec, false, isConsistent, previewKind);
     }
 
-    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, 
boolean evenTreeDistribution, boolean isConsistent, PreviewKind previewKind)
+    public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, 
boolean evenTreeDistribution, boolean isConsistent, PreviewKind previewKind)
     {
         this.desc = desc;
         this.initiator = initiator;
-        this.gcBefore = gcBefore;
+        this.nowInSec = nowInSec;
         this.isConsistent = isConsistent;
         this.previewKind = previewKind;
         validated = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fdec0a8/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
index 0dfab6a..6466244 100644
--- a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
@@ -33,20 +33,20 @@ public class ValidationRequest extends RepairMessage
 {
     public static MessageSerializer serializer = new 
ValidationRequestSerializer();
 
-    public final int gcBefore;
+    public final int nowInSec;
 
-    public ValidationRequest(RepairJobDesc desc, int gcBefore)
+    public ValidationRequest(RepairJobDesc desc, int nowInSec)
     {
         super(Type.VALIDATION_REQUEST, desc);
-        this.gcBefore = gcBefore;
+        this.nowInSec = nowInSec;
     }
 
     @Override
     public String toString()
     {
         return "ValidationRequest{" +
-                "gcBefore=" + gcBefore +
-                "} " + super.toString();
+               "nowInSec=" + nowInSec +
+               "} " + super.toString();
     }
 
     @Override
@@ -56,13 +56,13 @@ public class ValidationRequest extends RepairMessage
         if (o == null || getClass() != o.getClass()) return false;
 
         ValidationRequest that = (ValidationRequest) o;
-        return gcBefore == that.gcBefore;
+        return nowInSec == that.nowInSec;
     }
 
     @Override
     public int hashCode()
     {
-        return gcBefore;
+        return nowInSec;
     }
 
     public static class ValidationRequestSerializer implements 
MessageSerializer<ValidationRequest>
@@ -70,7 +70,7 @@ public class ValidationRequest extends RepairMessage
         public void serialize(ValidationRequest message, DataOutputPlus out, 
int version) throws IOException
         {
             RepairJobDesc.serializer.serialize(message.desc, out, version);
-            out.writeInt(message.gcBefore);
+            out.writeInt(message.nowInSec);
         }
 
         public ValidationRequest deserialize(DataInputPlus dis, int version) 
throws IOException
@@ -82,7 +82,7 @@ public class ValidationRequest extends RepairMessage
         public long serializedSize(ValidationRequest message, int version)
         {
             long size = RepairJobDesc.serializer.serializedSize(message.desc, 
version);
-            size += TypeSizes.sizeof(message.gcBefore);
+            size += TypeSizes.sizeof(message.nowInSec);
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fdec0a8/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index a63dc69..d943bb7 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -109,7 +109,7 @@ public class SerializationsTest extends 
AbstractSerializationsTester
             RepairMessage message = RepairMessage.serializer.deserialize(in, 
getVersion());
             assert message.messageType == 
RepairMessage.Type.VALIDATION_REQUEST;
             assert DESC.equals(message.desc);
-            assert ((ValidationRequest) message).gcBefore == 1234;
+            assert ((ValidationRequest) message).nowInSec == 1234;
 
             assert MessageIn.read(in, getVersion(), -1) != null;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to