Merge branch 'cassandra-2.2' into cassandra-3.0
Conflicts:
src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
src/java/org/apache/cassandra/service/ActiveRepairService.java
test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f490ccec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f490ccec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f490ccec
Branch: refs/heads/trunk
Commit: f490ccec62b2b86b9dbf0fff3888852aedbc7f65
Parents: 8afc76a 842f150
Author: Marcus Eriksson <[email protected]>
Authored: Tue Aug 25 19:05:23 2015 +0200
Committer: Marcus Eriksson <[email protected]>
Committed: Tue Aug 25 19:05:34 2015 +0200
----------------------------------------------------------------------
.../repair/RepairMessageVerbHandler.java | 5 +-
.../apache/cassandra/repair/RepairRunnable.java | 2 +-
.../repair/messages/PrepareMessage.java | 10 +++-
.../cassandra/repair/messages/RepairOption.java | 4 ++
.../cassandra/service/ActiveRepairService.java | 49 +++++++++++++-------
.../LeveledCompactionStrategyTest.java | 2 +-
.../cassandra/repair/LocalSyncTaskTest.java | 2 +-
7 files changed, 51 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 942d902,796f135..ffba9d6
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@@ -73,7 -82,7 +73,8 @@@ public class RepairMessageVerbHandler i
columnFamilyStores,
prepareMessage.ranges,
prepareMessage.isIncremental,
- prepareMessage.timestamp);
- isGlobal);
++ prepareMessage.timestamp,
++ prepareMessage.isGlobal);
MessagingService.instance().sendReply(new
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
break;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
index 0cd73db,a57c27e..8909f1b
--- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
@@@ -40,16 -40,14 +40,18 @@@ public class PrepareMessage extends Rep
public final UUID parentRepairSession;
public final boolean isIncremental;
+ public final long timestamp;
++ public final boolean isGlobal;
- public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp)
- public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal)
++ public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp, boolean isGlobal)
{
- super(isGlobal ? Type.PREPARE_GLOBAL_MESSAGE : Type.PREPARE_MESSAGE, null);
+ super(Type.PREPARE_MESSAGE, null);
this.parentRepairSession = parentRepairSession;
this.cfIds = cfIds;
this.ranges = ranges;
this.isIncremental = isIncremental;
+ this.timestamp = timestamp;
++ this.isGlobal = isGlobal;
}
public static class PrepareMessageSerializer implements
MessageSerializer<PrepareMessage>
@@@ -67,10 -65,9 +69,11 @@@
Range.tokenSerializer.serialize(r, out, version);
}
out.writeBoolean(message.isIncremental);
+ out.writeLong(message.timestamp);
++ out.writeBoolean(message.isGlobal);
}
- public PrepareMessage deserialize(DataInput in, int version) throws IOException
+ public PrepareMessage deserialize(DataInputPlus in, int version) throws IOException
{
int cfIdCount = in.readInt();
List<UUID> cfIds = new ArrayList<>(cfIdCount);
@@@ -82,8 -79,8 +85,9 @@@
for (int i = 0; i < rangeCount; i++)
ranges.add((Range<Token>)
Range.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(),
version));
boolean isIncremental = in.readBoolean();
-
- return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental, false);
+ long timestamp = in.readLong();
- return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental, timestamp);
++ boolean isGlobal = in.readBoolean();
++ return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental, timestamp, isGlobal);
}
public long serializedSize(PrepareMessage message, int version)
@@@ -93,11 -91,10 +97,12 @@@
for (UUID cfId : message.cfIds)
size += UUIDSerializer.serializer.serializedSize(cfId,
version);
size +=
UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version);
- size += sizes.sizeof(message.ranges.size());
+ size += TypeSizes.sizeof(message.ranges.size());
for (Range<Token> r : message.ranges)
size += Range.tokenSerializer.serializedSize(r, version);
- size += sizes.sizeof(message.isIncremental);
+ size += TypeSizes.sizeof(message.isIncremental);
+ size += TypeSizes.sizeof(message.timestamp);
++ size += TypeSizes.sizeof(message.isGlobal);
return size;
}
}
@@@ -110,7 -107,6 +115,8 @@@
", ranges=" + ranges +
", parentRepairSession=" + parentRepairSession +
", isIncremental="+isIncremental +
+ ", timestamp=" + timestamp +
++ ", isGlobal=" + isGlobal +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 0e09cf7,a6389ea..8079b3a
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -233,8 -237,7 +237,8 @@@ public class ActiveRepairServic
public synchronized UUID prepareForRepair(UUID parentRepairSession,
Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore>
columnFamilyStores)
{
- registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental(), options.isGlobal());
+ long timestamp = System.currentTimeMillis();
- registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental(), timestamp);
++ registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal());
final CountDownLatch prepareLatch = new
CountDownLatch(endpoints.size());
final AtomicBoolean status = new AtomicBoolean(true);
final Set<String> failedNodes = Collections.synchronizedSet(new
HashSet<String>());
@@@ -264,7 -267,10 +268,7 @@@
for (InetAddress neighbour : endpoints)
{
- PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), timestamp);
- CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbour);
- boolean isGlobal = options.isGlobal() && peerVersion != null && peerVersion.compareTo(SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) >= 0;
- logger.debug("Sending prepare message: options.isGlobal = {}, peerVersion = {}", options.isGlobal(), peerVersion);
- PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), isGlobal);
++ PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal());
MessageOut<RepairMessage> msg = message.createMessage();
MessagingService.instance().sendRR(msg, neighbour, callback,
TimeUnit.HOURS.toMillis(1), true);
}
@@@ -287,9 -293,9 +291,9 @@@
return parentRepairSession;
}
- public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp)
- public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal)
++ public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp, boolean isGlobal)
{
- parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, timestamp));
- parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, isGlobal, System.currentTimeMillis()));
++ parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, timestamp, isGlobal));
}
public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID
parentRepairSession)
@@@ -401,16 -413,18 +411,18 @@@
private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new
HashMap<>();
private final Collection<Range<Token>> ranges;
private final Map<UUID, Set<SSTableReader>> sstableMap = new
HashMap<>();
- public final long repairedAt;
+ private final long repairedAt;
public final boolean isIncremental;
+ private final boolean isGlobal;
- public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt)
- public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal, long repairedAt)
++ public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal)
{
for (ColumnFamilyStore cfs : columnFamilyStores)
this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
this.ranges = ranges;
this.repairedAt = repairedAt;
- this.isGlobal = isGlobal;
this.isIncremental = isIncremental;
++ this.isGlobal = isGlobal;
}
public void addSSTables(UUID cfId, Set<SSTableReader> sstables)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 456dcd1,63fd0e7..8050b6c
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@@ -183,10 -195,10 +183,10 @@@ public class LeveledCompactionStrategyT
assertTrue(strategy.getSSTableCountPerLevel()[2] > 0);
Range<Token> range = new Range<>(Util.token(""), Util.token(""));
- int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(System.currentTimeMillis());
+ int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds());
UUID parentRepSession = UUID.randomUUID();
- ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis());
- ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false, true);
- RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, range);
++ ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis(), true);
+ RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range));
Validator validator = new Validator(desc,
FBUtilities.getBroadcastAddress(), gcBefore);
CompactionManager.instance.submitValidation(cfs, validator).get();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f490ccec/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index ff5b99e,e5c03b9..eec29bc
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@@ -89,13 -89,12 +89,13 @@@ public class LocalSyncTaskTest extends
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
- ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis());
- ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false, false);
++ ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis(), false);
+
+ RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
- RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", range);
+ MerkleTrees tree1 = createInitialTree(desc);
- MerkleTree tree1 = createInitialTree(desc);
- MerkleTree tree2 = createInitialTree(desc);
+ MerkleTrees tree2 = createInitialTree(desc);
// change a range in one of the trees
Token token = partirioner.midpoint(range.left, range.right);