Repair common subranges of a set of nodes in one session. Reviewed by Stefania Alborghetti for CASSANDRA-5220.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0dd50a6c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0dd50a6c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0dd50a6c Branch: refs/heads/trunk Commit: 0dd50a6cdc81ec9ff1367238876d476affcf60e2 Parents: bf47408 Author: Marcus Olsson <[email protected]> Authored: Thu Aug 6 08:23:10 2015 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Thu Aug 6 08:23:20 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 19 +- .../compaction/CompactionStrategyManager.java | 24 +- .../apache/cassandra/net/MessagingService.java | 6 + .../org/apache/cassandra/repair/RepairJob.java | 17 +- .../apache/cassandra/repair/RepairJobDesc.java | 41 +- .../repair/RepairMessageVerbHandler.java | 4 +- .../apache/cassandra/repair/RepairRunnable.java | 45 +- .../apache/cassandra/repair/RepairSession.java | 48 +- .../cassandra/repair/RepairSessionResult.java | 6 +- .../org/apache/cassandra/repair/SyncTask.java | 6 +- .../repair/SystemDistributedKeyspace.java | 27 +- .../apache/cassandra/repair/TreeResponse.java | 8 +- .../apache/cassandra/repair/ValidationTask.java | 14 +- .../org/apache/cassandra/repair/Validator.java | 83 +-- .../repair/messages/ValidationComplete.java | 47 +- .../cassandra/service/ActiveRepairService.java | 4 +- .../org/apache/cassandra/utils/MerkleTrees.java | 434 +++++++++++++++ .../serialization/3.0/gms.EndpointState.bin | Bin 0 -> 73 bytes test/data/serialization/3.0/gms.Gossip.bin | Bin 0 -> 158 bytes .../serialization/3.0/service.SyncComplete.bin | Bin 0 -> 362 bytes .../serialization/3.0/service.SyncRequest.bin | Bin 0 -> 219 bytes .../3.0/service.ValidationComplete.bin | Bin 0 -> 1251 bytes .../3.0/service.ValidationRequest.bin | Bin 0 -> 167 bytes .../cassandra/AbstractSerializationsTester.java | 3 +- 
.../LeveledCompactionStrategyTest.java | 2 +- .../cassandra/repair/LocalSyncTaskTest.java | 23 +- .../cassandra/repair/RepairSessionTest.java | 3 +- .../apache/cassandra/repair/ValidatorTest.java | 19 +- .../cassandra/service/SerializationsTest.java | 28 +- .../apache/cassandra/utils/MerkleTreeTest.java | 1 - .../apache/cassandra/utils/MerkleTreesTest.java | 538 +++++++++++++++++++ 32 files changed, 1256 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1d1ad0f..80e0e50 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.0-beta1 + * Repair improvements when using vnodes (CASSANDRA-5220) * Disable scripted UDFs by default (CASSANDRA-9889) * Add transparent data encryption core classes (CASSANDRA-9945) * Bytecode inspection for Java-UDFs (CASSANDRA-9890) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 92cc249..8aa16d5 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -69,6 +69,8 @@ import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.SSTableRewriter; import org.apache.cassandra.io.util.FileUtils; import 
org.apache.cassandra.metrics.CompactionMetrics; import org.apache.cassandra.repair.Validator; @@ -76,7 +78,7 @@ import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; import org.apache.cassandra.utils.UUIDGen; @@ -1045,7 +1047,7 @@ public class CompactionManager implements CompactionManagerMBean for (SSTableReader sstable : sstableCandidates.sstables) { - if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range))) + if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(validator.desc.ranges)) { sstablesToValidate.add(sstable); } @@ -1074,19 +1076,20 @@ public class CompactionManager implements CompactionManagerMBean gcBefore = getDefaultGcBefore(cfs, nowInSec); } - // Create Merkle tree suitable to hold estimated partitions for given range. - // We blindly assume that partition is evenly distributed on all sstables for now. + // Create Merkle trees suitable to hold estimated partitions for the given ranges. + // We blindly assume that a partition is evenly distributed on all sstables for now. long numPartitions = 0; for (SSTableReader sstable : sstables) { - numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range)); + numPartitions += sstable.estimatedKeysForRanges(validator.desc.ranges); } // determine tree depth from number of partitions, but cap at 20 to prevent large tree. int depth = numPartitions > 0 ? 
(int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0; - MerkleTree tree = new MerkleTree(cfs.getPartitioner(), validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth)); + MerkleTrees tree = new MerkleTrees(cfs.getPartitioner()); + tree.addMerkleTrees((int) Math.pow(2, depth), validator.desc.ranges); long start = System.nanoTime(); - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.range); + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges); ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore); CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics)) { @@ -1119,7 +1122,7 @@ public class CompactionManager implements CompactionManagerMBean duration, depth, numPartitions, - MerkleTree.serializer.serializedSize(tree, 0), + MerkleTrees.serializer.serializedSize(tree, 0), validator.desc); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index e5aff5d..4f6dfa2 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -21,6 +21,7 @@ package org.apache.cassandra.db.compaction; import java.util.*; import java.util.concurrent.Callable; +import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -340,7 +341,7 @@ public class CompactionStrategyManager implements INotificationConsumer * @return */ 
@SuppressWarnings("resource") - public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range) + public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges) { List<SSTableReader> repairedSSTables = new ArrayList<>(); List<SSTableReader> unrepairedSSTables = new ArrayList<>(); @@ -352,19 +353,26 @@ public class CompactionStrategyManager implements INotificationConsumer unrepairedSSTables.add(sstable); } + Set<ISSTableScanner> scanners = new HashSet<>(sstables.size()); - AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range); - AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range); + for (Range<Token> range : ranges) + { + AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range); + AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range); + + for (ISSTableScanner scanner : Iterables.concat(repairedScanners.scanners, unrepairedScanners.scanners)) + { + if (!scanners.add(scanner)) + scanner.close(); + } + } - List<ISSTableScanner> scanners = new ArrayList<>(repairedScanners.scanners.size() + unrepairedScanners.scanners.size()); - scanners.addAll(repairedScanners.scanners); - scanners.addAll(unrepairedScanners.scanners); - return new AbstractCompactionStrategy.ScannerList(scanners); + return new AbstractCompactionStrategy.ScannerList(new ArrayList<>(scanners)); } public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables) { - return getScanners(sstables, null); + return getScanners(sstables, Collections.singleton(null)); } public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 422fdb3..e10b4cb 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -1111,6 +1111,12 @@ public final class MessagingService implements MessagingServiceMBean return StorageService.instance.getTokenMetadata().partitioner; } + public static void validatePartitioner(Collection<? extends AbstractBounds<?>> allBounds) + { + for (AbstractBounds<?> bounds : allBounds) + validatePartitioner(bounds); + } + public static void validatePartitioner(AbstractBounds<?> bounds) { if (globalPartitioner() != bounds.left.getPartitioner()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index ac20428..1e54f88 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -48,21 +48,14 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable * * @param session RepairSession that this RepairJob belongs * @param columnFamily name of the ColumnFamily to repair - * @param parallelismDegree how to run repair job in parallel - * @param repairedAt when the repair occurred (millis) - * @param taskExecutor Executor to run various repair tasks */ - public RepairJob(RepairSession session, - String columnFamily, - RepairParallelism parallelismDegree, - long repairedAt, - ListeningExecutorService taskExecutor) + public RepairJob(RepairSession session, String 
columnFamily) { this.session = session; - this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRange()); - this.repairedAt = repairedAt; - this.taskExecutor = taskExecutor; - this.parallelismDegree = parallelismDegree; + this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRanges()); + this.repairedAt = session.repairedAt; + this.taskExecutor = session.taskExecutor; + this.parallelismDegree = session.parallelismDegree; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairJobDesc.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java index 1dd67c7..05adbf9 100644 --- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java +++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java @@ -18,6 +18,8 @@ package org.apache.cassandra.repair; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.UUID; import com.google.common.base.Objects; @@ -47,21 +49,21 @@ public class RepairJobDesc public final String keyspace; public final String columnFamily; /** repairing range */ - public final Range<Token> range; + public final Collection<Range<Token>> ranges; - public RepairJobDesc(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range) + public RepairJobDesc(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Collection<Range<Token>> ranges) { this.parentSessionId = parentSessionId; this.sessionId = sessionId; this.keyspace = keyspace; this.columnFamily = columnFamily; - this.range = range; + this.ranges = ranges; } @Override public String toString() { - return "[repair #" + sessionId + " on " + keyspace + "/" + columnFamily + ", " 
+ range + "]"; + return "[repair #" + sessionId + " on " + keyspace + "/" + columnFamily + ", " + ranges + "]"; } @Override @@ -74,7 +76,7 @@ public class RepairJobDesc if (!columnFamily.equals(that.columnFamily)) return false; if (!keyspace.equals(that.keyspace)) return false; - if (range != null ? !range.equals(that.range) : that.range != null) return false; + if (ranges != null ? that.ranges == null || (ranges.size() != that.ranges.size()) || (ranges.size() == that.ranges.size() && !ranges.containsAll(that.ranges)) : that.ranges != null) return false; if (!sessionId.equals(that.sessionId)) return false; if (parentSessionId != null ? !parentSessionId.equals(that.parentSessionId) : that.parentSessionId != null) return false; @@ -84,7 +86,7 @@ public class RepairJobDesc @Override public int hashCode() { - return Objects.hashCode(sessionId, keyspace, columnFamily, range); + return Objects.hashCode(sessionId, keyspace, columnFamily, ranges); } private static class RepairJobDescSerializer implements IVersionedSerializer<RepairJobDesc> @@ -100,8 +102,10 @@ public class RepairJobDesc UUIDSerializer.serializer.serialize(desc.sessionId, out, version); out.writeUTF(desc.keyspace); out.writeUTF(desc.columnFamily); - MessagingService.validatePartitioner(desc.range); - AbstractBounds.tokenSerializer.serialize(desc.range, out, version); + MessagingService.validatePartitioner(desc.ranges); + out.writeInt(desc.ranges.size()); + for (Range<Token> rt : desc.ranges) + AbstractBounds.tokenSerializer.serialize(rt, out, version); } public RepairJobDesc deserialize(DataInputPlus in, int version) throws IOException @@ -115,8 +119,19 @@ public class RepairJobDesc UUID sessionId = UUIDSerializer.serializer.deserialize(in, version); String keyspace = in.readUTF(); String columnFamily = in.readUTF(); - Range<Token> range = (Range<Token>)AbstractBounds.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(), version); - return new RepairJobDesc(parentSessionId, sessionId, 
keyspace, columnFamily, range); + + int nRanges = in.readInt(); + Collection<Range<Token>> ranges = new ArrayList<>(); + Range<Token> range; + + for (int i = 0; i < nRanges; i++) + { + range = (Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, + MessagingService.globalPartitioner(), version); + ranges.add(range); + } + + return new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, ranges); } public long serializedSize(RepairJobDesc desc, int version) @@ -131,7 +146,11 @@ public class RepairJobDesc size += UUIDSerializer.serializer.serializedSize(desc.sessionId, version); size += TypeSizes.sizeof(desc.keyspace); size += TypeSizes.sizeof(desc.columnFamily); - size += AbstractBounds.tokenSerializer.serializedSize(desc.range, version); + size += TypeSizes.sizeof(desc.ranges.size()); + for (Range<Token> rt : desc.ranges) + { + size += AbstractBounds.tokenSerializer.serializedSize(rt, version); + } return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index d765ae6..28a3bf5 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -79,14 +79,14 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> case SNAPSHOT: logger.debug("Snapshotting {}", desc); ColumnFamilyStore cfs = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily); - final Range<Token> repairingRange = desc.range; + final Collection<Range<Token>> repairingRange = desc.ranges; Set<SSTableReader> snapshottedSSSTables = cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>() { public boolean apply(SSTableReader sstable) { 
return sstable != null && !sstable.metadata.isIndex() && // exclude SSTables from 2i - new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange)); + new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(repairingRange); } }, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 28511db..9401c03 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -53,6 +53,7 @@ import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.WrappedRunnable; import org.apache.cassandra.utils.progress.ProgressEvent; @@ -146,17 +147,19 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti } final Set<InetAddress> allNeighbors = new HashSet<>(); - Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>(); + List<Pair<Set<InetAddress>, ? 
extends Collection<Range<Token>>>> commonRanges = new ArrayList<>(); try { for (Range<Token> range : options.getRanges()) { - Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, - options.getDataCenters(), - options.getHosts()); - rangeToNeighbors.put(range, neighbors); - allNeighbors.addAll(neighbors); + Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, + options.getDataCenters(), + options.getHosts()); + + addRangeToNeighbors(commonRanges, range, neighbors); + allNeighbors.addAll(neighbors); } + progress.incrementAndGet(); } catch (IllegalArgumentException e) @@ -210,13 +213,13 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti "internal")); List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size()); - for (Range<Token> range : options.getRanges()) + for (Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p : commonRanges) { final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession, - range, + p.right, keyspace, options.getParallelism(), - rangeToNeighbors.get(range), + p.left, repairedAt, executor, cfnames); @@ -228,7 +231,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti public void onSuccess(RepairSessionResult result) { String message = String.format("Repair session %s for range %s finished", session.getId(), - session.getRange().toString()); + session.getRanges().toString()); logger.info(message); fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS, progress.incrementAndGet(), @@ -239,7 +242,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti public void onFailure(Throwable t) { String message = String.format("Repair session %s for range %s failed with error %s", - session.getId(), session.getRange().toString(), t.getMessage()); + session.getId(), session.getRanges().toString(), t.getMessage()); 
logger.error(message, t); fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS, progress.incrementAndGet(), @@ -265,7 +268,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti { if (sessionResult != null) { - successfulRanges.add(sessionResult.range); + successfulRanges.addAll(sessionResult.ranges); } else { @@ -325,6 +328,24 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti }); } + private void addRangeToNeighbors(List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors) + { + for (int i = 0; i < neighborRangeList.size(); i++) + { + Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p = neighborRangeList.get(i); + + if (p.left.containsAll(neighbors)) + { + p.right.add(range); + return; + } + } + + List<Range<Token>> ranges = new ArrayList<>(); + ranges.add(range); + neighborRangeList.add(Pair.create(neighbors, ranges)); + } + private Thread createQueryThread(final int cmd, final UUID sessionId) { return new Thread(new WrappedRunnable() http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index a2dcdd1..a52b352 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -37,13 +37,13 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.*; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; import org.apache.cassandra.utils.Pair; /** - * Coordinates the (active) repair of a token range. 
+ * Coordinates the (active) repair of a list of non overlapping token ranges. * - * A given RepairSession repairs a set of replicas for a given range on a list + * A given RepairSession repairs a set of replicas for a given set of ranges on a list * of column families. For each of the column family to repair, RepairSession * creates a {@link RepairJob} that handles the repair of that CF. * @@ -64,11 +64,11 @@ import org.apache.cassandra.utils.Pair; * A given session will execute the first phase (validation phase) of each of it's job * sequentially. In other words, it will start the first job and only start the next one * once that first job validation phase is complete. This is done so that the replica only - * create one merkle tree at a time, which is our way to ensure that such creation starts + * create one merkle tree per range at a time, which is our way to ensure that such creation starts * roughly at the same time on every node (see CASSANDRA-2816). However the synchronization * phases are allowed to run concurrently (with each other and with validation phases). * - * A given RepairJob has 2 modes: either sequential or not (isSequential flag). If sequential, + * A given RepairJob has 2 modes: either sequential or not (RepairParallelism). 
If sequential, * it will requests merkle tree creation from each replica in sequence (though in that case * we still first send a message to each node to flush and snapshot data so each merkle tree * creation is still done on similar data, even if the actual creation is not @@ -88,9 +88,9 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement private final String[] cfnames; public final RepairParallelism parallelismDegree; /** Range to repair */ - public final Range<Token> range; + public final Collection<Range<Token>> ranges; public final Set<InetAddress> endpoints; - private final long repairedAt; + public final long repairedAt; // number of validations left to be performed private final AtomicInteger validationRemaining; @@ -103,7 +103,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> syncingTasks = new ConcurrentHashMap<>(); // Tasks(snapshot, validate request, differencing, ...) 
are run on taskExecutor - private final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask")); + public final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask")); private volatile boolean terminated = false; @@ -112,7 +112,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement * * @param parentRepairSession the parent sessions id * @param id this sessions id - * @param range range to repair + * @param ranges ranges to repair * @param keyspace name of keyspace * @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees * @param endpoints the data centers that should be part of the repair; null for all DCs @@ -121,7 +121,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement */ public RepairSession(UUID parentRepairSession, UUID id, - Range<Token> range, + Collection<Range<Token>> ranges, String keyspace, RepairParallelism parallelismDegree, Set<InetAddress> endpoints, @@ -135,7 +135,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement this.parallelismDegree = parallelismDegree; this.keyspace = keyspace; this.cfnames = cfnames; - this.range = range; + this.ranges = ranges; this.endpoints = endpoints; this.repairedAt = repairedAt; this.validationRemaining = new AtomicInteger(cfnames.length); @@ -146,9 +146,9 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement return id; } - public Range<Token> getRange() + public Collection<Range<Token>> getRanges() { - return range; + return ranges; } public void waitForValidation(Pair<RepairJobDesc, InetAddress> key, ValidationTask task) @@ -166,9 +166,9 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement * * @param desc repair job 
description * @param endpoint endpoint that sent merkle tree - * @param tree calculated merkle tree, or null if validation failed + * @param trees calculated merkle trees, or null if validation failed */ - public void validationComplete(RepairJobDesc desc, InetAddress endpoint, MerkleTree tree) + public void validationComplete(RepairJobDesc desc, InetAddress endpoint, MerkleTrees trees) { ValidationTask task = validating.remove(Pair.create(desc, endpoint)); if (task == null) @@ -180,7 +180,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement String message = String.format("Received merkle tree for %s from %s", desc.columnFamily, endpoint); logger.info("[repair #{}] {}", getId(), message); Tracing.traceRepair(message); - task.treeReceived(tree); + task.treesReceived(trees); // Unregister from FailureDetector once we've completed synchronizing Merkle trees. // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down. 
@@ -234,15 +234,15 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement if (terminated) return; - logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getId(), repairedNodes(), range, keyspace, Arrays.toString(cfnames))); - Tracing.traceRepair("Syncing range {}", range); - SystemDistributedKeyspace.startRepairs(getId(), parentRepairSession, keyspace, cfnames, range, endpoints); + logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getId(), repairedNodes(), ranges, keyspace, Arrays.toString(cfnames))); + Tracing.traceRepair("Syncing range {}", ranges); + SystemDistributedKeyspace.startRepairs(getId(), parentRepairSession, keyspace, cfnames, ranges, endpoints); if (endpoints.isEmpty()) { - logger.info("[repair #{}] {}", getId(), message = String.format("No neighbors to repair with on range %s: session completed", range)); + logger.info("[repair #{}] {}", getId(), message = String.format("No neighbors to repair with on range %s: session completed", ranges)); Tracing.traceRepair(message); - set(new RepairSessionResult(id, keyspace, range, Lists.<RepairResult>newArrayList())); + set(new RepairSessionResult(id, keyspace, ranges, Lists.<RepairResult>newArrayList())); SystemDistributedKeyspace.failRepairs(getId(), keyspace, cfnames, new RuntimeException(message)); return; } @@ -265,7 +265,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement List<ListenableFuture<RepairResult>> jobs = new ArrayList<>(cfnames.length); for (String cfname : cfnames) { - RepairJob job = new RepairJob(this, cfname, parallelismDegree, repairedAt, taskExecutor); + RepairJob job = new RepairJob(this, cfname); executor.execute(job); jobs.add(job); } @@ -277,8 +277,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement { // this repair session is completed logger.info("[repair #{}] {}", getId(), "Session completed successfully"); - 
Tracing.traceRepair("Completed sync of range {}", range); - set(new RepairSessionResult(id, keyspace, range, results)); + Tracing.traceRepair("Completed sync of range {}", ranges); + set(new RepairSessionResult(id, keyspace, ranges, results)); taskExecutor.shutdown(); // mark this session as terminated http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairSessionResult.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSessionResult.java b/src/java/org/apache/cassandra/repair/RepairSessionResult.java index 4551608..d4fff37 100644 --- a/src/java/org/apache/cassandra/repair/RepairSessionResult.java +++ b/src/java/org/apache/cassandra/repair/RepairSessionResult.java @@ -30,14 +30,14 @@ public class RepairSessionResult { public final UUID sessionId; public final String keyspace; - public final Range<Token> range; + public final Collection<Range<Token>> ranges; public final Collection<RepairResult> repairJobResults; - public RepairSessionResult(UUID sessionId, String keyspace, Range<Token> range, Collection<RepairResult> repairJobResults) + public RepairSessionResult(UUID sessionId, String keyspace, Collection<Range<Token>> ranges, Collection<RepairResult> repairJobResults) { this.sessionId = sessionId; this.keyspace = keyspace; - this.range = range; + this.ranges = ranges; this.repairJobResults = repairJobResults; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/SyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java b/src/java/org/apache/cassandra/repair/SyncTask.java index 7350a66..8adec6f 100644 --- a/src/java/org/apache/cassandra/repair/SyncTask.java +++ b/src/java/org/apache/cassandra/repair/SyncTask.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.repair; -import 
java.util.ArrayList; import java.util.List; import com.google.common.util.concurrent.AbstractFuture; @@ -27,7 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; /** * SyncTask will calculate the difference of MerkleTree between two nodes @@ -56,8 +55,7 @@ public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runna public void run() { // compare trees, and collect differences - List<Range<Token>> differences = new ArrayList<>(); - differences.addAll(MerkleTree.difference(r1.tree, r2.tree)); + List<Range<Token>> differences = MerkleTrees.difference(r1.trees, r2.trees); stat = new SyncStat(new NodePair(r1.endpoint, r2.endpoint), differences.size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java index 70e74db..9cf6c3e 100644 --- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java +++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java @@ -130,7 +130,7 @@ public final class SystemDistributedKeyspace processSilent(fmtQuery); } - public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Range<Token> range, Iterable<InetAddress> endpoints) + public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Collection<Range<Token>> ranges, Iterable<InetAddress> endpoints) { String coordinator = FBUtilities.getBroadcastAddress().getHostAddress(); Set<String> participants = Sets.newHashSet(coordinator); @@ -144,17 +144,20 @@ public final class 
SystemDistributedKeyspace for (String cfname : cfnames) { - String fmtQry = String.format(query, NAME, REPAIR_HISTORY, - keyspaceName, - cfname, - id.toString(), - parent_id.toString(), - range.left.toString(), - range.right.toString(), - coordinator, - Joiner.on("', '").join(participants), - RepairState.STARTED.toString()); - processSilent(fmtQry); + for (Range<Token> range : ranges) + { + String fmtQry = String.format(query, NAME, REPAIR_HISTORY, + keyspaceName, + cfname, + id.toString(), + parent_id.toString(), + range.left.toString(), + range.right.toString(), + coordinator, + Joiner.on("', '").join(participants), + RepairState.STARTED.toString()); + processSilent(fmtQry); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/TreeResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/TreeResponse.java b/src/java/org/apache/cassandra/repair/TreeResponse.java index eede4ee..c898b36 100644 --- a/src/java/org/apache/cassandra/repair/TreeResponse.java +++ b/src/java/org/apache/cassandra/repair/TreeResponse.java @@ -19,7 +19,7 @@ package org.apache.cassandra.repair; import java.net.InetAddress; -import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; /** * Merkle tree response sent from given endpoint. 
@@ -27,11 +27,11 @@ import org.apache.cassandra.utils.MerkleTree; public class TreeResponse { public final InetAddress endpoint; - public final MerkleTree tree; + public final MerkleTrees trees; - public TreeResponse(InetAddress endpoint, MerkleTree tree) + public TreeResponse(InetAddress endpoint, MerkleTrees trees) { this.endpoint = endpoint; - this.tree = tree; + this.trees = trees; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/ValidationTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java b/src/java/org/apache/cassandra/repair/ValidationTask.java index a52ec4f..bd866d2 100644 --- a/src/java/org/apache/cassandra/repair/ValidationTask.java +++ b/src/java/org/apache/cassandra/repair/ValidationTask.java @@ -18,13 +18,17 @@ package org.apache.cassandra.repair; import java.net.InetAddress; +import java.util.Map; import com.google.common.util.concurrent.AbstractFuture; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RepairException; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.ValidationRequest; import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; /** * ValidationTask sends {@link ValidationRequest} to a replica. @@ -53,19 +57,19 @@ public class ValidationTask extends AbstractFuture<TreeResponse> implements Runn } /** - * Receive MerkleTree from replica node. + * Receive MerkleTrees from replica node. * - * @param tree MerkleTree that is sent from replica. Null if validation failed on replica node. + * @param trees MerkleTrees that is sent from replica. Null if validation failed on replica node. 
*/ - public void treeReceived(MerkleTree tree) + public void treesReceived(MerkleTrees trees) { - if (tree == null) + if (trees == null) { setException(new RepairException(desc, "Validation failed in " + endpoint)); } else { - set(new TreeResponse(endpoint, tree)); + set(new TreeResponse(endpoint, trees)); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/Validator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java index 87d186c..7d6c787 100644 --- a/src/java/org/apache/cassandra/repair/Validator.java +++ b/src/java/org/apache/cassandra/repair/Validator.java @@ -33,12 +33,15 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.ValidationComplete; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.utils.MerkleTree.RowHash; +import org.apache.cassandra.utils.MerkleTrees; /** * Handles the building of a merkle tree for a column family. 
@@ -58,11 +61,11 @@ public class Validator implements Runnable // null when all rows with the min token have been consumed private long validated; - private MerkleTree tree; + private MerkleTrees trees; // current range being updated private MerkleTree.TreeRange range; // iterator for iterating sub ranges (MT's leaves) - private MerkleTree.TreeRangeIterator ranges; + private MerkleTrees.TreeRangeIterator ranges; // last key seen private DecoratedKey lastKey; @@ -76,9 +79,9 @@ public class Validator implements Runnable ranges = null; } - public void prepare(ColumnFamilyStore cfs, MerkleTree tree) + public void prepare(ColumnFamilyStore cfs, MerkleTrees tree) { - this.tree = tree; + this.trees = tree; if (!tree.partitioner().preservesOrder()) { @@ -87,32 +90,35 @@ public class Validator implements Runnable } else { - List<DecoratedKey> keys = new ArrayList<>(); - for (DecoratedKey sample : cfs.keySamples(desc.range)) + for (Range<Token> range : tree.ranges()) { - assert desc.range.contains(sample.getToken()): "Token " + sample.getToken() + " is not within range " + desc.range; - keys.add(sample); - } + List<DecoratedKey> keys = new ArrayList<>(); + for (DecoratedKey sample : cfs.keySamples(range)) + { + assert range.contains(sample.getToken()) : "Token " + sample.getToken() + " is not within range " + desc.ranges; + keys.add(sample); + } - if (keys.isEmpty()) - { - // use an even tree distribution - tree.init(); - } - else - { - int numkeys = keys.size(); - Random random = new Random(); - // sample the column family using random keys from the index - while (true) + if (keys.isEmpty()) + { + // use an even tree distribution + tree.init(range); + } + else { - DecoratedKey dk = keys.get(random.nextInt(numkeys)); - if (!tree.split(dk.getToken())) - break; + int numKeys = keys.size(); + Random random = new Random(); + // sample the column family using random keys from the index + while (true) + { + DecoratedKey dk = keys.get(random.nextInt(numKeys)); + if 
(!tree.split(dk.getToken())) + break; + } } } } - logger.debug("Prepared AEService tree of size {} for {}", tree.size(), desc); + logger.debug("Prepared AEService trees of size {} for {}", trees.size(), desc); ranges = tree.invalids(); } @@ -124,7 +130,7 @@ public class Validator implements Runnable */ public void add(UnfilteredRowIterator partition) { - assert desc.range.contains(partition.partitionKey().getToken()) : partition.partitionKey().getToken() + " is not contained in " + desc.range; + assert Range.isInRanges(partition.partitionKey().getToken(), desc.ranges) : partition.partitionKey().getToken() + " is not contained in " + desc.ranges; assert lastKey == null || lastKey.compareTo(partition.partitionKey()) < 0 : "partition " + partition.partitionKey() + " received out of order wrt " + lastKey; lastKey = partition.partitionKey(); @@ -133,13 +139,14 @@ public class Validator implements Runnable range = ranges.next(); // generate new ranges as long as case 1 is true - while (!range.contains(lastKey.getToken())) + if (!findCorrectRange(lastKey.getToken())) { // add the empty hash, and move to the next range - range.ensureHashInitialised(); - range = ranges.next(); + ranges = trees.invalids(); + findCorrectRange(lastKey.getToken()); } + assert range.contains(lastKey.getToken()) : "Token not in MerkleTree: " + lastKey.getToken(); // case 3 must be true: mix in the hashed row RowHash rowHash = rowHash(partition); if (rowHash != null) @@ -148,6 +155,16 @@ public class Validator implements Runnable } } + public boolean findCorrectRange(Token t) + { + while (!range.contains(t) && ranges.hasNext()) + { + range = ranges.next(); + } + + return range.contains(t); + } + static class CountingDigest extends MessageDigest { private long count; @@ -212,9 +229,9 @@ public class Validator implements Runnable { // log distribution of rows in tree logger.debug("Validated {} partitions for {}. 
Partitions per leaf are:", validated, desc.sessionId); - tree.histogramOfRowCountPerLeaf().log(logger); + trees.logRowCountPerLeaf(logger); logger.debug("Validated {} partitions for {}. Partition sizes are:", validated, desc.sessionId); - tree.histogramOfRowSizePerLeaf().log(logger); + trees.logRowSizePerLeaf(logger); } } @@ -223,8 +240,8 @@ public class Validator implements Runnable { assert ranges != null : "Validator was not prepared()"; - if (range != null) - range.ensureHashInitialised(); + ranges = trees.invalids(); + while (ranges.hasNext()) { range = ranges.next(); @@ -255,6 +272,6 @@ public class Validator implements Runnable logger.info(String.format("[repair #%s] Sending completed merkle tree to %s for %s.%s", desc.sessionId, initiator, desc.keyspace, desc.columnFamily)); Tracing.traceRepair("Sending completed merkle tree to {} for {}.{}", initiator, desc.keyspace, desc.columnFamily); } - MessagingService.instance().sendOneWay(new ValidationComplete(desc, tree).createMessage(), initiator); + MessagingService.instance().sendOneWay(new ValidationComplete(desc, trees).createMessage(), initiator); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java index ef0c4ec..90be8e5 100644 --- a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java +++ b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java @@ -23,7 +23,7 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.repair.RepairJobDesc; -import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; /** * ValidationComplete 
message is sent when validation compaction completed successfully. @@ -34,24 +34,25 @@ public class ValidationComplete extends RepairMessage { public static MessageSerializer serializer = new ValidationCompleteSerializer(); - /** true if validation success, false otherwise */ - public final boolean success; /** Merkle hash tree response. Null if validation failed. */ - public final MerkleTree tree; + public final MerkleTrees trees; public ValidationComplete(RepairJobDesc desc) { super(Type.VALIDATION_COMPLETE, desc); - this.success = false; - this.tree = null; + trees = null; } - public ValidationComplete(RepairJobDesc desc, MerkleTree tree) + public ValidationComplete(RepairJobDesc desc, MerkleTrees trees) { super(Type.VALIDATION_COMPLETE, desc); - assert tree != null; - this.success = true; - this.tree = tree; + assert trees != null; + this.trees = trees; + } + + public boolean success() + { + return trees != null; } private static class ValidationCompleteSerializer implements MessageSerializer<ValidationComplete> @@ -59,31 +60,31 @@ public class ValidationComplete extends RepairMessage public void serialize(ValidationComplete message, DataOutputPlus out, int version) throws IOException { RepairJobDesc.serializer.serialize(message.desc, out, version); - out.writeBoolean(message.success); - if (message.success) - MerkleTree.serializer.serialize(message.tree, out, version); + out.writeBoolean(message.success()); + if (message.trees != null) + MerkleTrees.serializer.serialize(message.trees, out, version); } public ValidationComplete deserialize(DataInputPlus in, int version) throws IOException { RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version); - if (in.readBoolean()) - { - MerkleTree tree = MerkleTree.serializer.deserialize(in, version); - return new ValidationComplete(desc, tree); - } - else + boolean success = in.readBoolean(); + + if (success) { - return new ValidationComplete(desc); + MerkleTrees trees = 
MerkleTrees.serializer.deserialize(in, version); + return new ValidationComplete(desc, trees); } + + return new ValidationComplete(desc); } public long serializedSize(ValidationComplete message, int version) { long size = RepairJobDesc.serializer.serializedSize(message.desc, version); - size += TypeSizes.sizeof(message.success); - if (message.success) - size += MerkleTree.serializer.serializedSize(message.tree, version); + size += TypeSizes.sizeof(message.success()); + if (message.trees != null) + size += MerkleTrees.serializer.serializedSize(message.trees, version); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 213edeb..e75d13e 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -103,7 +103,7 @@ public class ActiveRepairService * @return Future for asynchronous call or null if there is no need to repair */ public RepairSession submitRepairSession(UUID parentRepairSession, - Range<Token> range, + Collection<Range<Token>> range, String keyspace, RepairParallelism parallelismDegree, Set<InetAddress> endpoints, @@ -383,7 +383,7 @@ public class ActiveRepairService { case VALIDATION_COMPLETE: ValidationComplete validation = (ValidationComplete) message; - session.validationComplete(desc, endpoint, validation.tree); + session.validationComplete(desc, endpoint, validation.trees); break; case SYNC_COMPLETE: // one of replica is synced. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/utils/MerkleTrees.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/MerkleTrees.java b/src/java/org/apache/cassandra/utils/MerkleTrees.java new file mode 100644 index 0000000..43c023e --- /dev/null +++ b/src/java/org/apache/cassandra/utils/MerkleTrees.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.*; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.PeekingIterator; +import org.slf4j.Logger; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + + +/** + * Wrapper class for handling of multiple MerkleTrees at once. 
+ * + * The MerkleTree's are divided in Ranges of non-overlapping tokens. + */ +public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>> +{ + public static final MerkleTreesSerializer serializer = new MerkleTreesSerializer(); + + private Map<Range<Token>, MerkleTree> merkleTrees = new TreeMap<>(new TokenRangeComparator()); + + private IPartitioner partitioner; + + /** + * Creates empty MerkleTrees object. + * + * @param partitioner The partitioner to use + */ + public MerkleTrees(IPartitioner partitioner) + { + this(partitioner, new ArrayList<>()); + } + + private MerkleTrees(IPartitioner partitioner, Collection<MerkleTree> merkleTrees) + { + this.partitioner = partitioner; + addTrees(merkleTrees); + } + + /** + * Get the ranges that these merkle trees covers. + * + * @return + */ + public Collection<Range<Token>> ranges() + { + return merkleTrees.keySet(); + } + + /** + * Get the partitioner in use. + * + * @return + */ + public IPartitioner partitioner() + { + return partitioner; + } + + /** + * Add merkle tree's with the defined maxsize and ranges. + * + * @param maxsize + * @param ranges + */ + public void addMerkleTrees(int maxsize, Collection<Range<Token>> ranges) + { + for (Range<Token> range : ranges) + { + addMerkleTree(maxsize, range); + } + } + + /** + * Add a MerkleTree with the defined size and range. + * + * @param maxsize + * @param range + * @return The created merkle tree. + */ + public MerkleTree addMerkleTree(int maxsize, Range<Token> range) + { + return addMerkleTree(maxsize, MerkleTree.RECOMMENDED_DEPTH, range); + } + + @VisibleForTesting + public MerkleTree addMerkleTree(int maxsize, byte hashdepth, Range<Token> range) + { + MerkleTree tree = new MerkleTree(partitioner, range, hashdepth, maxsize); + addTree(tree); + + return tree; + } + + /** + * Get the MerkleTree.Range responsible for the given token. 
+ * + * @param t + * @return + */ + public MerkleTree.TreeRange get(Token t) + { + return getMerkleTree(t).get(t); + } + + /** + * Init all MerkleTree's with an even tree distribution. + */ + public void init() + { + for (Range<Token> range : merkleTrees.keySet()) + { + init(range); + } + } + + /** + * Init a selected MerkleTree with an even tree distribution. + * + * @param range + */ + public void init(Range<Token> range) + { + merkleTrees.get(range).init(); + } + + /** + * Split the MerkleTree responsible for the given token. + * + * @param t + * @return + */ + public boolean split(Token t) + { + return getMerkleTree(t).split(t); + } + + /** + * Invalidate the MerkleTree responsible for the given token. + * + * @param t + */ + public void invalidate(Token t) + { + getMerkleTree(t).invalidate(t); + } + + /** + * Get the MerkleTree responsible for the given token range. + * + * @param range + * @return + */ + public MerkleTree getMerkleTree(Range<Token> range) + { + return merkleTrees.get(range); + } + + public long size() + { + long size = 0; + + for (MerkleTree tree : merkleTrees.values()) + { + size += tree.size(); + } + + return size; + } + + @VisibleForTesting + public void maxsize(Range<Token> range, int maxsize) + { + getMerkleTree(range).maxsize(maxsize); + } + + /** + * Get the MerkleTree responsible for the given token. + * + * @param t + * @return The given MerkleTree or null if none exist. 
+ */ + private MerkleTree getMerkleTree(Token t) + { + for (Range<Token> range : merkleTrees.keySet()) + { + if (range.contains(t)) + return merkleTrees.get(range); + } + + return null; + } + + private void addTrees(Collection<MerkleTree> trees) + { + for (MerkleTree tree : trees) + { + addTree(tree); + } + } + + private void addTree(MerkleTree tree) + { + assert validateNonOverlapping(tree) : "Range [" + tree.fullRange + "] is intersecting an existing range"; + + merkleTrees.put(tree.fullRange, tree); + } + + private boolean validateNonOverlapping(MerkleTree tree) + { + for (Range<Token> range : merkleTrees.keySet()) + { + if (tree.fullRange.intersects(range)) + return false; + } + + return true; + } + + /** + * Get an iterator for all the invalids generated by the MerkleTrees. + * + * @return + */ + public TreeRangeIterator invalids() + { + return new TreeRangeIterator(); + } + + /** + * Log the row count per leaf for all MerkleTrees. + * + * @param logger + */ + public void logRowCountPerLeaf(Logger logger) + { + for (MerkleTree tree : merkleTrees.values()) + { + tree.histogramOfRowCountPerLeaf().log(logger); + } + } + + /** + * Log the row size per leaf for all MerkleTrees. + * + * @param logger + */ + public void logRowSizePerLeaf(Logger logger) + { + for (MerkleTree tree : merkleTrees.values()) + { + tree.histogramOfRowSizePerLeaf().log(logger); + } + } + + @VisibleForTesting + public byte[] hash(Range<Token> range) + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + boolean hashed = false; + + try + { + for (Range<Token> rt : merkleTrees.keySet()) + { + if (rt.intersects(range)) + { + byte[] bytes = merkleTrees.get(rt).hash(range); + if (bytes != null) + { + baos.write(bytes); + hashed = true; + } + } + } + } + catch (IOException e) + { + throw new RuntimeException("Unable to append merkle tree hash to result"); + } + + return hashed ? baos.toByteArray() : null; + } + + /** + * Get an iterator of all ranges and their MerkleTrees. 
+ */ + public Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator() + { + return merkleTrees.entrySet().iterator(); + } + + public class TreeRangeIterator extends AbstractIterator<MerkleTree.TreeRange> implements + Iterable<MerkleTree.TreeRange>, + PeekingIterator<MerkleTree.TreeRange> + { + private final Iterator<MerkleTree> it; + + private MerkleTree.TreeRangeIterator current = null; + + private TreeRangeIterator() + { + it = merkleTrees.values().iterator(); + } + + public MerkleTree.TreeRange computeNext() + { + if (current == null || !current.hasNext()) + return nextIterator(); + + return current.next(); + } + + private MerkleTree.TreeRange nextIterator() + { + if (it.hasNext()) + { + current = it.next().invalids(); + + return current.next(); + } + + return endOfData(); + } + + public Iterator<MerkleTree.TreeRange> iterator() + { + return this; + } + } + + /** + * Get the differences between the two sets of MerkleTrees. + * + * @param ltree + * @param rtree + * @return + */ + public static List<Range<Token>> difference(MerkleTrees ltree, MerkleTrees rtree) + { + List<Range<Token>> differences = new ArrayList<>(); + for (MerkleTree tree : ltree.merkleTrees.values()) + { + differences.addAll(MerkleTree.difference(tree, rtree.getMerkleTree(tree.fullRange))); + } + return differences; + } + + public static class MerkleTreesSerializer implements IVersionedSerializer<MerkleTrees> + { + public void serialize(MerkleTrees trees, DataOutputPlus out, int version) throws IOException + { + out.writeInt(trees.merkleTrees.size()); + for (MerkleTree tree : trees.merkleTrees.values()) + { + MerkleTree.serializer.serialize(tree, out, version); + } + } + + public MerkleTrees deserialize(DataInputPlus in, int version) throws IOException + { + IPartitioner partitioner = null; + int nTrees = in.readInt(); + Collection<MerkleTree> trees = new ArrayList<>(nTrees); + if (nTrees > 0) + { + for (int i = 0; i < nTrees; i++) + { + MerkleTree tree = MerkleTree.serializer.deserialize(in, 
version); + trees.add(tree); + + if (partitioner == null) + partitioner = tree.partitioner(); + else + assert tree.partitioner() == partitioner; + } + } + + return new MerkleTrees(partitioner, trees); + } + + public long serializedSize(MerkleTrees trees, int version) + { + assert trees != null; + + long size = TypeSizes.sizeof(trees.merkleTrees.size()); + for (MerkleTree tree : trees.merkleTrees.values()) + { + size += MerkleTree.serializer.serializedSize(tree, version); + } + return size; + } + + } + + private static class TokenRangeComparator implements Comparator<Range<Token>> + { + @Override + public int compare(Range<Token> rt1, Range<Token> rt2) + { + if (rt1.left.compareTo(rt2.left) == 0) + return 0; + + return rt1.compareTo(rt2); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/gms.EndpointState.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/3.0/gms.EndpointState.bin b/test/data/serialization/3.0/gms.EndpointState.bin new file mode 100644 index 0000000..a230ae1 Binary files /dev/null and b/test/data/serialization/3.0/gms.EndpointState.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/gms.Gossip.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/3.0/gms.Gossip.bin b/test/data/serialization/3.0/gms.Gossip.bin new file mode 100644 index 0000000..af5ac57 Binary files /dev/null and b/test/data/serialization/3.0/gms.Gossip.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/service.SyncComplete.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/3.0/service.SyncComplete.bin b/test/data/serialization/3.0/service.SyncComplete.bin new file mode 100644 index 0000000..73ea4b4 Binary files /dev/null and 
b/test/data/serialization/3.0/service.SyncComplete.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/service.SyncRequest.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/3.0/service.SyncRequest.bin b/test/data/serialization/3.0/service.SyncRequest.bin new file mode 100644 index 0000000..7e09777 Binary files /dev/null and b/test/data/serialization/3.0/service.SyncRequest.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/service.ValidationComplete.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/3.0/service.ValidationComplete.bin b/test/data/serialization/3.0/service.ValidationComplete.bin new file mode 100644 index 0000000..b8f0fb9 Binary files /dev/null and b/test/data/serialization/3.0/service.ValidationComplete.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/service.ValidationRequest.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/3.0/service.ValidationRequest.bin b/test/data/serialization/3.0/service.ValidationRequest.bin new file mode 100644 index 0000000..a00763b Binary files /dev/null and b/test/data/serialization/3.0/service.ValidationRequest.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/AbstractSerializationsTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java index 4ee5ce4..501f4ae 100644 --- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java +++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java @@ -37,7 +37,7 @@ import java.util.Map; public class 
AbstractSerializationsTester { - protected static final String CUR_VER = System.getProperty("cassandra.version", "2.1"); + protected static final String CUR_VER = System.getProperty("cassandra.version", "3.0"); protected static final Map<String, Integer> VERSION_MAP = new HashMap<String, Integer> () {{ put("0.7", 1); @@ -46,6 +46,7 @@ public class AbstractSerializationsTester put("2.0", MessagingService.VERSION_20); put("2.1", MessagingService.VERSION_21); put("2.2", MessagingService.VERSION_22); + put("3.0", MessagingService.VERSION_30); }}; protected static final boolean EXECUTE_WRITES = Boolean.getBoolean("cassandra.test-serialization-writes"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index c3be08a..46c7068 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -173,7 +173,7 @@ public class LeveledCompactionStrategyTest int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds()); UUID parentRepSession = UUID.randomUUID(); ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false); - RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, range); + RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range)); Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore); CompactionManager.instance.submitValidation(cfs, validator).get(); } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index 77a6ac4..db3f683 100644 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -20,13 +20,14 @@ package org.apache.cassandra.repair; import java.net.InetAddress; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.UUID; import org.junit.BeforeClass; import org.junit.Test; - import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; @@ -37,6 +38,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; import static org.junit.Assert.assertEquals; @@ -65,10 +67,11 @@ public class LocalSyncTaskTest extends SchemaLoader final InetAddress ep2 = InetAddress.getByName("127.0.0.1"); Range<Token> range = new Range<>(partirioner.getMinimumToken(), partirioner.getRandomToken()); - RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", range); + RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); + + MerkleTrees tree1 = createInitialTree(desc); - MerkleTree tree1 = createInitialTree(desc); - MerkleTree tree2 = createInitialTree(desc); + MerkleTrees tree2 = createInitialTree(desc); // difference the trees // note: we reuse the same endpoint which is bogus in theory but fine here @@ -90,10 +93,11 @@ public 
class LocalSyncTaskTest extends SchemaLoader ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false); - RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", range); + RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); + + MerkleTrees tree1 = createInitialTree(desc); - MerkleTree tree1 = createInitialTree(desc); - MerkleTree tree2 = createInitialTree(desc); + MerkleTrees tree2 = createInitialTree(desc); // change a range in one of the trees Token token = partirioner.midpoint(range.left, range.right); @@ -115,9 +119,10 @@ public class LocalSyncTaskTest extends SchemaLoader assertEquals("Wrong differing ranges", interesting.size(), task.getCurrentStat().numberOfDifferences); } - private MerkleTree createInitialTree(RepairJobDesc desc) + private MerkleTrees createInitialTree(RepairJobDesc desc) { - MerkleTree tree = new MerkleTree(partirioner, desc.range, MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15)); + MerkleTrees tree = new MerkleTrees(partirioner); + tree.addMerkleTrees((int) Math.pow(2, 15), desc.ranges); tree.init(); for (MerkleTree.TreeRange r : tree.invalids()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/repair/RepairSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java index 0af94b2..d40982c 100644 --- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java @@ -20,6 +20,7 @@ package org.apache.cassandra.repair; import java.io.IOException; import java.net.InetAddress; +import java.util.Arrays; import java.util.Set; import java.util.UUID; import 
java.util.concurrent.ExecutionException; @@ -53,7 +54,7 @@ public class RepairSessionTest IPartitioner p = Murmur3Partitioner.instance; Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100))); Set<InetAddress> endpoints = Sets.newHashSet(remote); - RepairSession session = new RepairSession(parentSessionId, sessionId, repairRange, "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, "Standard1"); + RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, "Standard1"); // perform convict session.convict(remote, Double.MAX_VALUE); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/repair/ValidatorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java index d77daf0..8fe76c3 100644 --- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java +++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java @@ -19,6 +19,9 @@ package org.apache.cassandra.repair; import java.net.InetAddress; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import org.junit.After; @@ -43,6 +46,7 @@ import org.apache.cassandra.repair.messages.ValidationComplete; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; import org.apache.cassandra.utils.concurrent.SimpleCondition; import static org.junit.Assert.assertEquals; @@ -77,7 +81,7 @@ public class ValidatorTest public void testValidatorComplete() throws Throwable { Range<Token> range = new Range<>(partitioner.getMinimumToken(), 
partitioner.getRandomToken()); - final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range); + final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range)); final SimpleCondition lock = new SimpleCondition(); MessagingService.instance().addMessageSink(new IMessageSink() @@ -91,8 +95,8 @@ public class ValidatorTest RepairMessage m = (RepairMessage) message.payload; assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); assertEquals(desc, m.desc); - assertTrue(((ValidationComplete) m).success); - assertNotNull(((ValidationComplete) m).tree); + assertTrue(((ValidationComplete) m).success()); + assertNotNull(((ValidationComplete) m).trees); } } finally @@ -113,7 +117,8 @@ public class ValidatorTest ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily); Validator validator = new Validator(desc, remote, 0); - MerkleTree tree = new MerkleTree(partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, 15)); + MerkleTrees tree = new MerkleTrees(partitioner); + tree.addMerkleTrees((int) Math.pow(2, 15), validator.desc.ranges); validator.prepare(cfs, tree); // and confirm that the tree was split @@ -137,7 +142,7 @@ public class ValidatorTest public void testValidatorFailed() throws Throwable { Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); - final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range); + final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range)); final SimpleCondition lock = new SimpleCondition(); MessagingService.instance().addMessageSink(new IMessageSink() @@ -151,8 +156,8 @@ public class ValidatorTest RepairMessage m = (RepairMessage) message.payload; 
assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); assertEquals(desc, m.desc); - assertFalse(((ValidationComplete) m).success); - assertNull(((ValidationComplete) m).tree); + assertFalse(((ValidationComplete) m).success()); + assertNull(((ValidationComplete) m).trees); } } finally http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/service/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java index 80bb452..847bcea 100644 --- a/test/unit/org/apache/cassandra/service/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java @@ -20,6 +20,7 @@ package org.apache.cassandra.service; import java.io.IOException; import java.net.InetAddress; +import java.util.Arrays; import java.util.Collections; import java.util.UUID; @@ -43,7 +44,7 @@ import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.repair.Validator; import org.apache.cassandra.repair.messages.*; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; public class SerializationsTest extends AbstractSerializationsTester { @@ -58,7 +59,7 @@ public class SerializationsTest extends AbstractSerializationsTester partitionerSwitcher = Util.switchPartitioner(RandomPartitioner.instance); RANDOM_UUID = UUID.fromString("b5c3d033-75aa-4c2f-a819-947aac7a0c54"); FULL_RANGE = new Range<>(Util.testPartitioner().getMinimumToken(), Util.testPartitioner().getMinimumToken()); - DESC = new RepairJobDesc(getVersion() < MessagingService.VERSION_21 ? null : RANDOM_UUID, RANDOM_UUID, "Keyspace1", "Standard1", FULL_RANGE); + DESC = new RepairJobDesc(getVersion() < MessagingService.VERSION_21 ? 
null : RANDOM_UUID, RANDOM_UUID, "Keyspace1", "Standard1", Arrays.asList(FULL_RANGE)); } @AfterClass @@ -66,8 +67,7 @@ public class SerializationsTest extends AbstractSerializationsTester { partitionerSwitcher.close(); } - - + private void testRepairMessageWrite(String fileName, RepairMessage... messages) throws IOException { try (DataOutputStreamPlus out = getOutput(fileName)) @@ -109,13 +109,17 @@ public class SerializationsTest extends AbstractSerializationsTester private void testValidationCompleteWrite() throws IOException { IPartitioner p = RandomPartitioner.instance; + + MerkleTrees mt = new MerkleTrees(p); + // empty validation - MerkleTree mt = new MerkleTree(p, FULL_RANGE, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, 15)); + mt.addMerkleTree((int) Math.pow(2, 15), FULL_RANGE); Validator v0 = new Validator(DESC, FBUtilities.getBroadcastAddress(), -1); ValidationComplete c0 = new ValidationComplete(DESC, mt); // validation with a tree - mt = new MerkleTree(p, FULL_RANGE, MerkleTree.RECOMMENDED_DEPTH, Integer.MAX_VALUE); + mt = new MerkleTrees(p); + mt.addMerkleTree(Integer.MAX_VALUE, FULL_RANGE); for (int i = 0; i < 10; i++) mt.split(p.getRandomToken()); Validator v1 = new Validator(DESC, FBUtilities.getBroadcastAddress(), -1); @@ -140,24 +144,24 @@ public class SerializationsTest extends AbstractSerializationsTester assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE; assert DESC.equals(message.desc); - assert ((ValidationComplete) message).success; - assert ((ValidationComplete) message).tree != null; + assert ((ValidationComplete) message).success(); + assert ((ValidationComplete) message).trees != null; // validation with a tree message = RepairMessage.serializer.deserialize(in, getVersion()); assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE; assert DESC.equals(message.desc); - assert ((ValidationComplete) message).success; - assert ((ValidationComplete) message).tree != null; + assert ((ValidationComplete) 
message).success(); + assert ((ValidationComplete) message).trees != null; // failed validation message = RepairMessage.serializer.deserialize(in, getVersion()); assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE; assert DESC.equals(message.desc); - assert !((ValidationComplete) message).success; - assert ((ValidationComplete) message).tree == null; + assert !((ValidationComplete) message).success(); + assert ((ValidationComplete) message).trees == null; // MessageOuts for (int i = 0; i < 3; i++)
