Repository: cassandra Updated Branches: refs/heads/trunk af3748909 -> 9fdec0a82
Use common nowInSec for validation compactions Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-13671 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9fdec0a8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9fdec0a8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9fdec0a8 Branch: refs/heads/trunk Commit: 9fdec0a82851f5c35cd21d02e8c4da8fc685edb2 Parents: af37489 Author: Blake Eggleston <bdeggles...@gmail.com> Authored: Wed Jul 5 11:18:21 2017 -0700 Committer: Blake Eggleston <bdeggles...@gmail.com> Committed: Thu Jul 6 10:41:31 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 18 ++---------------- .../org/apache/cassandra/repair/RepairJob.java | 16 ++++++++-------- .../repair/RepairMessageVerbHandler.java | 2 +- .../apache/cassandra/repair/ValidationTask.java | 8 ++++---- .../org/apache/cassandra/repair/Validator.java | 14 +++++++------- .../repair/messages/ValidationRequest.java | 18 +++++++++--------- .../cassandra/service/SerializationsTest.java | 2 +- 8 files changed, 33 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fdec0a8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9584f63..4f2d2a1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Use common nowInSec for validation compactions (CASSANDRA-13671) * Improve handling of IR prepare failures (CASSANDRA-13672) * Send IR coordinator messages synchronously (CASSANDRA-13673) * Flush system.repair table before IR finalize promise (CASSANDRA-13660) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fdec0a8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index d7e00da..0532515 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1312,9 +1312,6 @@ public class CompactionManager implements CompactionManagerMBean Refs<SSTableReader> sstables = null; try { - - int gcBefore; - int nowInSec = FBUtilities.nowInSeconds(); UUID parentRepairSessionId = validator.desc.parentSessionId; String snapshotName; boolean isGlobalSnapshotValidation = cfs.snapshotExists(parentRepairSessionId.toString()); @@ -1330,13 +1327,6 @@ public class CompactionManager implements CompactionManagerMBean // note that we populate the parent repair session when creating the snapshot, meaning the sstables in the snapshot are the ones we // are supposed to validate. sstables = cfs.getSnapshotSSTableReaders(snapshotName); - - - // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute - // this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation - // time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case - // 'as good as in the non-snapshot' case) - gcBefore = cfs.gcBefore((int)(cfs.getSnapshotCreationTime(snapshotName) / 1000)); } else { @@ -1348,10 +1338,6 @@ public class CompactionManager implements CompactionManagerMBean sstables = getSSTablesToValidate(cfs, validator); if (sstables == null) return; // this means the parent repair session was removed - the repair session failed on another node and we removed it - if (validator.gcBefore > 0) - gcBefore = validator.gcBefore; - else - gcBefore = getDefaultGcBefore(cfs, nowInSec); } // Create Merkle trees suitable to hold estimated partitions for the given ranges. @@ -1360,8 +1346,8 @@ public class CompactionManager implements CompactionManagerMBean long start = System.nanoTime(); long partitionCount = 0; try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges); - ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore); - CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics)) + ValidationCompactionController controller = new ValidationCompactionController(cfs, getDefaultGcBefore(cfs, validator.nowInSec)); + CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, validator.nowInSec, metrics)) { // validate the CF as we iterate over it validator.prepare(cfs, tree); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fdec0a8/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 58a369e..0615681 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -196,11 +196,11 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints); logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); - int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds()); + int nowInSec = FBUtilities.nowInSeconds(); List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size()); for (InetAddress endpoint : endpoints) { - ValidationTask task = new ValidationTask(desc, endpoint, gcBefore, previewKind); + ValidationTask task = new ValidationTask(desc, endpoint, nowInSec, previewKind); tasks.add(task); session.waitForValidation(Pair.create(desc, endpoint), task); taskExecutor.execute(task); @@ -216,12 +216,12 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints); logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); - int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds()); + int nowInSec = FBUtilities.nowInSeconds(); List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size()); Queue<InetAddress> requests = new LinkedList<>(endpoints); InetAddress address = requests.poll(); - ValidationTask firstTask = new ValidationTask(desc, address, gcBefore, previewKind); + ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, previewKind); logger.info("Validating {}", address); session.waitForValidation(Pair.create(desc, address), firstTask); tasks.add(firstTask); @@ -229,7 +229,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable while (requests.size() > 0) { final InetAddress nextAddress = requests.poll(); - final ValidationTask nextTask = new ValidationTask(desc, nextAddress, gcBefore, previewKind); + final ValidationTask nextTask = new ValidationTask(desc, nextAddress, nowInSec, previewKind); tasks.add(nextTask); Futures.addCallback(currentTask, new FutureCallback<TreeResponse>() { @@ -258,7 +258,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints); logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); - int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds()); + int nowInSec = FBUtilities.nowInSeconds(); List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size()); Map<String, Queue<InetAddress>> requestsByDatacenter = new HashMap<>(); @@ -278,7 +278,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable { Queue<InetAddress> requests = entry.getValue(); InetAddress address = requests.poll(); - ValidationTask firstTask = new ValidationTask(desc, address, gcBefore, previewKind); + ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, previewKind); logger.info("Validating {}", address); session.waitForValidation(Pair.create(desc, address), firstTask); tasks.add(firstTask); @@ -286,7 +286,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable while (requests.size() > 0) { final InetAddress nextAddress = requests.poll(); - final ValidationTask nextTask = new ValidationTask(desc, nextAddress, gcBefore, previewKind); + final ValidationTask nextTask = new ValidationTask(desc, nextAddress, nowInSec, previewKind); tasks.add(nextTask); Futures.addCallback(currentTask, new FutureCallback<TreeResponse>() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fdec0a8/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index ed62229..c38d098 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -135,7 +135,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> } ActiveRepairService.instance.consistent.local.maybeSetRepairing(desc.parentSessionId); - Validator validator = new Validator(desc, message.from, validationRequest.gcBefore, + Validator validator = new Validator(desc, message.from, validationRequest.nowInSec, isConsistent(desc.parentSessionId), previewKind(desc.parentSessionId)); CompactionManager.instance.submitValidation(store, validator); break; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fdec0a8/src/java/org/apache/cassandra/repair/ValidationTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java b/src/java/org/apache/cassandra/repair/ValidationTask.java index f68d3c5..175709f 100644 --- a/src/java/org/apache/cassandra/repair/ValidationTask.java +++ b/src/java/org/apache/cassandra/repair/ValidationTask.java @@ -35,14 +35,14 @@ public class ValidationTask extends AbstractFuture<TreeResponse> implements Runn { private final RepairJobDesc desc; private final InetAddress endpoint; - private final int gcBefore; + private final int nowInSec; private final PreviewKind previewKind; - public ValidationTask(RepairJobDesc desc, InetAddress endpoint, int gcBefore, PreviewKind previewKind) + public ValidationTask(RepairJobDesc desc, InetAddress endpoint, int nowInSec, PreviewKind previewKind) { this.desc = desc; this.endpoint = endpoint; - this.gcBefore = gcBefore; + this.nowInSec = nowInSec; this.previewKind = previewKind; } @@ -51,7 +51,7 @@ public class ValidationTask extends AbstractFuture<TreeResponse> implements Runn */ public void run() { - ValidationRequest request = new ValidationRequest(desc, gcBefore); + ValidationRequest request = new ValidationRequest(desc, nowInSec); MessagingService.instance().sendOneWay(request.createMessage(), endpoint); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fdec0a8/src/java/org/apache/cassandra/repair/Validator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java index ba1fa9d..bdf8cca 100644 --- a/src/java/org/apache/cassandra/repair/Validator.java +++ b/src/java/org/apache/cassandra/repair/Validator.java @@ -58,7 +58,7 @@ public class Validator implements Runnable public final RepairJobDesc desc; public final InetAddress initiator; - public final int gcBefore; + public final int nowInSec; private final boolean evenTreeDistribution; public final boolean isConsistent; @@ -74,21 +74,21 @@ public class Validator implements Runnable private final PreviewKind previewKind; - public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, PreviewKind previewKind) + public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, PreviewKind previewKind) { - this(desc, initiator, gcBefore, false, false, previewKind); + this(desc, initiator, nowInSec, false, false, previewKind); } - public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean isConsistent, PreviewKind previewKind) + public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, boolean isConsistent, PreviewKind previewKind) { - this(desc, initiator, gcBefore, false, isConsistent, previewKind); + this(desc, initiator, nowInSec, false, isConsistent, previewKind); } - public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution, boolean isConsistent, PreviewKind previewKind) + public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, boolean evenTreeDistribution, boolean isConsistent, PreviewKind previewKind) { this.desc = desc; this.initiator = initiator; - this.gcBefore = gcBefore; + this.nowInSec = nowInSec; this.isConsistent = isConsistent; this.previewKind = previewKind; validated = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fdec0a8/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java index 0dfab6a..6466244 100644 --- a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java +++ b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java @@ -33,20 +33,20 @@ public class ValidationRequest extends RepairMessage { public static MessageSerializer serializer = new ValidationRequestSerializer(); - public final int gcBefore; + public final int nowInSec; - public ValidationRequest(RepairJobDesc desc, int gcBefore) + public ValidationRequest(RepairJobDesc desc, int nowInSec) { super(Type.VALIDATION_REQUEST, desc); - this.gcBefore = gcBefore; + this.nowInSec = nowInSec; } @Override public String toString() { return "ValidationRequest{" + - "gcBefore=" + gcBefore + - "} " + super.toString(); + "nowInSec=" + nowInSec + + "} " + super.toString(); } @Override @@ -56,13 +56,13 @@ public class ValidationRequest extends RepairMessage if (o == null || getClass() != o.getClass()) return false; ValidationRequest that = (ValidationRequest) o; - return gcBefore == that.gcBefore; + return nowInSec == that.nowInSec; } @Override public int hashCode() { - return gcBefore; + return nowInSec; } public static class ValidationRequestSerializer implements MessageSerializer<ValidationRequest> @@ -70,7 +70,7 @@ public class ValidationRequest extends RepairMessage public void serialize(ValidationRequest message, DataOutputPlus out, int version) throws IOException { RepairJobDesc.serializer.serialize(message.desc, out, version); - out.writeInt(message.gcBefore); + out.writeInt(message.nowInSec); } public ValidationRequest deserialize(DataInputPlus dis, int version) throws IOException @@ -82,7 +82,7 @@ public class ValidationRequest extends RepairMessage public long serializedSize(ValidationRequest message, int version) { long size = RepairJobDesc.serializer.serializedSize(message.desc, version); - size += TypeSizes.sizeof(message.gcBefore); + size += TypeSizes.sizeof(message.nowInSec); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fdec0a8/test/unit/org/apache/cassandra/service/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java index a63dc69..d943bb7 100644 --- a/test/unit/org/apache/cassandra/service/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java @@ -109,7 +109,7 @@ public class SerializationsTest extends AbstractSerializationsTester RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion()); assert message.messageType == RepairMessage.Type.VALIDATION_REQUEST; assert DESC.equals(message.desc); - assert ((ValidationRequest) message).gcBefore == 1234; + assert ((ValidationRequest) message).nowInSec == 1234; assert MessageIn.read(in, getVersion(), -1) != null; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org