Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 4cc2b67df -> 13172bd99
  refs/heads/trunk 6cad04b22 -> 110e803ed
Use the same repairedAt timestamp within incremental repair session

patch by prmg; reviewed by yukim for CASSANDRA-9111

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/13172bd9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/13172bd9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/13172bd9

Branch: refs/heads/cassandra-3.0
Commit: 13172bd993f86d44245e7140898c03db1a47073a
Parents: 4cc2b67
Author: prmg <[email protected]>
Authored: Wed Aug 19 18:12:36 2015 -0500
Committer: Yuki Morishita <[email protected]>
Committed: Wed Aug 19 18:12:36 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 .../apache/cassandra/repair/RepairMessageVerbHandler.java | 3 ++-
 .../apache/cassandra/repair/messages/PrepareMessage.java | 10 ++++++++--
 .../org/apache/cassandra/service/ActiveRepairService.java | 9 +++++----
 .../db/compaction/LeveledCompactionStrategyTest.java | 2 +-
 .../org/apache/cassandra/repair/LocalSyncTaskTest.java | 2 +-
 6 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/13172bd9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0d17235..cea8c73 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@
  * Replace usage of Adler32 with CRC32 (CASSANDRA-8684)
  * Fix migration to new format from 2.1 SSTable (CASSANDRA-10006)
  * SequentialWriter should extend BufferedDataOutputStreamPlus (CASSANDRA-9500)
+ * Use the same repairedAt timestamp within incremental repair session (CASSANDRA-9111)
 Merged from 2.2:
  * Fix histogram overflow exception (CASSANDRA-9973)
  * Route gossip messages over dedicated socket (CASSANDRA-9237)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/13172bd9/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 28a3bf5..942d902 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -72,7 +72,8 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession, columnFamilyStores, prepareMessage.ranges, - prepareMessage.isIncremental); + prepareMessage.isIncremental, + prepareMessage.timestamp); MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); break; http://git-wip-us.apache.org/repos/asf/cassandra/blob/13172bd9/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java index cd1b99d..0cd73db 100644 --- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java @@ -40,14 +40,16 @@ public class PrepareMessage extends RepairMessage public final UUID parentRepairSession; public final boolean isIncremental; + public final long timestamp; - public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, Collection<Range<Token>> ranges, boolean isIncremental) + public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp) { super(Type.PREPARE_MESSAGE, null); this.parentRepairSession = 
parentRepairSession; this.cfIds = cfIds; this.ranges = ranges; this.isIncremental = isIncremental; + this.timestamp = timestamp; } public static class PrepareMessageSerializer implements MessageSerializer<PrepareMessage> @@ -65,6 +67,7 @@ public class PrepareMessage extends RepairMessage Range.tokenSerializer.serialize(r, out, version); } out.writeBoolean(message.isIncremental); + out.writeLong(message.timestamp); } public PrepareMessage deserialize(DataInputPlus in, int version) throws IOException @@ -79,7 +82,8 @@ public class PrepareMessage extends RepairMessage for (int i = 0; i < rangeCount; i++) ranges.add((Range<Token>) Range.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(), version)); boolean isIncremental = in.readBoolean(); - return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental); + long timestamp = in.readLong(); + return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental, timestamp); } public long serializedSize(PrepareMessage message, int version) @@ -93,6 +97,7 @@ public class PrepareMessage extends RepairMessage for (Range<Token> r : message.ranges) size += Range.tokenSerializer.serializedSize(r, version); size += TypeSizes.sizeof(message.isIncremental); + size += TypeSizes.sizeof(message.timestamp); return size; } } @@ -105,6 +110,7 @@ public class PrepareMessage extends RepairMessage ", ranges=" + ranges + ", parentRepairSession=" + parentRepairSession + ", isIncremental="+isIncremental + + ", timestamp=" + timestamp + '}'; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/13172bd9/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index e75d13e..0e09cf7 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -233,7 +233,8 @@ public class ActiveRepairService public synchronized UUID prepareForRepair(UUID parentRepairSession, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores) { - registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental()); + long timestamp = System.currentTimeMillis(); + registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental(), timestamp); final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size()); final AtomicBoolean status = new AtomicBoolean(true); final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>()); @@ -263,7 +264,7 @@ public class ActiveRepairService for (InetAddress neighbour : endpoints) { - PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental()); + PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), timestamp); MessageOut<RepairMessage> msg = message.createMessage(); MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true); } @@ -286,9 +287,9 @@ public class ActiveRepairService return parentRepairSession; } - public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental) + public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp) { - parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, System.currentTimeMillis())); + parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, 
timestamp)); } public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID parentRepairSession) http://git-wip-us.apache.org/repos/asf/cassandra/blob/13172bd9/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index bb15e88..9d5e5fc 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -185,7 +185,7 @@ public class LeveledCompactionStrategyTest Range<Token> range = new Range<>(Util.token(""), Util.token("")); int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds()); UUID parentRepSession = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false); + ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis()); RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range)); Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore); CompactionManager.instance.submitValidation(cfs, validator).get(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/13172bd9/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index 734e91b..ff5b99e 100644 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ 
b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -89,7 +89,7 @@ public class LocalSyncTaskTest extends SchemaLoader Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1"); - ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false); + ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis()); RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
